diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 6dedaa4..0d87679 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -171,6 +171,7 @@ pub struct FindOlder { #[derive(Debug, clap::Parser)] pub struct Ca { + #[arg(long)] pub broadcast: Option, #[command(subcommand)] pub subcmds: CaSubcmds, diff --git a/daqingest/src/tools/catools.rs b/daqingest/src/tools/catools.rs index 136ef2b..47f43f5 100644 --- a/daqingest/src/tools/catools.rs +++ b/daqingest/src/tools/catools.rs @@ -11,16 +11,21 @@ use std::time::Duration; pub enum Error {} pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> { + eprintln!("{:?}", broadcast); let brd = broadcast.split(","); + let tgts = brd + .inspect(|x| eprintln!("try to parse: [{:?}]", x)) + .map(|x| x.parse().unwrap()) + .collect(); + eprintln!("{:?}", tgts); let (channels_input_tx, channels_input_rx) = async_channel::bounded(10); - let tgts = brd.map(|x| x.parse().unwrap()).collect(); let blacklist = Vec::new(); let batch_run_max = Duration::from_millis(1200); let in_flight_max = 1; let batch_size = 1; let stats = Arc::new(IocFinderStats::new()); channels_input_tx.send(cmd.channel).await.unwrap(); - let mut stream = netfetch::ca::findioc::FindIocStream::new( + let stream = netfetch::ca::findioc::FindIocStream::new( channels_input_rx, tgts, blacklist, @@ -29,6 +34,8 @@ pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> { batch_size, stats, ); + let deadline = taskrun::tokio::time::sleep(Duration::from_millis(2000)); + let mut stream = Box::pin(stream.take_until(deadline)); while let Some(e) = stream.next().await { eprintln!("{e:?}"); } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 1c99afe..4f0b0be 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -37,6 +37,7 @@ dashmap = "6.0.1" hashbrown = "0.15.2" smallvec = "1.13.2" thiserror = "=0.0.1" +autoerr = "0.0.3" log = { path = "../log" } series = { path = "../../daqbuf-series", package = "daqbuf-series" } serieswriter = { path = "../serieswriter" } diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 4be133d..bfa20fe 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -27,14 +27,15 @@ macro_rules! debug_batch { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) } macro_rules! trace_batch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "Finder")] -pub enum Error { - Join(#[from] tokio::task::JoinError), - DbPg(#[from] dbpg::err::Error), - Postgres(#[from] dbpg::postgres::Error), - IocSearch(#[from] crate::ca::search::Error), -} +autoerr::create_error_v1!( + name(Error, "Finder"), + enum variants { + Join(#[from] tokio::task::JoinError), + DbPg(#[from] dbpg::err::Error), + Postgres(#[from] dbpg::postgres::Error), + IocSearch(#[from] crate::ca::search::Error), + }, +); fn transform_pgres(rows: Vec) -> VecDeque { let mut ret = VecDeque::new(); @@ -147,7 +148,7 @@ async fn finder_worker_single( let sql = concat!( "with q1 as (select * from unnest($2::text[]) as unn (ch))", " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr", - " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null", + " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.archived = 0 and tt.addr is not null", " order by tt.facility, tt.channel, tsmod desc", ); let qu_select_multi = pg.prepare(sql).await?; @@ -155,6 +156,9 @@ async fn finder_worker_single( loop { match inp.recv().await { Ok(batch) => { + if batch.iter().filter(|x| crate::dbg_chn(x.name())).next().is_some() { + info!("SEARCHING FOR DBG"); + }; stats.dbsearcher_batch_recv().inc(); stats.dbsearcher_item_recv().add(batch.len() as _); let ts1 = Instant::now(); @@ -203,6 +207,11 @@ async fn finder_worker_single( for e in &items { trace!("found in database: {e:?}"); } + for e in items.iter() { + if crate::dbg_chn(&e.channel) { + info!("FOUND {e:?}"); + } + } let items_len = items.len(); if items_len != nbatch { stats.dbsearcher_select_error_len_mismatch().inc(); diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 3bd017a..ffdcd5a 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -25,21 +25,22 @@ use std::time::Instant; use taskrun::tokio; use tokio::io::unix::AsyncFd; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "FindIoc")] -pub enum Error { - SocketCreate, - SocketConvertTokio, - BroadcastEnable, - NonblockEnable, - SocketBind, - SendFailure, - ReadFailure, - ReadEmpty, - Proto(#[from] proto::Error), - Slidebuf(#[from] slidebuf::Error), - IO(#[from] std::io::Error), -} +autoerr::create_error_v1!( + name(Error, "FindIoc"), + enum variants { + SocketCreate, + SocketConvertTokio, + BroadcastEnable, + NonblockEnable, + SocketBind, + SendFailure, + ReadFailure, + ReadEmpty, + Proto(#[from] proto::Error), + Slidebuf(#[from] slidebuf::Error), + IO(#[from] std::io::Error), + }, +); struct SockBox(c_int); diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 651b11a..87b24a2 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -108,8 +108,8 @@ pub struct MaybeWrongAddressState { impl MaybeWrongAddressState { pub fn new(since: SystemTime, backoff_cnt: u32) -> Self { - let f = 1. + 10. * (backoff_cnt as f32 / 4.).tanh(); - let dtms = 4e3_f32 * f; + let f = 2. + 60. * (backoff_cnt as f32 / 5.).tanh(); + let dtms = 1e-3 * f; Self { since, backoff_dt: Duration::from_millis(dtms as u64), diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 2edae26..b06007e 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -21,3 +21,8 @@ pub fn log_test() { debug!("log-test"); trace!("log-test"); } + +pub fn dbg_chn(chn: &str) -> bool { + let chns = ["SINEG01:QE-B1-OP"]; + chns.contains(&chn) +}