diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 2027e91..3a9b7f0 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -12,6 +12,7 @@ use netfetch::ca::findioc::FindIocStream; use netfetch::ca::store::DataStore; use netfetch::conf::CaIngestOpts; use netfetch::errconv::ErrConv; +use netfetch::metrics::ExtraInsertsConf; use netfetch::store::CommonInsertItemQueue; use netpod::Database; use netpod::ScyllaConfig; @@ -21,12 +22,22 @@ use std::collections::VecDeque; use std::fmt; use std::net::SocketAddrV4; use std::pin::Pin; +use std::sync::atomic; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; use tokio_postgres::Client as PgClient; -const CHECK_CHANS_PER_TICK: usize = 50000; +const CHECK_CHANS_PER_TICK: usize = 10000; +const FINDER_TIMEOUT: usize = 100; +const FINDER_JOB_QUEUE_LEN_MAX: usize = 20; +const FINDER_IN_FLIGHT_MAX: usize = 200; +const FINDER_BATCH_SIZE: usize = 8; +const CURRENT_SEARCH_PENDING_MAX: usize = 220; +const SEARCH_PENDING_TIMEOUT: usize = 10000; +const TIMEOUT_WARN_FACTOR: usize = 10; #[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)] pub struct Channel { @@ -179,25 +190,49 @@ pub struct Daemon { search_tx: Sender, ioc_finder_jh: tokio::task::JoinHandle<()>, datastore: Arc, - common_insert_item_queue: CommonInsertItemQueue, + common_insert_item_queue: Arc, + insert_queue_counter: Arc, + count_unknown_address: usize, + count_search_pending: usize, + count_no_address: usize, + count_unassigned: usize, + count_assigned: usize, + last_status_print: SystemTime, } impl Daemon { pub async fn new(opts: DaemonOpts) -> Result { - let pg_client = make_pg_client(&opts.pgconf).await?; - let pg_client = Arc::new(pg_client); + let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?); let datastore = DataStore::new(&opts.scyconf, pg_client).await?; let datastore = Arc::new(datastore); let (tx, rx) = async_channel::bounded(32); let tgts = opts.search_tgts.clone(); let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), tgts); - let common_insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap); + let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); + let insert_queue_counter = Arc::new(AtomicUsize::new(0)); + + let ingest_commons = netfetch::ca::IngestCommons { + pgconf: Arc::new(opts.pgconf.clone()), + backend: opts.backend().into(), + local_epics_hostname: opts.local_epics_hostname.clone(), + insert_item_queue: common_insert_item_queue.clone(), + data_store: datastore.clone(), + insert_ivl_min: Arc::new(AtomicU64::new(0)), + extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()), + store_workers_rate: AtomicU64::new(20000), + insert_frac: AtomicU64::new(1000), + ca_conn_set: netfetch::ca::connset::CaConnSet::new(), + }; + let _ingest_commons = Arc::new(ingest_commons); + // TODO hook up with insert worker tokio::spawn({ let rx = common_insert_item_queue.receiver(); + let insert_queue_counter = insert_queue_counter.clone(); async move { while let Ok(item) = rx.recv().await { - info!("insert queue item {item:?}"); + insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel); + trace!("insert queue item {item:?}"); } } }); @@ -212,6 +247,13 @@ impl Daemon { ioc_finder_jh, datastore, common_insert_item_queue, + insert_queue_counter, + count_unknown_address: 0, + count_search_pending: 0, + count_no_address: 0, + count_unassigned: 0, + count_assigned: 0, + last_status_print: SystemTime::now(), }; Ok(ret) } @@ -220,24 +262,29 @@ impl Daemon { let (qtx, qrx) = async_channel::bounded(32); let (atx, arx) = async_channel::bounded(32); let ioc_finder_fut = async move { - const FINDER_JOB_QUEUE_LEN_MAX: usize = 1; - let mut finder = FindIocStream::new(tgts); + let mut finder = FindIocStream::new( + tgts, + Duration::from_millis(FINDER_TIMEOUT as u64), + FINDER_IN_FLIGHT_MAX, + FINDER_BATCH_SIZE, + ); + let fut_tick_dur = Duration::from_millis(100); let mut finder_more = true; - let mut fut1 = OptFut::new(finder.next()); - let mut fut2 = OptFut::new(qrx.recv()); + let mut finder_fut = OptFut::new(finder.next()); + let mut qrx_fut = OptFut::new(qrx.recv()); let mut qrx_more = true; - let mut fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(500))); + let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); let mut asend = OptFut::empty(); loop { - tokio::time::sleep(Duration::from_millis(10)).await; + //tokio::time::sleep(Duration::from_millis(1)).await; tokio::select! { _ = &mut asend, if asend.is_enabled() => { //info!("finder asend done"); asend = OptFut::empty(); } - r1 = &mut fut1, if fut1.is_enabled() => { + r1 = &mut finder_fut, if finder_fut.is_enabled() => { //info!("finder fut1"); - fut1 = OptFut::empty(); + finder_fut = OptFut::empty(); match r1 { Some(item) => { asend = OptFut::new(atx.send(item)); @@ -250,16 +297,16 @@ impl Daemon { } //info!("finder.job_queue_len() {}", finder.job_queue_len()); if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - fut2 = OptFut::new(qrx.recv()); + qrx_fut = OptFut::new(qrx.recv()); } if finder_more { - fut1 = OptFut::new(finder.next()); + finder_fut = OptFut::new(finder.next()); } - fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000))); + fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); } - r2 = &mut fut2, if fut2.is_enabled() => { + r2 = &mut qrx_fut, if qrx_fut.is_enabled() => { //info!("finder fut2"); - fut2 = OptFut::empty(); + qrx_fut = OptFut::empty(); match r2 { Ok(item) => { //info!("Push to finder: {item:?}"); @@ -273,26 +320,26 @@ impl Daemon { } //info!("finder.job_queue_len() {}", finder.job_queue_len()); if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - fut2 = OptFut::new(qrx.recv()); + qrx_fut = OptFut::new(qrx.recv()); } if finder_more { - fut1 = OptFut::new(finder.next()); + finder_fut = OptFut::new(finder.next()); } else { - fut1 = OptFut::empty(); + finder_fut = OptFut::empty(); } - fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000))); + fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); } _ = &mut fut_tick => { //info!("finder fut_tick finder.job_queue_len() {}", finder.job_queue_len()); if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - fut2 = OptFut::new(qrx.recv()); + qrx_fut = OptFut::new(qrx.recv()); } if finder_more { - fut1 = OptFut::new(finder.next()); + finder_fut = OptFut::new(finder.next()); } else { - fut1 = OptFut::empty(); + finder_fut = OptFut::empty(); } - fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000))); + fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); } else => { error!("all branches are disabled"); @@ -322,11 +369,13 @@ impl Daemon { async fn check_chans(&mut self) -> Result<(), Error> { let tsnow = SystemTime::now(); let k = self.chan_check_next.take(); - info!("------------ check_chans start at {:?}", k); + trace!("------------ check_chans start at {:?}", k); let mut currently_search_pending = 0; - for (_ch, st) in &self.channel_states { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value { - currently_search_pending += 1; + { + for (_ch, st) in &self.channel_states { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value { + currently_search_pending += 1; + } } } let it = if let Some(last) = k { @@ -334,11 +383,6 @@ impl Daemon { } else { self.channel_states.range_mut(..) }; - let mut count_unknown_address = 0; - let mut count_search_pending = 0; - let mut count_no_address = 0; - let mut count_unassigned = 0; - let mut count_assigned = 0; for (i, (ch, st)) in it.enumerate() { use ActiveChannelState::*; use ChannelStateValue::*; @@ -346,8 +390,7 @@ impl Daemon { Active(st2) => match st2 { UnknownAddress => { //info!("UnknownAddress {} {:?}", i, ch); - count_unknown_address += 1; - if currently_search_pending < 10 { + if currently_search_pending < CURRENT_SEARCH_PENDING_MAX { currently_search_pending += 1; if st.pending_op.is_none() { st.pending_op = Some(ChanOp::Finder(ch.id().to_string(), tsnow)); @@ -357,12 +400,11 @@ impl Daemon { } SearchPending { since } => { //info!("SearchPending {} {:?}", i, ch); - count_search_pending += 1; // TODO handle Err match tsnow.duration_since(*since) { Ok(dt) => { - if dt >= Duration::from_millis(10000) { - warn!("Search timeout for {ch:?}"); + if dt >= Duration::from_millis(SEARCH_PENDING_TIMEOUT as u64) { + debug!("Search timeout for {ch:?}"); st.value = Active(ActiveChannelState::NoAddress); currently_search_pending -= 1; } @@ -377,11 +419,10 @@ impl Daemon { use WithAddressState::*; match state { Unassigned { assign_at } => { - count_unassigned += 1; if *assign_at <= tsnow { if st.pending_op.is_none() { if !self.conns.contains_key(addr) { - info!("==================== create CaConn for {ch:?}"); + debug!("==================== create CaConn for {ch:?}"); let backend = self.opts.backend().into(); let local_epics_hostname = self.opts.local_epics_hostname.clone(); let array_truncate = self.opts.array_truncate; @@ -406,7 +447,10 @@ impl Daemon { let fut = async move { tx.send(cmd).await?; let res = rx.recv().await?; - info!("answer from CaConn: {res:?}"); + debug!("answer from CaConn: {res:?}"); + if res != true { + warn!("problem from CaConn"); + } Ok(()) }; st.pending_op = Some(ChanOp::ConnCmd(Box::pin(fut))); @@ -423,14 +467,12 @@ impl Daemon { } Assigned(_) => { // TODO check if channel is healthy and alive - count_assigned += 1; } } } NoAddress => { // TODO try to find address again after some randomized timeout //info!("NoAddress {} {:?}", i, ch); - count_no_address += 1; } }, ToRemove { .. } => { @@ -442,19 +484,15 @@ impl Daemon { break; } } - info!( - "{:8} {:8} {:8} {:8} {:8}", - count_unknown_address, count_search_pending, count_unassigned, count_assigned, count_no_address - ); - for (_ch, st) in &mut self.channel_states { + for (ch, st) in &mut self.channel_states { match &mut st.pending_op { Some(op) => match op { ChanOp::Finder(s, start) => { if *start + Duration::from_millis(10000) >= tsnow { match self.search_tx.try_send(s.clone()) { Ok(_) => { + *start = tsnow; st.pending_op = None; - info!("OK, sent msg to Finder"); } Err(e) => match e { async_channel::TrySendError::Full(_) => { @@ -468,7 +506,11 @@ impl Daemon { } } else { st.pending_op = None; - warn!("ChanOp::Finder timeout"); + warn!("ChanOp::Finder send timeout for {ch:?}"); + *st = ChannelState { + value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress), + pending_op: None, + }; } } ChanOp::ConnCmd(fut) => { @@ -478,7 +520,7 @@ impl Daemon { st.pending_op = None; match res { Ok(_) => { - info!("ChanOp::ConnCmd completed fine"); + debug!("ChanOp::ConnCmd completed fine"); } Err(e) => { error!("ChanOp::ConnCmd {e}"); @@ -492,11 +534,55 @@ impl Daemon { None => {} } } + { + self.count_unknown_address = 0; + self.count_search_pending = 0; + self.count_no_address = 0; + self.count_unassigned = 0; + self.count_assigned = 0; + for (_ch, st) in &self.channel_states { + match &st.value { + ChannelStateValue::Active(st) => match st { + ActiveChannelState::UnknownAddress => { + self.count_unknown_address += 1; + } + ActiveChannelState::SearchPending { .. } => { + self.count_search_pending += 1; + } + ActiveChannelState::WithAddress { state, .. } => match state { + WithAddressState::Unassigned { .. } => { + self.count_unassigned += 1; + } + WithAddressState::Assigned(_) => { + self.count_assigned += 1; + } + }, + ActiveChannelState::NoAddress => { + self.count_no_address += 1; + } + }, + ChannelStateValue::ToRemove { .. } => {} + } + } + } Ok(()) } async fn handle_timer_tick(&mut self) -> Result<(), Error> { + let tsnow = SystemTime::now(); self.check_chans().await?; + if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= Duration::from_millis(1000) { + self.last_status_print = tsnow; + info!( + "{:8} {:8} {:8} : {:8} {:8} : {:10}", + self.count_unknown_address, + self.count_search_pending, + self.count_no_address, + self.count_unassigned, + self.count_assigned, + self.insert_queue_counter.load(atomic::Ordering::Acquire), + ); + } Ok(()) } @@ -536,48 +622,77 @@ impl Daemon { Ok(()) } + fn handle_search_done(&mut self, item: Result, Error>) -> Result<(), Error> { + //debug!("handle SearchDone: {res:?}"); + match item { + Ok(a) => { + for res in a { + if let Some(addr) = &res.addr { + let addr = addr.clone(); + let ch = Channel::new(res.channel); + if let Some(st) = self.channel_states.get_mut(&ch) { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value { + let dt = SystemTime::now().duration_since(*since).unwrap(); + if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) { + warn!( + " FOUND {:5.0} {:5.0} {addr}", + 1e3 * dt.as_secs_f32(), + 1e3 * res.dt.as_secs_f32() + ); + } + let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress { + addr, + state: WithAddressState::Unassigned { + assign_at: SystemTime::now(), + }, + }); + st.value = stnew; + } else { + warn!( + "address found, but state for {ch:?} is not SearchPending: {:?}", + st.value + ); + } + } else { + warn!("can not find channel state for {ch:?}"); + } + } else { + //debug!("no addr from search in {res:?}"); + let ch = Channel::new(res.channel); + if let Some(st) = self.channel_states.get_mut(&ch) { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value { + let dt = SystemTime::now().duration_since(*since).unwrap(); + if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) { + warn!( + "NOT FOUND {:5.0} {:5.0}", + 1e3 * dt.as_secs_f32(), + 1e3 * res.dt.as_secs_f32() + ); + } + st.value = ChannelStateValue::Active(ActiveChannelState::NoAddress); + } else { + warn!("no address, but state for {ch:?} is not SearchPending: {:?}", st.value); + } + } else { + warn!("can not find channel state for {ch:?}"); + } + } + } + } + Err(e) => { + error!("error from search: {e}"); + } + } + Ok(()) + } + async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> { use DaemonEvent::*; match item { TimerTick => self.handle_timer_tick().await, ChannelAdd(ch) => self.handle_channel_add(ch), ChannelRemove(ch) => self.handle_channel_remove(ch), - SearchDone(res) => { - info!("handle SearchDone: {res:?}"); - match res { - Ok(a) => { - for res in a { - if let Some(addr) = &res.addr { - let addr = addr.clone(); - let ch = Channel::new(res.channel); - if let Some(st) = self.channel_states.get_mut(&ch) { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = - &st.value - { - let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress { - addr, - state: WithAddressState::Unassigned { - assign_at: SystemTime::now(), - }, - }); - st.value = stnew; - } else { - warn!("state for {ch:?} is not SearchPending"); - } - } else { - warn!("can not find channel state for {ch:?}"); - } - } else { - warn!("no addr from search in {res:?}"); - } - } - } - Err(e) => { - error!("error from search: {e}"); - } - } - Ok(()) - } + SearchDone(item) => self.handle_search_done(item), } } @@ -585,7 +700,7 @@ impl Daemon { let ticker = { let tx = self.tx.clone(); async move { - let mut ticker = tokio::time::interval(Duration::from_millis(1500)); + let mut ticker = tokio::time::interval(Duration::from_millis(100)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { ticker.tick().await; @@ -599,16 +714,13 @@ impl Daemon { taskrun::spawn(ticker); loop { match self.rx.recv().await { - Ok(item) => { - info!("got daemon event {item:?}"); - match self.handle_event(item).await { - Ok(_) => {} - Err(e) => { - error!("daemon: {e}"); - break; - } + Ok(item) => match self.handle_event(item).await { + Ok(_) => {} + Err(e) => { + error!("daemon: {e}"); + break; } - } + }, Err(e) => { error!("{e}"); break; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 935a05e..30de673 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -984,7 +984,7 @@ impl CaConn { fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { // TODO handle subid-not-found which can also be peer error: let cid = *self.cid_by_subid.get(&ev.subid).unwrap(); - if true { + if false { let name = self.name_by_cid(cid); info!("event {name:?} {ev:?}"); } diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index a5e3cb8..ca52389 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -39,7 +39,7 @@ struct SearchBatch { tgts: VecDeque, channels: Vec, sids: Vec, - done: Vec, + done: Vec, } #[derive(Debug)] @@ -48,6 +48,7 @@ pub struct FindIocRes { pub query_addr: Option, pub response_addr: Option, pub addr: Option, + pub dt: Duration, } pub struct FindIocStream { @@ -73,7 +74,7 @@ pub struct FindIocStream { } impl FindIocStream { - pub fn new(tgts: Vec) -> Self { + pub fn new(tgts: Vec, batch_run_max: Duration, in_flight_max: usize, batch_size: usize) -> Self { let sock = unsafe { Self::create_socket() }.unwrap(); let afd = AsyncFd::new(sock.0).unwrap(); Self { @@ -92,9 +93,9 @@ impl FindIocStream { bids_timed_out: BTreeMap::new(), sids_done: BTreeMap::new(), result_for_done_sid_count: 0, - in_flight_max: 20, - channels_per_batch: 10, - batch_run_max: Duration::from_millis(2500), + in_flight_max, + channels_per_batch: batch_size, + batch_run_max, sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))), } } @@ -294,23 +295,35 @@ impl FindIocStream { if accounted != ec as usize { info!("unaccounted data ec {} accounted {}", ec, accounted); } - if msgs.len() != 2 { - info!("expect always 2 commands in the response, instead got {}", msgs.len()); + if msgs.len() < 1 { + warn!("received answer without messages"); } - for m in &msgs { - debug!("m: {m:?}"); + if msgs.len() == 1 { + warn!("received answer with single message: {msgs:?}"); + } + let mut good = true; + if let CaMsgTy::VersionRes(v) = msgs[0].ty { + if v != 13 { + warn!("bad version: {msgs:?}"); + good = false; + } + } else { + debug!("first message is not a version: {:?}", msgs[0].ty); + // Seems like a bug in many IOCs + //good = false; } let mut res = Vec::new(); - for msg in msgs.iter() { - match &msg.ty { - CaMsgTy::SearchRes(k) => { - info!("SearchRes: {k:?}"); - let addr = SocketAddrV4::new(src_addr, k.tcp_port); - res.push((SearchId(k.id), addr)); - } - CaMsgTy::VersionRes(13) => {} - _ => { - warn!("try_read: unknown message received {:?}", msg.ty); + if good { + for msg in &msgs[1..] { + match &msg.ty { + CaMsgTy::SearchRes(k) => { + let addr = SocketAddrV4::new(src_addr, k.tcp_port); + res.push((SearchId(k.id), addr)); + } + //CaMsgTy::VersionRes(13) => {} + _ => { + warn!("try_read: unknown message received {:?}", msg.ty); + } } } } @@ -338,29 +351,29 @@ impl FindIocStream { } fn create_in_flight(&mut self) { - let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel); - let bid = BatchId(bid as u32); + let bid = BatchId(BATCH_ID.fetch_add(1, Ordering::AcqRel) as u32); let mut sids = Vec::new(); let mut chs = Vec::new(); while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 { - let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel); - let sid = SearchId(sid as u32); + let sid = SearchId(SEARCH_ID2.fetch_add(1, Ordering::AcqRel) as u32); self.bid_by_sid.insert(sid.clone(), bid.clone()); sids.push(sid); chs.push(self.channels_input.pop_front().unwrap()); } + let n = chs.len(); let batch = SearchBatch { ts_beg: Instant::now(), channels: chs, tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(), sids, - done: Vec::new(), + done: vec![false; n], }; self.in_flight.insert(bid.clone(), batch); self.batch_send_queue.push_back(bid); } fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) { + let tsnow = Instant::now(); let mut sids_remove = Vec::new(); for (sid, addr) in res { self.sids_done.insert(sid.clone(), ()); @@ -369,39 +382,38 @@ impl FindIocStream { sids_remove.push(sid.clone()); match self.in_flight.get_mut(bid) { Some(batch) => { - // TGT + let mut found_sid = false; for (i2, s2) in batch.sids.iter().enumerate() { if s2 == &sid { + found_sid = true; + batch.done[i2] = true; match batch.channels.get(i2) { Some(ch) => { + let dt = tsnow.saturating_duration_since(batch.ts_beg); let res = FindIocRes { channel: ch.into(), // TODO associate a batch with a specific query address. query_addr: None, response_addr: Some(src.clone()), addr: Some(addr), + dt, }; self.out_queue.push_back(res); } None => { - error!("no matching channel for {sid:?}"); + error!( + "logic error batch sids / channels lens: {} vs {}", + batch.sids.len(), + batch.channels.len() + ); } } } } - // Book keeping: - batch.done.push(sid); - let mut all_done = true; - if batch.done.len() >= batch.sids.len() { - for s1 in &batch.sids { - if !batch.done.contains(s1) { - all_done = false; - break; - } - } - } else { - all_done = false; + if !found_sid { + error!("can not find sid {sid:?} in batch {bid:?}"); } + let all_done = batch.done.iter().all(|x| *x); if all_done { self.bids_all_done.insert(bid.clone(), ()); self.in_flight.remove(bid); @@ -429,29 +441,33 @@ impl FindIocStream { } fn clear_timed_out(&mut self) { - let now = Instant::now(); + let tsnow = Instant::now(); let mut bids = Vec::new(); let mut sids = Vec::new(); let mut chns = Vec::new(); + let mut dts = Vec::new(); for (bid, batch) in &mut self.in_flight { - if now.duration_since(batch.ts_beg) > self.batch_run_max { + let dt = tsnow.saturating_duration_since(batch.ts_beg); + if dt > self.batch_run_max { self.bids_timed_out.insert(bid.clone(), ()); for (i2, sid) in batch.sids.iter().enumerate() { - if batch.done.contains(sid) == false { + if batch.done[i2] == false { debug!("Timeout: {bid:?} {}", batch.channels[i2]); + sids.push(sid.clone()); + chns.push(batch.channels[i2].clone()); + dts.push(dt); } - sids.push(sid.clone()); - chns.push(batch.channels[i2].clone()); } bids.push(bid.clone()); } } - for (sid, ch) in sids.into_iter().zip(chns) { + for ((sid, ch), dt) in sids.into_iter().zip(chns).zip(dts) { let res = FindIocRes { query_addr: None, response_addr: None, channel: ch, addr: None, + dt, }; self.out_queue.push_back(res); self.bid_by_sid.remove(&sid); @@ -536,9 +552,10 @@ impl Stream for FindIocStream { } None => { if self.bids_all_done.contains_key(&bid) { - // TODO count events + // Already answered from another target + //trace!("bid {bid:?} from batch send queue not in flight AND all done"); } else { - info!("Batch {bid:?} seems already done"); + warn!("bid {bid:?} from batch send queue not in flight NOT done"); } loop_again = true; } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 82c7eb9..b0667d1 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -524,6 +524,7 @@ impl CaMsg { CaScalarType::I32 => convert_wave_value!(i32, I32, n, buf), CaScalarType::F32 => convert_wave_value!(f32, F32, n, buf), CaScalarType::F64 => convert_wave_value!(f64, F64, n, buf), + CaScalarType::String => CaDataValue::Scalar(CaDataScalarValue::String("todo-array-string".into())), _ => { warn!("TODO conversion array {scalar_type:?}"); return Err(Error::with_msg_no_trace(format!( diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 9aa79a4..19d3ba0 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -104,7 +104,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), } }) .collect(); - let mut finder = FindIocStream::new(addrs); + let mut finder = FindIocStream::new(addrs, Duration::from_millis(1000), 20, 1); for ch in channels.iter() { finder.push(ch.into()); }