diff --git a/.github/workflows/build-rhel7.yml b/.github/workflows/build-rhel7.yml index 00fdff5..ea900e9 100644 --- a/.github/workflows/build-rhel7.yml +++ b/.github/workflows/build-rhel7.yml @@ -78,8 +78,10 @@ jobs: working-directory: ${{steps.wdset.outputs.gh}} - run: git clone --branch dev https://github.com/paulscherrerinstitute/daqbuffer.git working-directory: ${{steps.wdset.outputs.gh}}/build - - run: git clone --branch dev https://github.com/paulscherrerinstitute/daqingest.git + - run: git clone https://github.com/paulscherrerinstitute/daqingest.git working-directory: ${{steps.wdset.outputs.gh}}/build + - run: git reset --hard $GITHUB_SHA + working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest # - run: ls -la $GITHUB_WORKSPACE # - run: find $GITHUB_WORKSPACE -type f -and \( -name \*.rs -or -name \*.toml \) # - run: find ${{steps.wdset.outputs.gh}} -type f -and \( -name \*.rs -or -name \*.toml \) @@ -102,8 +104,9 @@ jobs: - run: "echo 'version: [${{env.DAQVER}}]'" - run: echo "SELFPKG=daqingest-$DAQVER" >> $GITHUB_ENV - run: echo "SELFPKGTGT=$SELFPKG-amd64-rhel7" >> $GITHUB_ENV - - run: echo SELFPKG $SELFPKG - - run: echo SELFPKGTGT $SELFPKGTGT + - run: echo DAQVER ..$DAQVER.. + - run: echo SELFPKG ..$SELFPKG.. + - run: echo SELFPKGTGT ..$SELFPKGTGT.. - run: mkdir $SELFPKGTGT - run: cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest $SELFPKGTGT/daqingest - run: tar -czf $SELFPKGTGT.tar.gz $SELFPKGTGT diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 30ee0c8..d65b173 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -11,6 +11,7 @@ use netfetch::ca::conn::CaConnEvent; use netfetch::ca::conn::ConnCommand; use netfetch::ca::connset::CaConnSet; use netfetch::ca::connset::CaConnSetCtrl; +use netfetch::ca::connset::CaConnSetItem; use netfetch::ca::findioc::FindIocRes; use netfetch::ca::IngestCommons; use netfetch::ca::SlowWarnable; @@ -95,8 +96,8 @@ pub struct Daemon { stats: Arc, shutting_down: bool, insert_rx_weak: WeakReceiver, - channel_info_query_tx: Sender, connset_ctrl: CaConnSetCtrl, + connset_status_last: Instant, query_item_tx: Sender, } @@ -124,10 +125,27 @@ impl Daemon { opts.backend.clone(), opts.local_epics_hostname.clone(), common_insert_item_queue.sender().unwrap().inner().clone(), - channel_info_query_tx.clone(), + channel_info_query_tx, opts.pgconf.clone(), ); + // TODO remove + tokio::spawn({ + let rx = conn_set_ctrl.rx.clone(); + let tx = daemon_ev_tx.clone(); + async move { + loop { + match rx.recv().await { + Ok(item) => { + let item = DaemonEvent::CaConnSetItem(item); + tx.send(item).await; + } + Err(e) => break, + } + } + } + }); + let ingest_commons = IngestCommons { pgconf: Arc::new(opts.pgconf.clone()), backend: opts.backend().into(), @@ -211,8 +229,8 @@ impl Daemon { stats: Arc::new(DaemonStats::new()), shutting_down: false, insert_rx_weak: common_insert_item_queue_2.downgrade(), - channel_info_query_tx, connset_ctrl: conn_set_ctrl, + connset_status_last: Instant::now(), query_item_tx: common_insert_item_queue.sender().unwrap().inner().clone(), }; Ok(ret) @@ -268,10 +286,12 @@ impl Daemon { warn!("Received SIGTERM"); SIGTERM.store(2, atomic::Ordering::Release); } - warn!("TODO let CaConnSet check health"); - // TODO - // self.check_connection_states()?; - // self.check_channel_states().await?; + if self.connset_status_last + Duration::from_millis(2000) < ts1 { + self.connset_ctrl.check_health().await?; + } + if self.connset_status_last + Duration::from_millis(10000) < ts1 { + error!("CaConnSet has not reported health status"); + } let dt = ts1.elapsed(); if dt > Duration::from_millis(500) { info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); @@ -563,16 +583,31 @@ impl Daemon { } } + async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> { + use CaConnSetItem::*; + match item { + Healthy => { + self.connset_status_last = Instant::now(); + } + } + Ok(()) + } + async fn handle_shutdown(&mut self) -> Result<(), Error> { error!("TODO handle_shutdown"); - // TODO make sure we: - // set a flag so that we don't attempt to use resources any longer (why could that happen?) - // does anybody might still want to communicate with us? can't be excluded. - // send shutdown signal to everyone. - // drop our ends of channels to workers (gate them behind option?). - // await the connection sets. - // await other workers that we've spawned. - self.connset_ctrl.shutdown().await?; + if self.shutting_down { + warn!("already shutting down"); + } else { + self.shutting_down = true; + // TODO make sure we: + // set a flag so that we don't attempt to use resources any longer (why could that happen?) + // does anybody might still want to communicate with us? can't be excluded. + // send shutdown signal to everyone. + // drop our ends of channels to workers (gate them behind option?). + // await the connection sets. + // await other workers that we've spawned. + self.connset_ctrl.shutdown().await?; + } Ok(()) } @@ -580,10 +615,8 @@ impl Daemon { async fn handle_shutdown(&mut self) -> Result<(), Error> { warn!("received shutdown event"); if self.shutting_down { - info!("already shutting down"); Ok(()) } else { - self.shutting_down = true; self.channel_states.clear(); self.ca_conn_send_shutdown().await?; self.ingest_commons.insert_item_queue.drop_sender(); @@ -616,6 +649,7 @@ impl Daemon { ChannelRemove(ch) => self.handle_channel_remove(ch).await, SearchDone(item) => self.handle_search_done(item).await, CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await, + CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await, Shutdown => self.handle_shutdown().await, }; let dt = ts1.elapsed(); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index eaa2ac5..e24fbd2 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -13,7 +13,6 @@ use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; use dbpg::seriesbychannel::ChannelInfoResult; use err::Error; -use futures_util::stream::FuturesUnordered; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -36,7 +35,6 @@ use scywriiq::IvlItem; use scywriiq::MuteItem; use scywriiq::QueryItem; use serde::Serialize; -use series::series::Existence; use series::ChannelStatusSeriesId; use series::SeriesId; use stats::CaConnStats; @@ -455,7 +453,6 @@ pub struct CaConn { cid_by_subid: BTreeMap, name_by_cid: BTreeMap, insert_item_queue: VecDeque, - sender_polling: SenderPolling, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, stats: Arc, @@ -473,7 +470,6 @@ pub struct CaConn { channel_info_query_queue: VecDeque, channel_info_query_sending: SenderPolling, time_binners: BTreeMap, - ts_earliest_warn_poll_slow: Instant, } impl Drop for CaConn { @@ -505,7 +501,6 @@ impl CaConn { cid_by_subid: BTreeMap::new(), name_by_cid: BTreeMap::new(), insert_item_queue: VecDeque::new(), - sender_polling: SenderPolling::new(async_channel::bounded(1).0), remote_addr_dbg, local_epics_hostname, stats: Arc::new(CaConnStats::new()), @@ -523,12 +518,11 @@ impl CaConn { channel_info_query_queue: VecDeque::new(), channel_info_query_sending: SenderPolling::new(channel_info_query_tx), time_binners: BTreeMap::new(), - ts_earliest_warn_poll_slow: Instant::now(), } } fn new_self_ticker() -> Pin> { - Box::pin(tokio::time::sleep(Duration::from_millis(1000))) + Box::pin(tokio::time::sleep(Duration::from_millis(500))) } pub fn conn_command_tx(&self) -> async_channel::Sender { @@ -623,6 +617,7 @@ impl CaConn { } fn cmd_shutdown(&mut self) { + debug!("cmd_shutdown {}", self.remote_addr_dbg); self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand); } @@ -660,17 +655,17 @@ impl CaConn { match self.channel_to_evented(cid, sid, data_type, data_count, series) { Ok(_) => {} Err(e) => { - error!("channel_to_evented {e}"); + error!("handle_series_lookup_result {e}"); } } } else { - warn!("TODO channel in bad state, reset"); + warn!("TODO handle_series_lookup_result channel in bad state, reset"); } } else { - warn!("TODO channel in bad state, reset"); + warn!("TODO handle_series_lookup_result channel in bad state, reset"); } } else { - warn!("TODO channel in bad state, reset"); + warn!("TODO handle_series_lookup_result channel in bad state, reset"); } } Err(e) => { @@ -683,11 +678,10 @@ impl CaConn { fn handle_conn_command(&mut self, cx: &mut Context) -> Poll>> { // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; - trace!("handle_conn_command {}", self.remote_addr_dbg); self.stats.caconn_loop3_count_inc(); match self.conn_command_rx.poll_next_unpin(cx) { Ready(Some(a)) => { - trace!("handle_conn_command received a command"); + trace!("handle_conn_command received a command {}", self.remote_addr_dbg); match a.kind { ConnCommandKind::ChannelAdd(name, cssid) => { self.cmd_channel_add(name, cssid); @@ -1019,9 +1013,6 @@ impl CaConn { info_store_msp_last: info_store_msp_from_time(SystemTime::now()), }; *ch_s = ChannelState::Created(series, created_state); - let scalar_type = ScalarType::from_ca_id(data_type)?; - let shape = Shape::from_ca_count(data_count)?; - error!("TODO channel_to_evented make sure we get polled again?"); Ok(()) } @@ -1534,8 +1525,7 @@ impl CaConn { Break(Pending) } - // `?` works not in here. - fn handle_conn_state(&mut self, cx: &mut Context) -> Option>> { + fn handle_conn_state(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; match &mut self.state { CaConnState::Unconnected => { @@ -1543,7 +1533,7 @@ impl CaConn { trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr)); self.state = CaConnState::Connecting(addr, Box::pin(fut)); - None + Ok(None) } CaConnState::Connecting(ref addr, ref mut fut) => { match fut.poll_unpin(cx) { @@ -1561,7 +1551,7 @@ impl CaConn { let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.opts.array_truncate); self.state = CaConnState::Init; self.proto = Some(proto); - None + Ok(None) } Ok(Err(_e)) => { // TODO log with exponential backoff @@ -1575,7 +1565,7 @@ impl CaConn { let dt = self.backoff_next(); self.state = CaConnState::Wait(wait_fut(dt)); self.proto = None; - None + Ok(None) } Err(e) => { // TODO log with exponential backoff @@ -1590,11 +1580,11 @@ impl CaConn { let dt = self.backoff_next(); self.state = CaConnState::Wait(wait_fut(dt)); self.proto = None; - None + Ok(None) } } } - Pending => Some(Pending), + Pending => Ok(Some(Pending)), } } CaConnState::Init => { @@ -1611,49 +1601,52 @@ impl CaConn { }; proto.push_out(msg); self.state = CaConnState::Listen; - None + Ok(None) } CaConnState::Listen => match { let res = self.handle_conn_listen(cx); res } { - Ready(Some(Ok(()))) => Some(Ready(Ok(()))), - Ready(Some(Err(e))) => Some(Ready(Err(e))), - Ready(None) => None, - Pending => Some(Pending), + Ready(Some(Ok(()))) => Ok(Some(Ready(()))), + Ready(Some(Err(e))) => Err(e), + Ready(None) => Ok(None), + Pending => Ok(Some(Pending)), }, CaConnState::PeerReady => { let res = self.handle_peer_ready(cx); match res { - Ready(Some(Ok(()))) => None, - Ready(Some(Err(e))) => Some(Ready(Err(e))), - Ready(None) => None, - Pending => Some(Pending), + Ready(Some(Ok(()))) => Ok(None), + Ready(Some(Err(e))) => Err(e), + Ready(None) => Ok(None), + Pending => Ok(Some(Pending)), } } CaConnState::Wait(inst) => match inst.poll_unpin(cx) { Ready(_) => { self.state = CaConnState::Unconnected; self.proto = None; - None + Ok(None) } - Pending => Some(Pending), + Pending => Ok(Some(Pending)), }, - CaConnState::Shutdown => None, + CaConnState::Shutdown => Ok(None), } } - fn loop_inner(&mut self, cx: &mut Context) -> Option>> { + fn loop_inner(&mut self, cx: &mut Context) -> Result>, Error> { + use Poll::*; loop { self.stats.caconn_loop2_count_inc(); - if let Some(v) = self.handle_conn_state(cx) { - break Some(v); + if self.is_shutdown() { + break Ok(None); } if self.insert_item_queue.len() >= self.opts.insert_queue_max { - break None; + break Ok(None); } - if self.is_shutdown() { - break None; + match self.handle_conn_state(cx)? { + Some(Ready(_)) => continue, + Some(Pending) => break Ok(Some(Pending)), + None => break Ok(None), } } } @@ -1699,7 +1692,31 @@ impl CaConn { Self::apply_channel_ops_with_res(res) } - fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { + fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { + use Poll::*; + match self.ticker.poll_unpin(cx) { + Ready(()) => { + match self.as_mut().handle_own_ticker_tick(cx) { + Ok(_) => { + if !self.is_shutdown() { + self.ticker = Self::new_self_ticker(); + let _ = self.ticker.poll_unpin(cx); + // cx.waker().wake_by_ref(); + } + Ok(()) + } + Err(e) => { + error!("handle_own_ticker {e}"); + self.trigger_shutdown(ChannelStatusClosedReason::InternalError); + Err(e) + } + } + } + Pending => Ok(()), + } + } + + fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { let this = self.get_mut(); for (_, tb) in this.time_binners.iter_mut() { let iiq = &mut this.insert_item_queue; @@ -1709,7 +1726,27 @@ impl CaConn { } fn outgoing_queues_empty(&self) -> bool { - self.insert_item_queue.is_empty() && !self.sender_polling.is_sending() + self.channel_info_query_queue.is_empty() && !self.channel_info_query_sending.is_sending() + } + + fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { + use Poll::*; + loop { + let sd = &mut self.channel_info_query_sending; + break if sd.is_sending() { + match sd.poll_unpin(cx) { + Ready(Ok(())) => continue, + Ready(Err(e)) => Err(Error::with_msg_no_trace("can not send into channel")), + Pending => Ok(()), + } + } else if let Some(item) = self.channel_info_query_queue.pop_front() { + let sd = &mut self.channel_info_query_sending; + sd.send2(item); + continue; + } else { + Ok(()) + }; + } } } @@ -1718,127 +1755,88 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let poll_ts1 = Instant::now(); self.stats.caconn_poll_count_inc(); - match self.ticker.poll_unpin(cx) { - Ready(()) => { - match self.as_mut().handle_own_ticker_tick(cx) { - Ok(_) => { - let _ = self.ticker.poll_unpin(cx); - } - Err(e) => { - error!("{e}"); - self.trigger_shutdown(ChannelStatusClosedReason::InternalError); - return Ready(Some(Err(e))); - } - } - self.ticker = Self::new_self_ticker(); - let _ = self.ticker.poll_unpin(cx); - // cx.waker().wake_by_ref(); - } - Pending => {} - } - let ret = if let Some(item) = self.cmd_res_queue.pop_front() { - Ready(Some(Ok(CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::ConnCommandResult(item), - }))) - } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { - Ready(Some(Ok(item))) - } else if let Some(item) = self.insert_item_queue.pop_front() { - let ev = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::QueryItem(item), - }; - Ready(Some(Ok(ev))) - } else { - let _ = loop { - let sd = &mut self.channel_info_query_sending; - break if sd.is_sending() { - match sd.poll_unpin(cx) { - Ready(Ok(())) => continue, - Ready(Err(e)) => Ready(Some(e)), - Pending => Pending, - } - } else if let Some(item) = self.channel_info_query_queue.pop_front() { - let sd = &mut self.channel_info_query_sending; - sd.send2(item); - continue; - } else { - Ready(None) + loop { + let mut have_pending = false; + break if let Err(e) = self.as_mut().handle_own_ticker(cx) { + Ready(Some(Err(e))) + } else if let Some(item) = self.cmd_res_queue.pop_front() { + let item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::ConnCommandResult(item), }; - }; - let ret = loop { - self.stats.caconn_loop1_count_inc(); - loop { - break if self.is_shutdown() { - () - } else { - match self.handle_conn_command(cx) { - Ready(Some(Ok(_))) => (), - Ready(Some(Err(e))) => { - error!("{e}"); - self.trigger_shutdown(ChannelStatusClosedReason::InternalError); - () - } - Ready(None) => { - warn!("command input queue closed, do shutdown"); - self.trigger_shutdown(ChannelStatusClosedReason::InternalError); - () - } - Pending => (), - } - }; - } + Ready(Some(Ok(item))) + } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { + Ready(Some(Ok(item))) + } else if let Some(item) = self.insert_item_queue.pop_front() { + let ev = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::QueryItem(item), + }; + Ready(Some(Ok(ev))) + } else if let Err(e) = self.as_mut().attempt_flush_channel_info_query(cx) { + Ready(Some(Err(e))) + } else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) { + Ready(Some(Err(e))) + } else if let Some(item) = { if self.is_shutdown() { - if self.outgoing_queues_empty() { - debug!("shut down and all items flushed {}", self.remote_addr_dbg); - break Ready(Ok(())); - } else { - // trace!("more items {}", self.insert_item_queue.len()); + None + } else { + match self.loop_inner(cx) { + // TODO what does this mean: should we re-loop or yield something? + Ok(Some(Ready(()))) => None, + // This is the last step, so we yield Pending. + // But in general, this does not compose well when we would add another step. + Ok(Some(Pending)) => { + have_pending = true; + None + } + Ok(None) => None, + Err(e) => Some(Err(e)), } } - if self.insert_item_queue.len() >= self.opts.insert_queue_max { - break Pending; - } - if !self.is_shutdown() { - if let Some(v) = self.loop_inner(cx) { - break v; - } + } { + Ready(Some(item)) + } else { + // Ready(_) => self.stats.conn_stream_ready_inc(), + // Pending => self.stats.conn_stream_pending_inc(), + let _item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::None, + }; + if have_pending { + Pending + } else { + continue; } }; - match &ret { - Ready(_) => self.stats.conn_stream_ready_inc(), - Pending => self.stats.conn_stream_pending_inc(), - } - if self.is_shutdown() && self.outgoing_queues_empty() { - debug!("end stream {}", self.remote_addr_dbg); - Ready(None) - } else { - match ret { - Ready(Ok(())) => { - let item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::None, - }; - Ready(Some(Ok(item))) - } - Ready(Err(e)) => Ready(Some(Err(e))), - Pending => Pending, - } - } - }; - { - let tsnow = Instant::now(); - let dt = tsnow.saturating_duration_since(poll_ts1); - if dt > Duration::from_millis(40) { - if poll_ts1 > self.ts_earliest_warn_poll_slow { - // TODO factor out the rate limit logic in reusable type - self.ts_earliest_warn_poll_slow = tsnow + Duration::from_millis(2000); - warn!("slow poll: {}ms", dt.as_secs_f32() * 1e3); - } - } } + } +} + +pub struct PollTimer { + inp: INP, +} + +impl PollTimer { + pub fn new(inp: INP) -> Self { + Self { inp } + } +} + +impl Stream for PollTimer +where + INP: Stream + Unpin, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let poll_ts1 = Instant::now(); + let inp = &mut self.inp; + let ret = inp.poll_next_unpin(cx); + let poll_ts2 = Instant::now(); + let dt = poll_ts2.saturating_duration_since(poll_ts1); + if dt > Duration::from_millis(40) {} ret } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 74f8a46..ea4ef75 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -126,9 +126,15 @@ pub enum CaConnSetEvent { CaConnEvent((SocketAddr, CaConnEvent)), } +#[derive(Debug)] +pub enum CaConnSetItem { + Healthy, +} + #[derive(Clone)] pub struct CaConnSetCtrl { tx: Sender, + pub rx: Receiver, } impl CaConnSetCtrl { @@ -187,9 +193,11 @@ pub struct CaConnSet { connset_rx: Receiver, channel_info_query_tx: Sender, storage_insert_tx: Sender, - shutdown: bool, + shutdown_stopping: bool, + shutdown_done: bool, chan_check_next: Option, stats: CaConnSetStats, + connset_out_tx: Sender, } impl CaConnSet { @@ -200,6 +208,7 @@ impl CaConnSet { channel_info_query_tx: Sender, pgconf: Database, ) -> CaConnSetCtrl { + let (connset_out_tx, connset_out_rx) = async_channel::bounded(256); let (connset_tx, connset_rx) = async_channel::bounded(10000); let (search_tx, ioc_finder_jh) = super::finder::start_finder(connset_tx.clone(), backend.clone(), pgconf); let connset = Self { @@ -212,13 +221,18 @@ impl CaConnSet { connset_rx, channel_info_query_tx, storage_insert_tx, - shutdown: false, + shutdown_stopping: false, + shutdown_done: false, chan_check_next: None, stats: CaConnSetStats::new(), + connset_out_tx, }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); - CaConnSetCtrl { tx: connset_tx } + CaConnSetCtrl { + tx: connset_tx, + rx: connset_out_rx, + } } async fn run(mut this: CaConnSet) -> Result<(), Error> { @@ -227,11 +241,11 @@ impl CaConnSet { match x { Ok(ev) => this.handle_event(ev).await?, Err(_) => { - if this.shutdown { + if this.shutdown_done { // all fine break Ok(()); } else { - error!("channel closed without shutdown"); + error!("channel closed without shutdown_done"); } } } @@ -245,16 +259,9 @@ impl CaConnSet { ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await, ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await, ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await, - ConnSetCmd::CheckHealth => { - error!("TODO implement check health"); - Ok(()) - } - ConnSetCmd::Shutdown => { - debug!("shutdown received"); - self.shutdown = true; - Ok(()) - } ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, + ConnSetCmd::CheckHealth => self.handle_check_health().await, + ConnSetCmd::Shutdown => self.handle_shutdown().await, }, CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value { CaConnEventValue::None => Ok(()), @@ -264,7 +271,7 @@ impl CaConnSet { self.storage_insert_tx.send(item).await?; Ok(()) } - CaConnEventValue::EndOfStream => todo!(), + CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr).await, }, } } @@ -398,6 +405,44 @@ impl CaConnSet { Ok(()) } + async fn handle_check_health(&mut self) -> Result<(), Error> { + debug!("TODO handle_check_health"); + let item = CaConnSetItem::Healthy; + self.connset_out_tx.send(item).await?; + Ok(()) + } + + async fn handle_shutdown(&mut self) -> Result<(), Error> { + debug!("TODO handle_shutdown"); + debug!("shutdown received"); + self.shutdown_stopping = true; + for (addr, res) in self.ca_conn_ress.iter() { + let item = ConnCommand::shutdown(); + res.sender.send(item).await?; + } + Ok(()) + } + + async fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { + debug!("handle_ca_conn_eos {addr}"); + if let Some(e) = self.ca_conn_ress.remove(&addr) { + match e.jh.await { + Ok(Ok(())) => { + debug!("CaConn {addr} finished well"); + } + Ok(Err(e)) => { + error!("CaConn {addr} task error: {e}"); + } + Err(e) => { + error!("CaConn {addr} join error: {e}"); + } + } + } else { + warn!("end-of-stream received for non-existent CaConn {addr}"); + } + Ok(()) + } + fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result { // TODO should we save this as event? let opts = CaConnOpts::default(); diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index 3c573ea..65eaf0d 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -1,4 +1,5 @@ use crate::ca::conn::CaConnEvent; +use crate::ca::connset::CaConnSetItem; use crate::ca::findioc::FindIocRes; use async_channel::Sender; use err::Error; @@ -28,6 +29,7 @@ pub enum DaemonEvent { ChannelRemove(Channel), SearchDone(Result, Error>), CaConnEvent(SocketAddrV4, CaConnEvent), + CaConnSetItem(CaConnSetItem), Shutdown, } @@ -49,6 +51,7 @@ impl DaemonEvent { EndOfStream => format!("CaConnEvent/EndOfStream"), } } + CaConnSetItem(_) => format!("CaConnSetItem"), Shutdown => format!("Shutdown"), } }