From 4ea83f7a1fe8f93736391b27a4a6da3538c5151a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 Jan 2023 19:54:27 +0100 Subject: [PATCH] WIP timeout handling --- daqingest/Cargo.toml | 1 + daqingest/src/daemon.rs | 487 ++++++++++++++++++++++++++----------- netfetch/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 16 +- netfetch/src/ca/connset.rs | 3 - netfetch/src/ca/findioc.rs | 12 +- 6 files changed, 368 insertions(+), 153 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 3c2e4bc..66ee052 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -20,6 +20,7 @@ async-channel = "1.6" chrono = "0.4" bytes = "1.1" scylla = "0.7" +tokio-postgres = "0.7.7" serde = { version = "1.0", features = ["derive"] } err = { path = "../../daqbuffer/err" } log = { path = "../log" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 72bfce5..2027e91 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -1,6 +1,7 @@ use async_channel::Receiver; use async_channel::Sender; use err::Error; +use futures_util::Future; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; @@ -8,16 +9,24 @@ use netfetch::ca::conn::CaConn; use netfetch::ca::conn::ConnCommand; use netfetch::ca::findioc::FindIocRes; use netfetch::ca::findioc::FindIocStream; +use netfetch::ca::store::DataStore; use netfetch::conf::CaIngestOpts; +use netfetch::errconv::ErrConv; +use netfetch::store::CommonInsertItemQueue; +use netpod::Database; +use netpod::ScyllaConfig; use serde::Serialize; use std::collections::BTreeMap; use std::collections::VecDeque; -use std::net::IpAddr; +use std::fmt; use std::net::SocketAddrV4; +use std::pin::Pin; +use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; +use tokio_postgres::Client as PgClient; -const CHECK_CHANS_PER_TICK: usize = 10; +const CHECK_CHANS_PER_TICK: usize = 50000; #[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)] pub struct Channel { @@ -65,12 +74,32 @@ pub enum ActiveChannelState { NoAddress, } -#[derive(Clone, Debug, Serialize)] -pub enum ChannelState { +enum ChanOp { + Finder(String, SystemTime), + ConnCmd(Pin> + Send>>), +} + +impl fmt::Debug for ChanOp { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Finder(arg0, arg1) => fmt.debug_tuple("Finder").field(arg0).field(arg1).finish(), + Self::ConnCmd(_arg0) => fmt.debug_tuple("ConnCmd").finish(), + } + } +} + +#[derive(Debug, Serialize)] +pub enum ChannelStateValue { Active(ActiveChannelState), ToRemove { addr: Option }, } +#[derive(Debug)] +pub struct ChannelState { + value: ChannelStateValue, + pending_op: Option, +} + #[derive(Debug)] pub enum DaemonEvent { TimerTick, @@ -80,8 +109,20 @@ pub enum DaemonEvent { } pub struct DaemonOpts { + backend: String, + local_epics_hostname: String, + array_truncate: usize, + insert_item_queue_cap: usize, search_tgts: Vec, - search_excl: Vec, + //search_excl: Vec, + pgconf: Database, + scyconf: ScyllaConfig, +} + +impl DaemonOpts { + pub fn backend(&self) -> &str { + &self.backend + } } struct OptFut { @@ -89,8 +130,16 @@ struct OptFut { } impl OptFut { - fn new(fut: Option) -> Self { - Self { fut } + fn empty() -> Self { + Self { fut: None } + } + + fn new(fut: F) -> Self { + Self { fut: Some(fut) } + } + + fn is_enabled(&self) -> bool { + self.fut.is_some() } } @@ -108,99 +157,51 @@ where } } +pub async fn make_pg_client(d: &Database) -> Result { + let (client, pg_conn) = tokio_postgres::connect( + &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), + tokio_postgres::tls::NoTls, + ) + .await + .err_conv()?; + // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: + tokio::spawn(pg_conn); + Ok(client) +} + pub struct Daemon { opts: DaemonOpts, channel_states: BTreeMap, tx: Sender, rx: Receiver, - conns: BTreeMap, + conns: BTreeMap, tokio::task::JoinHandle<()>)>, chan_check_next: Option, search_tx: Sender, ioc_finder_jh: tokio::task::JoinHandle<()>, + datastore: Arc, + common_insert_item_queue: CommonInsertItemQueue, } impl Daemon { - pub fn new(opts: DaemonOpts) -> Self { - let (tx, rx) = async_channel::bounded(1); + pub async fn new(opts: DaemonOpts) -> Result { + let pg_client = make_pg_client(&opts.pgconf).await?; + let pg_client = Arc::new(pg_client); + let datastore = DataStore::new(&opts.scyconf, pg_client).await?; + let datastore = Arc::new(datastore); + let (tx, rx) = async_channel::bounded(32); let tgts = opts.search_tgts.clone(); - let (search_tx, ioc_finder_jh) = { - let (qtx, qrx) = async_channel::bounded(1); - let (atx, arx) = async_channel::bounded(1); - let ioc_finder_fut = async move { - const FINDER_JOB_QUEUE_LEN_MAX: usize = 1; - let mut finder = FindIocStream::new(tgts); - let mut fut1 = finder.next(); - let mut fut2 = qrx.recv().fuse(); - let mut fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse()); - let mut asend = OptFut::new(None).fuse(); - loop { - tokio::time::sleep(Duration::from_millis(200)).await; - futures_util::select! { - _ = asend => { - info!("asend done"); - } - r1 = fut1 => { - match r1 { - Some(item) => { - asend = OptFut::new(Some(atx.send(item))).fuse(); - } - None => { - // TODO finder has stopped, do no longer poll on it - } - } - if finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - fut2 = qrx.recv().fuse(); - } - fut1 = finder.next(); - fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse()); - } - r2 = fut2 => { - match r2 { - Ok(item) => { - info!("Push to finder: {item:?}"); - finder.push(item); - } - Err(e) => { - // TODO input is done... ignore from here on. - error!("{e}"); - break; - } - } - if finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - fut2 = qrx.recv().fuse(); - } - fut1 = finder.next(); - fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse()); - } - _ = fut_tick => { - if finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - //fut2 = qrx.recv().fuse(); - } - fut1 = finder.next(); - fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse()); - } - }; + let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), tgts); + let common_insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap); + // TODO hook up with insert worker + tokio::spawn({ + let rx = common_insert_item_queue.receiver(); + async move { + while let Ok(item) = rx.recv().await { + info!("insert queue item {item:?}"); } - }; - let ioc_finder_jh = taskrun::spawn(ioc_finder_fut); - taskrun::spawn({ - let tx = tx.clone(); - async move { - while let Ok(item) = arx.recv().await { - info!("forward search result item"); - match tx.send(DaemonEvent::SearchDone(item)).await { - Ok(_) => {} - Err(e) => { - error!("search res fwd {e}"); - } - } - } - warn!("search res fwd nput broken"); - } - }); - (qtx, ioc_finder_jh) - }; - Self { + } + }); + let ret = Self { opts, channel_states: BTreeMap::new(), tx, @@ -209,83 +210,227 @@ impl Daemon { chan_check_next: None, search_tx, ioc_finder_jh, - } + datastore, + common_insert_item_queue, + }; + Ok(ret) } - fn check_chans(&mut self) -> Result<(), Error> { + fn start_finder(tx: Sender, tgts: Vec) -> (Sender, tokio::task::JoinHandle<()>) { + let (qtx, qrx) = async_channel::bounded(32); + let (atx, arx) = async_channel::bounded(32); + let ioc_finder_fut = async move { + const FINDER_JOB_QUEUE_LEN_MAX: usize = 1; + let mut finder = FindIocStream::new(tgts); + let mut finder_more = true; + let mut fut1 = OptFut::new(finder.next()); + let mut fut2 = OptFut::new(qrx.recv()); + let mut qrx_more = true; + let mut fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(500))); + let mut asend = OptFut::empty(); + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + tokio::select! { + _ = &mut asend, if asend.is_enabled() => { + //info!("finder asend done"); + asend = OptFut::empty(); + } + r1 = &mut fut1, if fut1.is_enabled() => { + //info!("finder fut1"); + fut1 = OptFut::empty(); + match r1 { + Some(item) => { + asend = OptFut::new(atx.send(item)); + } + None => { + // TODO finder has stopped, do no longer poll on it + warn!("Finder has stopped"); + finder_more = false; + } + } + //info!("finder.job_queue_len() {}", finder.job_queue_len()); + if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { + fut2 = OptFut::new(qrx.recv()); + } + if finder_more { + fut1 = OptFut::new(finder.next()); + } + fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000))); + } + r2 = &mut fut2, if fut2.is_enabled() => { + //info!("finder fut2"); + fut2 = OptFut::empty(); + match r2 { + Ok(item) => { + //info!("Push to finder: {item:?}"); + finder.push(item); + } + Err(e) => { + // TODO input is done... ignore from here on. + error!("Finder input channel error {e}"); + qrx_more = false; + } + } + //info!("finder.job_queue_len() {}", finder.job_queue_len()); + if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { + fut2 = OptFut::new(qrx.recv()); + } + if finder_more { + fut1 = OptFut::new(finder.next()); + } else { + fut1 = OptFut::empty(); + } + fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000))); + } + _ = &mut fut_tick => { + //info!("finder fut_tick finder.job_queue_len() {}", finder.job_queue_len()); + if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { + fut2 = OptFut::new(qrx.recv()); + } + if finder_more { + fut1 = OptFut::new(finder.next()); + } else { + fut1 = OptFut::empty(); + } + fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000))); + } + else => { + error!("all branches are disabled"); + break; + } + }; + } + }; + let ioc_finder_jh = taskrun::spawn(ioc_finder_fut); + taskrun::spawn({ + async move { + while let Ok(item) = arx.recv().await { + //info!("forward search result item"); + match tx.send(DaemonEvent::SearchDone(item)).await { + Ok(_) => {} + Err(e) => { + error!("search res fwd {e}"); + } + } + } + warn!("search res fwd nput broken"); + } + }); + (qtx, ioc_finder_jh) + } + + async fn check_chans(&mut self) -> Result<(), Error> { let tsnow = SystemTime::now(); let k = self.chan_check_next.take(); - info!("check_chans start at {:?}", k); + info!("------------ check_chans start at {:?}", k); + let mut currently_search_pending = 0; + for (_ch, st) in &self.channel_states { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value { + currently_search_pending += 1; + } + } let it = if let Some(last) = k { self.channel_states.range_mut(last..) } else { self.channel_states.range_mut(..) }; + let mut count_unknown_address = 0; + let mut count_search_pending = 0; + let mut count_no_address = 0; + let mut count_unassigned = 0; + let mut count_assigned = 0; for (i, (ch, st)) in it.enumerate() { - info!("check chan {} {:?}", i, ch); use ActiveChannelState::*; - use ChannelState::*; - match st { + use ChannelStateValue::*; + match &mut st.value { Active(st2) => match st2 { UnknownAddress => { - if self.search_tx.is_full() { - // TODO what to do if the queue is full? - } else { - match self.search_tx.try_send(ch.id().into()) { - Ok(_) => { - *st = Active(SearchPending { since: tsnow }); - } - Err(_) => { - error!("can not send search query"); - } + //info!("UnknownAddress {} {:?}", i, ch); + count_unknown_address += 1; + if currently_search_pending < 10 { + currently_search_pending += 1; + if st.pending_op.is_none() { + st.pending_op = Some(ChanOp::Finder(ch.id().to_string(), tsnow)); + st.value = Active(SearchPending { since: tsnow }); } } } SearchPending { since } => { + //info!("SearchPending {} {:?}", i, ch); + count_search_pending += 1; // TODO handle Err match tsnow.duration_since(*since) { Ok(dt) => { if dt >= Duration::from_millis(10000) { warn!("Search timeout for {ch:?}"); - *st = Active(ActiveChannelState::NoAddress); + st.value = Active(ActiveChannelState::NoAddress); + currently_search_pending -= 1; } } Err(e) => { - error!("{e}"); + error!("SearchPending {e}"); } } } WithAddress { addr, state } => { + //info!("WithAddress {} {:?}", i, ch); use WithAddressState::*; match state { Unassigned { assign_at } => { + count_unassigned += 1; if *assign_at <= tsnow { - match self.conns.get(addr) { - Some(conn) => { - let tx = conn.conn_command_tx(); - let (cmd, rx) = ConnCommand::channel_add(ch.id().into()); - // TODO how to send the command from this non-async context? - //tx.send(cmd).await; - // TODO if the send can be assumed to be on its way (it may still fail) then update state - if true { - let cs = ConnectionState { - updated: tsnow, - value: ConnectionStateValue::Unconnected, - }; - *state = WithAddressState::Assigned(cs) - } + if st.pending_op.is_none() { + if !self.conns.contains_key(addr) { + info!("==================== create CaConn for {ch:?}"); + let backend = self.opts.backend().into(); + let local_epics_hostname = self.opts.local_epics_hostname.clone(); + let array_truncate = self.opts.array_truncate; + let insert_item_sender = self.common_insert_item_queue.sender().await; + let mut conn = CaConn::new( + backend, + addr.clone(), + local_epics_hostname, + self.datastore.clone(), + insert_item_sender, + array_truncate, + 256, + ); + let conn_tx = conn.conn_command_tx(); + let conn_fut = async move { while let Some(_item) = conn.next().await {} }; + let conn_jh = tokio::spawn(conn_fut); + self.conns.insert(*addr, (conn_tx, conn_jh)); + } + if let Some((tx, _)) = self.conns.get(addr) { + let tx = tx.clone(); + let (cmd, rx) = ConnCommand::channel_add(ch.id().into()); + let fut = async move { + tx.send(cmd).await?; + let res = rx.recv().await?; + info!("answer from CaConn: {res:?}"); + Ok(()) + }; + st.pending_op = Some(ChanOp::ConnCmd(Box::pin(fut))); + let cs = ConnectionState { + updated: tsnow, + value: ConnectionStateValue::Unconnected, + }; + *state = WithAddressState::Assigned(cs) + } else { + error!("no CaConn for {ch:?}"); } - None => {} } } } Assigned(_) => { // TODO check if channel is healthy and alive + count_assigned += 1; } } } NoAddress => { // TODO try to find address again after some randomized timeout + //info!("NoAddress {} {:?}", i, ch); + count_no_address += 1; } }, ToRemove { .. } => { @@ -297,51 +442,104 @@ impl Daemon { break; } } + info!( + "{:8} {:8} {:8} {:8} {:8}", + count_unknown_address, count_search_pending, count_unassigned, count_assigned, count_no_address + ); + for (_ch, st) in &mut self.channel_states { + match &mut st.pending_op { + Some(op) => match op { + ChanOp::Finder(s, start) => { + if *start + Duration::from_millis(10000) >= tsnow { + match self.search_tx.try_send(s.clone()) { + Ok(_) => { + st.pending_op = None; + info!("OK, sent msg to Finder"); + } + Err(e) => match e { + async_channel::TrySendError::Full(_) => { + //warn!("Finder channel full"); + *start = tsnow; + } + async_channel::TrySendError::Closed(_) => { + error!("Finder channel closed"); + } + }, + } + } else { + st.pending_op = None; + warn!("ChanOp::Finder timeout"); + } + } + ChanOp::ConnCmd(fut) => { + use std::task::Poll::*; + match futures_util::poll!(fut) { + Ready(res) => { + st.pending_op = None; + match res { + Ok(_) => { + info!("ChanOp::ConnCmd completed fine"); + } + Err(e) => { + error!("ChanOp::ConnCmd {e}"); + } + } + } + Pending => {} + } + } + }, + None => {} + } + } Ok(()) } - fn handle_timer_tick(&mut self) -> Result<(), Error> { - self.check_chans()?; + async fn handle_timer_tick(&mut self) -> Result<(), Error> { + self.check_chans().await?; Ok(()) } fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> { if !self.channel_states.contains_key(&ch) { - self.channel_states - .insert(ch, ChannelState::Active(ActiveChannelState::UnknownAddress)); + let st = ChannelState { + value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress), + pending_op: None, + }; + self.channel_states.insert(ch, st); } Ok(()) } fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> { if let Some(k) = self.channel_states.get_mut(&ch) { - match k { - ChannelState::Active(j) => match j { + match &k.value { + ChannelStateValue::Active(j) => match j { ActiveChannelState::UnknownAddress => { - *k = ChannelState::ToRemove { addr: None }; + k.value = ChannelStateValue::ToRemove { addr: None }; } ActiveChannelState::SearchPending { .. } => { - *k = ChannelState::ToRemove { addr: None }; + k.value = ChannelStateValue::ToRemove { addr: None }; } ActiveChannelState::WithAddress { addr, .. } => { - *k = ChannelState::ToRemove { + k.value = ChannelStateValue::ToRemove { addr: Some(addr.clone()), }; } ActiveChannelState::NoAddress => { - *k = ChannelState::ToRemove { addr: None }; + k.value = ChannelStateValue::ToRemove { addr: None }; } }, - ChannelState::ToRemove { .. } => {} + ChannelStateValue::ToRemove { .. } => {} } } Ok(()) } - fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> { + async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> { use DaemonEvent::*; match item { - TimerTick => self.handle_timer_tick(), + TimerTick => self.handle_timer_tick().await, ChannelAdd(ch) => self.handle_channel_add(ch), ChannelRemove(ch) => self.handle_channel_remove(ch), SearchDone(res) => { @@ -353,14 +551,16 @@ impl Daemon { let addr = addr.clone(); let ch = Channel::new(res.channel); if let Some(st) = self.channel_states.get_mut(&ch) { - if let ChannelState::Active(ActiveChannelState::SearchPending { .. }) = st { - let stnew = ChannelState::Active(ActiveChannelState::WithAddress { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = + &st.value + { + let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress { addr, state: WithAddressState::Unassigned { assign_at: SystemTime::now(), }, }); - self.channel_states.insert(ch, stnew); + st.value = stnew; } else { warn!("state for {ch:?} is not SearchPending"); } @@ -385,7 +585,7 @@ impl Daemon { let ticker = { let tx = self.tx.clone(); async move { - let mut ticker = tokio::time::interval(Duration::from_millis(500)); + let mut ticker = tokio::time::interval(Duration::from_millis(1500)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { ticker.tick().await; @@ -401,7 +601,7 @@ impl Daemon { match self.rx.recv().await { Ok(item) => { info!("got daemon event {item:?}"); - match self.handle_event(item) { + match self.handle_event(item).await { Ok(_) => {} Err(e) => { error!("daemon: {e}"); @@ -415,6 +615,8 @@ impl Daemon { } } } + warn!("TODO shut down IOC finder properly"); + let _ = &self.ioc_finder_jh; Ok(()) } } @@ -428,10 +630,15 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> } info!("parsed search_tgts {search_tgts:?}"); let opts2 = DaemonOpts { + backend: opts.backend().into(), + local_epics_hostname: opts.local_epics_hostname().into(), + array_truncate: opts.array_truncate(), + insert_item_queue_cap: opts.insert_item_queue_cap(), + pgconf: opts.postgresql().clone(), + scyconf: opts.scylla().clone(), search_tgts, - search_excl: Vec::new(), }; - let mut daemon = Daemon::new(opts2); + let mut daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); let daemon_jh = taskrun::spawn(async move { // TODO handle Err diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 6b02d15..597039a 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -21,7 +21,7 @@ byteorder = "1.4" futures-util = "0.3" #pin-project-lite = "0.2" scylla = "0.7" -tokio-postgres = "0.7.6" +tokio-postgres = "0.7.7" md-5 = "0.10" hex = "0.4" libc = "0.2" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 927c792..935a05e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -5,7 +5,6 @@ use super::proto::CaMsgTy; use super::proto::CaProto; use super::store::DataStore; use super::ExtraInsertsConf; -use super::IngestCommons; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::CreateChan; use crate::ca::proto::EventAdd; @@ -392,8 +391,6 @@ pub struct CaConn { conn_backoff: f32, conn_backoff_beg: f32, inserts_counter: u64, - #[allow(unused)] - ingest_commons: Arc, extra_inserts_conf: ExtraInsertsConf, } @@ -406,7 +403,6 @@ impl CaConn { insert_item_sender: CommonInsertItemQueueSender, array_truncate: usize, insert_queue_max: usize, - ingest_commons: Arc, ) -> Self { let (cq_tx, cq_rx) = async_channel::bounded(32); Self { @@ -437,7 +433,6 @@ impl CaConn { conn_backoff: 0.02, conn_backoff_beg: 0.02, inserts_counter: 0, - ingest_commons, extra_inserts_conf: ExtraInsertsConf::new(), } } @@ -671,7 +666,11 @@ impl CaConn { self.stats.inserts_queue_push_inc(); self.insert_item_send_fut = None; } - Ready(Err(_)) => break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))), + Ready(Err(e)) => { + self.insert_item_send_fut = None; + error!("handle_insert_futs can not send item {e}"); + break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))); + } Pending => { if false { // TODO test this case. @@ -985,7 +984,10 @@ impl CaConn { fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { // TODO handle subid-not-found which can also be peer error: let cid = *self.cid_by_subid.get(&ev.subid).unwrap(); - // TODO get rid of the string clone when I don't want the log output any longer: + if true { + let name = self.name_by_cid(cid); + info!("event {name:?} {ev:?}"); + } // TODO handle not-found error: let mut series_2 = None; let ch_s = self.channels.get_mut(&cid).unwrap(); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 4fb6ef2..1f1ee05 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -89,7 +89,6 @@ impl CaConnSet { insert_queue_max: usize, insert_item_queue_sender: CommonInsertItemQueueSender, data_store: Arc, - ingest_commons: Arc, with_channels: Vec, ) -> Result<(), Error> { info!("create new CaConn {:?}", addr); @@ -102,7 +101,6 @@ impl CaConnSet { insert_item_queue_sender, array_truncate, insert_queue_max, - ingest_commons, ); for ch in with_channels { conn.channel_add(ch); @@ -273,7 +271,6 @@ impl CaConnSet { 200, ingest_commons.insert_item_queue.sender().await, ingest_commons.data_store.clone(), - ingest_commons.clone(), vec![channel_name], ) .await?; diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index e74326f..a5e3cb8 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -1,6 +1,6 @@ use crate::ca::proto::{CaMsg, CaMsgTy, HeadInfo}; use err::Error; -use futures_util::{FutureExt, Stream}; +use futures_util::{Future, FutureExt, Stream}; use libc::c_int; use log::*; use std::collections::{BTreeMap, VecDeque}; @@ -69,6 +69,7 @@ pub struct FindIocStream { bids_timed_out: BTreeMap, sids_done: BTreeMap, result_for_done_sid_count: u64, + sleeper: Pin + Send>>, } impl FindIocStream { @@ -94,6 +95,7 @@ impl FindIocStream { in_flight_max: 20, channels_per_batch: 10, batch_run_max: Duration::from_millis(2500), + sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))), } } @@ -574,7 +576,13 @@ impl Stream for FindIocStream { continue; } else { if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() { - Ready(None) + match self.sleeper.poll_unpin(cx) { + Ready(_) => { + self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500))); + continue; + } + Pending => Pending, + } } else { Pending }