Ignore missing channel path on pulse map

This commit is contained in:
Dominik Werder
2023-01-06 08:13:41 +01:00
parent 3ded7c6136
commit f781166053
6 changed files with 40 additions and 16 deletions

View File

@@ -20,7 +20,7 @@ bytes = "1.3"
pin-project = "1"
#async-channel = "1"
#dashmap = "3"
scylla = "0.6.1"
scylla = "0.7"
async-channel = "1.6"
chrono = "0.4"
regex = "1.7.0"

View File

@@ -276,6 +276,12 @@ impl From<serde_json::Error> for Error {
}
}
impl<T> From<async_channel::SendError<T>> for Error {
fn from(k: async_channel::SendError<T>) -> Self {
Self::with_msg(format!("{:?}", k))
}
}
impl From<async_channel::RecvError> for Error {
fn from(k: async_channel::RecvError) -> Self {
Self::with_msg(k.to_string())

View File

@@ -21,7 +21,7 @@ tracing-futures = "0.2"
async-channel = "1.6"
itertools = "0.10.1"
chrono = "0.4.23"
scylla = "0.6.1"
scylla = "0.7"
md-5 = "0.10"
regex = "1.7"
err = { path = "../err" }

View File

@@ -65,7 +65,7 @@ async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> {
fn timer_channel_names() -> Vec<String> {
let sections = vec!["SINEG01", "SINSB01", "SINSB02", "SINSB03", "SINSB04", "SINXB01"];
let suffixes = vec!["MASTER"];
let all: Vec<_> = sections
let mut all: Vec<_> = sections
.iter()
.map(|sec| {
suffixes
@@ -74,6 +74,7 @@ fn timer_channel_names() -> Vec<String> {
})
.flatten()
.collect();
all.push("SIN-CVME-TIFGUN-EVR0:RX-PULSEID".into());
all
}
@@ -254,6 +255,7 @@ impl IndexFullHttpFunction {
let mut msg = format!("Index channel {}", channel_name);
let files = datafiles_for_channel(channel_name.clone(), node_config).await?;
msg = format!("{}\n{:?}", msg, files);
let mut latest_pair = (0, 0);
for path in files {
let splitted: Vec<_> = path.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
@@ -271,6 +273,9 @@ impl IndexFullHttpFunction {
let (r3, _file) = read_last_chunk(file, r2.pos, r2.len).await?;
msg = format!("{}\n{:?}", msg, r3);
if let Some(r3) = r3 {
if r3.pulse > latest_pair.0 {
latest_pair = (r3.pulse, r3.ts);
}
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
@@ -290,6 +295,7 @@ impl IndexFullHttpFunction {
warn!("could not find first event chunk in {path:?}");
}
}
info!("latest for {channel_name} {latest_pair:?}");
Ok(msg)
}
@@ -307,7 +313,7 @@ impl IndexFullHttpFunction {
}
Err(e) => {
error!("error while indexing {} {:?}", channel_name, e);
return Err(e);
//return Err(e);
}
}
}
@@ -630,17 +636,29 @@ impl MapPulseLocalHttpFunction {
//let mut msg = String::new();
//use std::fmt::Write;
//write!(&mut msg, "cands: {:?}\n", cands)?;
let mut tss = vec![];
let mut channels = vec![];
for cand in cands {
let mut tss = Vec::new();
let mut channels = Vec::new();
for (ch, _, tb, sp) in cands {
let ks = 2;
let path = disk::paths::data_path_tb(ks, &cand.0, cand.2, 86400000, cand.3, &node_config.node)?;
//write!(&mut msg, "data_path_tb: {:?}\n", path)?;
let ts = search_pulse(pulse, &path).await?;
//write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?;
if let Some(ts) = ts {
tss.push(ts);
channels.push(cand.0);
match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) {
Ok(path) => {
//write!(&mut msg, "data_path_tb: {:?}\n", path)?;
match search_pulse(pulse, &path).await {
Ok(ts) => {
//write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?;
if let Some(ts) = ts {
tss.push(ts);
channels.push(ch);
}
}
Err(e) => {
warn!("can not map pulse with {ch} {sp} {tb} {e}");
}
}
}
Err(e) => {
warn!("can not get path to files {ch} {e}");
}
}
}
let ret = LocalMap { pulse, tss, channels };

View File

@@ -22,7 +22,7 @@ byteorder = "1.4.3"
futures-util = "0.3.14"
tracing = "0.1.25"
hex = "0.4.3"
scylla = "0.6.1"
scylla = "0.7"
tokio-postgres = "0.7.7"
err = { path = "../err" }
netpod = { path = "../netpod" }

View File

@@ -21,7 +21,7 @@ chrono = { version = "0.4.19", features = ["serde"] }
crc32fast = "1.3.2"
futures-util = "0.3.24"
async-channel = "1.7.1"
scylla = "0.6.1"
scylla = "0.7"
tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
err = { path = "../err" }
netpod = { path = "../netpod" }