From 869ed8e5dd86e02664dfc11ca394fac2c8b2fe55 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 13 Sep 2023 07:28:34 +0200 Subject: [PATCH] Clean up shutdown --- daqingest/src/daemon.rs | 15 +- dbpg/src/seriesbychannel.rs | 9 +- netfetch/src/ca/conn.rs | 124 ++++--- netfetch/src/ca/connset.rs | 451 ++++++++++++++++++------- netfetch/src/ca/connset_input_merge.rs | 45 ++- netfetch/src/senderpolling.rs | 31 +- scywr/src/iteminsertqueue.rs | 25 +- 7 files changed, 501 insertions(+), 199 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index bc6e917..415d251 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -295,7 +295,7 @@ impl Daemon { } async fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> { - debug!("handle_channel_add {ch:?}"); + // debug!("handle_channel_add {ch:?}"); self.connset_ctrl .add_channel( self.opts.backend.clone(), @@ -380,12 +380,15 @@ impl Daemon { } } } + Error(e) => { + error!("error from CaConnSet: {e}"); + self.handle_shutdown().await?; + } } Ok(()) } async fn handle_shutdown(&mut self) -> Result<(), Error> { - error!("TODO handle_shutdown"); if self.shutting_down { warn!("already shutting down"); } else { @@ -398,6 +401,7 @@ impl Daemon { // await the connection sets. // await other workers that we've spawned. self.connset_ctrl.shutdown().await?; + self.rx.close(); } Ok(()) } @@ -601,9 +605,12 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> let mut i = 0; for s in &channels { let ch = Channel::new(s.into()); - tx.send(DaemonEvent::ChannelAdd(ch)).await?; + match tx.send(DaemonEvent::ChannelAdd(ch)).await { + Ok(()) => {} + Err(_) => break, + } i += 1; - if i % 1000 == 0 { + if i % 100 == 0 { debug!("sent {} ChannelAdd", i); } } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index e5aa31e..ab573a5 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -321,12 +321,15 @@ impl Worker { }; trace3!("try to send result for {:?}", item); let fut = r.tx.make_send(Ok(item)); - match fut.await { - Ok(()) => {} - Err(_e) => { + match tokio::time::timeout(Duration::from_millis(2000), fut).await { + Ok(Ok(())) => {} + Ok(Err(_e)) => { warn!("can not deliver result"); return Err(Error::ChannelError); } + Err(_) => { + debug!("timeout can not deliver result"); + } } } } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e585a0e..4f58e09 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -3,6 +3,7 @@ use super::ExtraInsertsConf; use crate::senderpolling::SenderPolling; use crate::timebin::ConnTimeBin; use async_channel::Sender; +use core::fmt; use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; use dbpg::seriesbychannel::ChannelInfoResult; @@ -272,6 +273,21 @@ enum CaConnState { EndOfStream, } +impl fmt::Debug for CaConnState { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Unconnected => write!(fmt, "Unconnected"), + Self::Connecting(arg0, _) => fmt.debug_tuple("Connecting").field(arg0).finish(), + Self::Init => write!(fmt, "Init"), + Self::Listen => write!(fmt, "Listen"), + Self::PeerReady => write!(fmt, "PeerReady"), + Self::Wait(_) => fmt.debug_tuple("Wait").finish(), + Self::Shutdown => write!(fmt, "Shutdown"), + Self::EndOfStream => write!(fmt, "EndOfStream"), + } + } +} + fn wait_fut(dt: u64) -> Pin + Send>> { let fut = tokio::time::sleep(Duration::from_millis(dt)); Box::pin(fut) @@ -672,37 +688,41 @@ impl CaConn { use Poll::*; loop { self.stats.caconn_loop3_count.inc(); - break match self.conn_command_rx.poll_next_unpin(cx) { - Ready(Some(a)) => { - trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); - match a.kind { - ConnCommandKind::ChannelAdd(name, cssid) => { - self.cmd_channel_add(name, cssid); - Ready(Some(Ok(()))) + break if self.is_shutdown() { + Ready(None) + } else { + match self.conn_command_rx.poll_next_unpin(cx) { + Ready(Some(a)) => { + trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); + match a.kind { + ConnCommandKind::ChannelAdd(name, cssid) => { + self.cmd_channel_add(name, cssid); + Ready(Some(Ok(()))) + } + ConnCommandKind::ChannelRemove(name) => { + self.cmd_channel_remove(name); + Ready(Some(Ok(()))) + } + ConnCommandKind::CheckHealth => { + self.cmd_check_health(); + Ready(Some(Ok(()))) + } + ConnCommandKind::Shutdown => { + self.cmd_shutdown(); + Ready(Some(Ok(()))) + } + ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { + Ok(()) => Ready(Some(Ok(()))), + Err(e) => Ready(Some(Err(e))), + }, } - ConnCommandKind::ChannelRemove(name) => { - self.cmd_channel_remove(name); - Ready(Some(Ok(()))) - } - ConnCommandKind::CheckHealth => { - self.cmd_check_health(); - Ready(Some(Ok(()))) - } - ConnCommandKind::Shutdown => { - self.cmd_shutdown(); - Ready(Some(Ok(()))) - } - ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { - Ok(()) => Ready(Some(Ok(()))), - Err(e) => Ready(Some(Err(e))), - }, } + Ready(None) => { + error!("Command queue closed"); + Ready(None) + } + Pending => Pending, } - Ready(None) => { - error!("Command queue closed"); - Ready(None) - } - Pending => Pending, }; } } @@ -1688,6 +1708,7 @@ impl CaConn { } fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { + debug!("tick CaConn {}", self.remote_addr_dbg); let this = self.get_mut(); if false { for (_, tb) in this.time_binners.iter_mut() { @@ -1699,26 +1720,32 @@ impl CaConn { } fn queues_async_out_flushed(&self) -> bool { - self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle() + // self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle() + // TODO re-enable later + true } fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { use Poll::*; loop { - let sd = &mut self.channel_info_query_sending; - break if sd.is_sending() { - match sd.poll_unpin(cx) { - Ready(Ok(())) => continue, - Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")), - Pending => Ok(()), - } - } else if let Some(item) = self.channel_info_query_queue.pop_front() { - trace3!("send series query {item:?}"); - let sd = &mut self.channel_info_query_sending; - sd.send(item); - continue; - } else { + break if self.is_shutdown() { Ok(()) + } else { + let sd = &mut self.channel_info_query_sending; + if sd.is_sending() { + match sd.poll_unpin(cx) { + Ready(Ok(())) => continue, + Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")), + Pending => Ok(()), + } + } else if let Some(item) = self.channel_info_query_queue.pop_front() { + trace3!("send series query {item:?}"); + let sd = &mut self.channel_info_query_sending; + sd.send(item); + continue; + } else { + Ok(()) + } }; } } @@ -1764,10 +1791,15 @@ impl Stream for CaConn { ts: Instant::now(), value: CaConnEventValue::None, }; - if self.is_shutdown() && self.queues_async_out_flushed() { - debug!("end of stream {}", self.remote_addr_dbg); - self.state = CaConnState::EndOfStream; - Ready(None) + if self.is_shutdown() { + if self.queues_async_out_flushed() == false { + debug!("shutdown, but async queues not flushed"); + continue; + } else { + debug!("end of stream {}", self.remote_addr_dbg); + self.state = CaConnState::EndOfStream; + Ready(None) + } } else { continue; } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 36a48a4..217bc64 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -15,6 +15,7 @@ use crate::daemon_common::Channel; use crate::errconv::ErrConv; use crate::rt::JoinHandle; use crate::rt::TokMx; +use crate::senderpolling::SenderPolling; use async_channel::Receiver; use async_channel::Sender; use atomic::AtomicUsize; @@ -24,6 +25,7 @@ use dbpg::seriesbychannel::ChannelInfoQuery; use dbpg::seriesbychannel::ChannelInfoResult; use err::Error; use futures_util::FutureExt; +use futures_util::Stream; use futures_util::StreamExt; use log::*; use netpod::Database; @@ -46,8 +48,11 @@ use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddr; use std::net::SocketAddrV4; +use std::pin::Pin; use std::sync::atomic; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -144,12 +149,8 @@ pub struct ChannelRemove { #[derive(Debug)] pub enum ConnSetCmd { - SeriesLookupResult(Result), ChannelAdd(ChannelAdd), - ChannelAddWithStatusId(ChannelAddWithStatusId), - ChannelAddWithAddr(ChannelAddWithAddr), ChannelRemove(ChannelRemove), - IocAddrQueryResult(VecDeque), CheckHealth(Instant), Shutdown, } @@ -157,11 +158,11 @@ pub enum ConnSetCmd { #[derive(Debug)] pub enum CaConnSetEvent { ConnSetCmd(ConnSetCmd), - CaConnEvent((SocketAddr, CaConnEvent)), } #[derive(Debug, Clone)] pub enum CaConnSetItem { + Error(Error), Healthy(Instant, Instant), } @@ -218,14 +219,14 @@ pub struct IocAddrQuery { } struct SeriesLookupSender { - tx: Sender, + tx: Sender>, } impl CanSendChannelInfoResult for SeriesLookupSender { fn make_send(&self, item: Result) -> BoxedSend { let tx = self.tx.clone(); let fut = async move { - tx.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::SeriesLookupResult(item))) + tx.send(item.map_err(|e| Error::with_msg_no_trace(e.to_string()))) .await .map_err(|_| ()) }; @@ -236,20 +237,29 @@ impl CanSendChannelInfoResult for SeriesLookupSender { pub struct CaConnSet { backend: String, local_epics_hostname: String, - search_tx: Sender, ca_conn_ress: BTreeMap, channel_states: ChannelStateMap, - connset_tx: Sender, - // connset_rx: Receiver, - connset_rx: crate::ca::connset_input_merge::InputMerge, + connset_inp_rx: Receiver, + channel_info_query_queue: VecDeque, + channel_info_query_sender: SenderPolling, channel_info_query_tx: Sender, - storage_insert_tx: Sender, + channel_info_res_tx: Sender>, + channel_info_res_rx: Receiver>, + find_ioc_query_queue: VecDeque, + find_ioc_query_sender: SenderPolling, + find_ioc_res_rx: Receiver>, + storage_insert_queue: VecDeque, + storage_insert_sender: SenderPolling, + ca_conn_res_tx: Sender<(SocketAddr, CaConnEvent)>, + ca_conn_res_rx: Receiver<(SocketAddr, CaConnEvent)>, + connset_out_queue: VecDeque, + connset_out_tx: Sender, shutdown_stopping: bool, shutdown_done: bool, chan_check_next: Option, stats: CaConnSetStats, - connset_out_tx: Sender, ioc_finder_jh: JoinHandle>, + await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, } impl CaConnSet { @@ -260,28 +270,40 @@ impl CaConnSet { channel_info_query_tx: Sender, pgconf: Database, ) -> CaConnSetCtrl { - let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(256); - let (connset_out_tx, connset_out_rx) = async_channel::bounded(256); - let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(10000); - let (search_tx, ioc_finder_jh) = super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf); - let input_merge = InputMerge::new(todo!(), find_ioc_res_rx); + let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(5000); + let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(5000); + let (connset_out_tx, connset_out_rx) = async_channel::bounded(5000); + let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(5000); + let (find_ioc_query_tx, ioc_finder_jh) = + super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf); + let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(5000); let connset = Self { backend, local_epics_hostname, - search_tx, ca_conn_ress: BTreeMap::new(), channel_states: ChannelStateMap::new(), - connset_tx: connset_inp_tx, - // connset_rx: find_ioc_res_rx, - connset_rx: todo!(), + connset_inp_rx, + channel_info_query_queue: VecDeque::new(), + channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()), channel_info_query_tx, - storage_insert_tx, + channel_info_res_tx, + channel_info_res_rx, + find_ioc_query_queue: VecDeque::new(), + find_ioc_query_sender: SenderPolling::new(find_ioc_query_tx), + find_ioc_res_rx, + storage_insert_queue: VecDeque::new(), + storage_insert_sender: SenderPolling::new(storage_insert_tx), + ca_conn_res_tx, + ca_conn_res_rx, shutdown_stopping: false, shutdown_done: false, chan_check_next: None, stats: CaConnSetStats::new(), connset_out_tx, + connset_out_queue: VecDeque::new(), + // connset_out_sender: SenderPolling::new(connset_out_tx), ioc_finder_jh, + await_ca_conn_jhs: VecDeque::new(), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -294,66 +316,60 @@ impl CaConnSet { async fn run(mut this: CaConnSet) -> Result<(), Error> { loop { - let x = this.connset_rx.next().await; + let x = this.next().await; match x { - Some(ev) => this.handle_event(ev).await?, - None => { - if this.shutdown_stopping { - // all fine - break; - } else { - error!("channel closed without shutdown_stopping"); - } - } - } - if this.shutdown_stopping { - break; + Some(x) => this.connset_out_tx.send(x).await?, + None => break, } } - debug!( - "search_tx sender {} receiver {}", - this.search_tx.sender_count(), - this.search_tx.receiver_count() - ); + // debug!( + // "search_tx sender {} receiver {}", + // this.find_ioc_query_tx.sender_count(), + // this.find_ioc_query_tx.receiver_count() + // ); this.ioc_finder_jh .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))??; debug!("joined ioc_finder_jh"); this.connset_out_tx.close(); - this.connset_rx.close(); + this.connset_inp_rx.close(); this.shutdown_done = true; Ok(()) } - async fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> { + fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> { + // trace!("handle_event {ev:?}"); match ev { CaConnSetEvent::ConnSetCmd(cmd) => match cmd { - ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await, - ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await, - ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await, - ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x).await, - ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await, - ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, - ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1).await, - ConnSetCmd::Shutdown => self.handle_shutdown().await, - }, - CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value { - CaConnEventValue::None => Ok(()), - CaConnEventValue::EchoTimeout => todo!(), - CaConnEventValue::ConnCommandResult(_) => todo!(), - CaConnEventValue::QueryItem(item) => { - self.storage_insert_tx.send(item).await?; - Ok(()) - } - CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr).await, + ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x), + // ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x), + // ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x), + ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x), + // ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await, + // ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, + ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1), + ConnSetCmd::Shutdown => self.handle_shutdown(), }, } } - async fn handle_series_lookup_result( - &mut self, - res: Result, - ) -> Result<(), Error> { + fn handle_ca_conn_event(&mut self, addr: SocketAddr, ev: CaConnEvent) -> Result<(), Error> { + match ev.value { + CaConnEventValue::None => Ok(()), + CaConnEventValue::EchoTimeout => todo!(), + CaConnEventValue::ConnCommandResult(_) => todo!(), + CaConnEventValue::QueryItem(item) => { + self.storage_insert_queue.push_back(item); + Ok(()) + } + CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr), + } + } + + fn handle_series_lookup_result(&mut self, res: Result) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } trace3!("handle_series_lookup_result {res:?}"); match res { Ok(res) => { @@ -363,9 +379,7 @@ impl CaConnSet { local_epics_hostname: self.local_epics_hostname.clone(), cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()), }; - self.connset_tx - .send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithStatusId(add))) - .await?; + self.handle_add_channel_with_status_id(add)?; } Err(e) => { warn!("TODO handle error {e}"); @@ -374,9 +388,10 @@ impl CaConnSet { Ok(()) } - async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> { + fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> { + trace3!("handle_add_channel {}", add.name); if self.shutdown_stopping { - debug!("handle_add_channel but shutdown_stopping"); + trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); } // TODO should I add the transition through ActiveChannelState::Init as well? @@ -392,19 +407,19 @@ impl CaConnSet { scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, shape_dims: Vec::new(), tx: Box::pin(SeriesLookupSender { - tx: self.connset_tx.clone(), + tx: self.channel_info_res_tx.clone(), }), }; - self.channel_info_query_tx.send(item).await?; + self.channel_info_query_queue.push_back(item); Ok(()) } - async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> { + fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> { + trace3!("handle_add_channel_with_status_id {}", add.name); if self.shutdown_stopping { debug!("handle_add_channel but shutdown_stopping"); return Ok(()); } - trace3!("handle_add_channel_with_status_id {add:?}"); let ch = Channel::new(add.name.clone()); if let Some(chst) = self.channel_states.inner().get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { @@ -418,7 +433,7 @@ impl CaConnSet { }, }; let qu = IocAddrQuery { name: add.name }; - self.search_tx.send(qu).await?; + self.find_ioc_query_queue.push_back(qu); } else { warn!("TODO have a status series id but no more channel"); } @@ -431,9 +446,9 @@ impl CaConnSet { Ok(()) } - async fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> { + fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> { if self.shutdown_stopping { - debug!("handle_add_channel but shutdown_stopping"); + trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); } if !self.ca_conn_ress.contains_key(&add.addr) { @@ -442,11 +457,16 @@ impl CaConnSet { } let conn_ress = self.ca_conn_ress.get_mut(&add.addr).unwrap(); let cmd = ConnCommand::channel_add(add.name, add.cssid); - conn_ress.sender.send(cmd).await?; + // TODO not the nicest + let tx = conn_ress.sender.clone(); + tokio::spawn(async move { tx.send(cmd).await }); Ok(()) } - async fn handle_remove_channel(&mut self, add: ChannelRemove) -> Result<(), Error> { + fn handle_remove_channel(&mut self, add: ChannelRemove) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } let ch = Channel::new(add.name); if let Some(k) = self.channel_states.inner().get_mut(&ch) { match &k.value { @@ -483,7 +503,11 @@ impl CaConnSet { Ok(()) } - async fn handle_ioc_query_result(&mut self, res: VecDeque) -> Result<(), Error> { + fn handle_ioc_query_result(&mut self, res: VecDeque) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } + trace3!("handle_ioc_query_result"); for e in res { let ch = Channel::new(e.channel.clone()); if let Some(chst) = self.channel_states.inner().get_mut(&ch) { @@ -502,16 +526,15 @@ impl CaConnSet { cssid: status_series_id.clone(), local_epics_hostname: self.local_epics_hostname.clone(), }; - self.connset_tx - .send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithAddr(add))) - .await?; let since = SystemTime::now(); state.inner = WithStatusSeriesIdStateInner::WithAddress { addr, state: WithAddressState::Unassigned { since }, - } + }; + // TODO move state change also in there? + self.handle_add_channel_with_addr(add)?; } else { - debug!("ioc not found {e:?}"); + trace3!("ioc not found {e:?}"); let since = SystemTime::now(); state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since }; } @@ -528,50 +551,56 @@ impl CaConnSet { Ok(()) } - async fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> { + fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } debug!("TODO handle_check_health"); let ts2 = Instant::now(); let item = CaConnSetItem::Healthy(ts1, ts2); - self.connset_out_tx.send(item).await?; + self.connset_out_queue.push_back(item); Ok(()) } - async fn handle_shutdown(&mut self) -> Result<(), Error> { - debug!("TODO handle_shutdown"); - debug!("shutdown received"); + fn handle_shutdown(&mut self) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } + debug!("handle_shutdown"); self.shutdown_stopping = true; - self.search_tx.close(); + self.channel_info_query_sender.drop(); + self.find_ioc_query_sender.drop(); for (addr, res) in self.ca_conn_ress.iter() { let item = ConnCommand::shutdown(); - res.sender.send(item).await?; + // TODO not the nicest + let tx = res.sender.clone(); + tokio::spawn(async move { tx.send(item).await }); } Ok(()) } - async fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { + fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { debug!("handle_ca_conn_eos {addr}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { - match e.jh.await { - Ok(Ok(())) => { - self.stats.ca_conn_task_join_done_ok.inc(); - debug!("CaConn {addr} finished well"); - } - Ok(Err(e)) => { - self.stats.ca_conn_task_join_done_err.inc(); - error!("CaConn {addr} task error: {e}"); - } - Err(e) => { - self.stats.ca_conn_task_join_err.inc(); - error!("CaConn {addr} join error: {e}"); - } - } + self.await_ca_conn_jhs.push_back((addr, e.jh)); } else { self.stats.ca_conn_task_eos_non_exist.inc(); warn!("end-of-stream received for non-existent CaConn {addr}"); } + debug!("still CaConn left {}", self.ca_conn_ress.len()); Ok(()) } + fn ready_for_end_of_stream(&self) -> bool { + if self.ca_conn_ress.len() > 0 { + false + } else if self.await_ca_conn_jhs.len() > 1 { + false + } else { + true + } + } + fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result { // TODO should we save this as event? let opts = CaConnOpts::default(); @@ -591,8 +620,8 @@ impl CaConnSet { ); let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); - let conn_item_tx = self.connset_tx.clone(); - let jh = tokio::spawn(Self::ca_conn_item_merge(conn, conn_item_tx, addr_v4)); + let ca_conn_res_tx = self.ca_conn_res_tx.clone(); + let jh = tokio::spawn(Self::ca_conn_item_merge(conn, ca_conn_res_tx, addr)); let ca_conn_res = CaConnRes { state: CaConnState::new(CaConnStateValue::Fresh), sender: conn_tx, @@ -604,8 +633,8 @@ impl CaConnSet { async fn ca_conn_item_merge( conn: CaConn, - conn_item_tx: Sender, - addr: SocketAddrV4, + tx: Sender<(SocketAddr, CaConnEvent)>, + addr: SocketAddr, ) -> Result<(), Error> { debug!("ca_conn_consumer begin {}", addr); let stats = conn.stats(); @@ -615,9 +644,7 @@ impl CaConnSet { match item { Ok(item) => { stats.conn_item_count.inc(); - conn_item_tx - .send(CaConnSetEvent::CaConnEvent((SocketAddr::V4(addr), item))) - .await?; + tx.send((addr, item)).await?; } Err(e) => { error!("CaConn gives error: {e:?}"); @@ -626,15 +653,14 @@ impl CaConnSet { } } debug!("ca_conn_consumer ended {}", addr); - conn_item_tx - .send(CaConnSetEvent::CaConnEvent(( - SocketAddr::V4(addr), - CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::EndOfStream, - }, - ))) - .await?; + tx.send(( + addr, + CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::EndOfStream, + }, + )) + .await?; debug!("ca_conn_consumer signaled {}", addr); ret } @@ -694,7 +720,7 @@ impl CaConnSet { rxs } - pub async fn wait_stopped(&self) -> Result<(), Error> { + async fn wait_stopped(&self) -> Result<(), Error> { warn!("Lock for wait_stopped"); // let mut g = self.ca_conn_ress.lock().await; // let mm = std::mem::replace(&mut *g, BTreeMap::new()); @@ -948,3 +974,180 @@ impl CaConnSet { (search_pending,) } } + +impl Stream for CaConnSet { + type Item = CaConnSetItem; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + debug!("CaConnSet::poll"); + loop { + let mut have_pending = false; + + if let Some(item) = self.connset_out_queue.pop_front() { + break Ready(Some(item)); + } + + if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() { + match jh.poll_unpin(cx) { + Ready(x) => { + let addr = *addr; + self.await_ca_conn_jhs.pop_front(); + debug!("await_ca_conn_jhs still jhs left {}", self.await_ca_conn_jhs.len()); + match x { + Ok(Ok(())) => { + self.stats.ca_conn_task_join_done_ok.inc(); + debug!("CaConn {addr} finished well"); + } + Ok(Err(e)) => { + self.stats.ca_conn_task_join_done_err.inc(); + error!("CaConn {addr} task error: {e}"); + } + Err(e) => { + self.stats.ca_conn_task_join_err.inc(); + error!("CaConn {addr} join error: {e}"); + } + } + } + Pending => {} + } + } + + if self.storage_insert_sender.is_idle() { + if let Some(item) = self.storage_insert_queue.pop_front() { + self.storage_insert_sender.send(item); + } + } + if self.storage_insert_sender.is_sending() { + match self.storage_insert_sender.poll_unpin(cx) { + Ready(Ok(())) => continue, + Ready(Err(e)) => { + let e = Error::with_msg_no_trace("can not send into channel"); + error!("{e}"); + break Ready(Some(CaConnSetItem::Error(e))); + } + Pending => { + have_pending = true; + } + } + } + + if self.find_ioc_query_sender.is_idle() { + if let Some(item) = self.find_ioc_query_queue.pop_front() { + self.find_ioc_query_sender.send(item); + } + } + if self.find_ioc_query_sender.is_sending() { + match self.find_ioc_query_sender.poll_unpin(cx) { + Ready(Ok(())) => continue, + Ready(Err(e)) => { + let e = Error::with_msg_no_trace("can not send into channel"); + error!("{e}"); + break Ready(Some(CaConnSetItem::Error(e))); + } + Pending => { + have_pending = true; + } + } + } + + if self.channel_info_query_sender.is_idle() { + if let Some(item) = self.channel_info_query_queue.pop_front() { + self.channel_info_query_sender.send(item); + } + } + if self.channel_info_query_sender.is_sending() { + match self.channel_info_query_sender.poll_unpin(cx) { + Ready(Ok(())) => continue, + Ready(Err(e)) => { + let e = Error::with_msg_no_trace("can not send into channel"); + error!("{e}"); + break Ready(Some(CaConnSetItem::Error(e))); + } + Pending => { + have_pending = true; + } + } + } + + let item = match self.find_ioc_res_rx.poll_next_unpin(cx) { + Ready(Some(x)) => match self.handle_ioc_query_result(x) { + Ok(()) => continue, + Err(e) => Ready(Some(CaConnSetItem::Error(e))), + }, + Ready(None) => Ready(None), + Pending => { + have_pending = true; + Pending + } + }; + match item { + Ready(Some(x)) => break Ready(Some(x)), + _ => {} + } + + let item = match self.ca_conn_res_rx.poll_next_unpin(cx) { + Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) { + Ok(()) => continue, + Err(e) => Ready(Some(CaConnSetItem::Error(e))), + }, + Ready(None) => Ready(None), + Pending => { + have_pending = true; + Pending + } + }; + match item { + Ready(Some(x)) => break Ready(Some(x)), + _ => {} + } + + let item = match self.channel_info_res_rx.poll_next_unpin(cx) { + Ready(Some(x)) => match self.handle_series_lookup_result(x) { + Ok(()) => continue, + Err(e) => Ready(Some(CaConnSetItem::Error(e))), + }, + Ready(None) => Ready(None), + Pending => { + have_pending = true; + Pending + } + }; + match item { + Ready(Some(x)) => break Ready(Some(x)), + _ => {} + } + + let item = match self.connset_inp_rx.poll_next_unpin(cx) { + Ready(Some(x)) => match self.handle_event(x) { + Ok(()) => continue, + Err(e) => Ready(Some(CaConnSetItem::Error(e))), + }, + Ready(None) => Ready(None), + Pending => { + have_pending = true; + Pending + } + }; + match item { + Ready(Some(x)) => break Ready(Some(x)), + _ => {} + } + + break if have_pending { + if self.shutdown_stopping && self.ready_for_end_of_stream() { + Ready(None) + } else { + Pending + } + } else if self.shutdown_stopping && self.ready_for_end_of_stream() { + debug!("nothing to do but shutdown"); + Ready(None) + } else { + let e = Error::with_msg_no_trace("connset not pending and not shutdown"); + error!("{e}"); + Ready(Some(CaConnSetItem::Error(e))) + }; + } + } +} diff --git a/netfetch/src/ca/connset_input_merge.rs b/netfetch/src/ca/connset_input_merge.rs index 86bb370..3662e9e 100644 --- a/netfetch/src/ca/connset_input_merge.rs +++ b/netfetch/src/ca/connset_input_merge.rs @@ -2,6 +2,8 @@ use super::connset::CaConnSetEvent; use super::findioc::FindIocRes; use crate::ca::connset::ConnSetCmd; use async_channel::Receiver; +use dbpg::seriesbychannel::ChannelInfoResult; +use err::Error; use futures_util::StreamExt; use std::collections::VecDeque; use std::pin::Pin; @@ -11,13 +13,19 @@ use std::task::Poll; pub struct InputMerge { inp1: Option>, inp2: Option>>, + inp3: Option>>, } impl InputMerge { - pub fn new(inp1: Receiver, inp2: Receiver>) -> Self { + pub fn new( + inp1: Receiver, + inp2: Receiver>, + inp3: Receiver>, + ) -> Self { Self { inp1: Some(inp1), inp2: Some(inp2), + inp3: Some(inp3), } } @@ -33,18 +41,35 @@ impl futures_util::Stream for InputMerge { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let mut poll_next = false; - let ret = if let Some(inp) = &mut self.inp2 { - match inp.poll_next_unpin(cx) { - Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(ConnSetCmd::IocAddrQueryResult(x))), - Ready(None) => { - self.inp2 = None; - None + let ret = { + if let Some(inp) = &mut self.inp3 { + match inp.poll_next_unpin(cx) { + Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), + Ready(None) => { + self.inp2 = None; + None + } + Pending => None, } - Pending => None, + } else { + None } + }; + let ret = if let Some(x) = ret { + Some(x) } else { - None + if let Some(inp) = &mut self.inp2 { + match inp.poll_next_unpin(cx) { + Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), + Ready(None) => { + self.inp2 = None; + None + } + Pending => None, + } + } else { + None + } }; if let Some(x) = ret { Ready(Some(x)) diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs index 26d8660..39c8246 100644 --- a/netfetch/src/senderpolling.rs +++ b/netfetch/src/senderpolling.rs @@ -22,7 +22,7 @@ pub struct SenderPolling where T: 'static, { - sender: Box>, + sender: Option>>, sender_ptr: NonNull>, fut: Option>, _pin: PhantomPinned, @@ -33,17 +33,17 @@ unsafe impl core::marker::Send for SenderPolling where T: core::marker::Se impl SenderPolling { pub fn new(sender: Sender) -> Self { let mut ret = Self { - sender: Box::new(sender), + sender: Some(Box::new(sender)), sender_ptr: NonNull::dangling(), fut: None, _pin: PhantomPinned, }; - ret.sender_ptr = NonNull::from(ret.sender.as_ref()); + ret.sender_ptr = NonNull::from(ret.sender.as_ref().unwrap().as_ref()); ret } pub fn is_idle(&self) -> bool { - self.fut.is_none() + self.sender.is_some() && self.fut.is_none() } pub fn is_sending(&self) -> bool { @@ -51,19 +51,30 @@ impl SenderPolling { } pub fn send_pin(self: Pin<&mut Self>, item: T) { - let (tx, fut) = unsafe { - let x = Pin::get_unchecked_mut(self); - (x.sender_ptr.as_mut(), &mut x.fut) - }; - let s = tx.send(item); - *fut = Some(s); + unsafe { Pin::get_unchecked_mut(self) }.send(item) } pub fn send(&mut self, item: T) { + if self.sender.is_none() { + // panic!("send on dropped sender"); + // TODO + return; + } let sender = unsafe { self.sender_ptr.as_mut() }; let s = sender.send(item); self.fut = Some(s); } + + pub fn close(&self) { + if let Some(tx) = self.sender.as_ref() { + tx.close(); + } + } + + pub fn drop(&mut self) { + self.sender = None; + self.fut = None; + } } impl Future for SenderPolling { diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index fa16c5e..84179d1 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -14,8 +14,11 @@ use scylla::transport::errors::QueryError; use series::SeriesId; use stats::CaConnStats; use std::net::SocketAddrV4; +use std::sync::atomic; +use std::sync::atomic::AtomicU64; use std::sync::Mutex; use std::time::Duration; +use std::time::Instant; use std::time::SystemTime; #[derive(Debug, ThisError)] @@ -377,6 +380,8 @@ where Ok(()) } +static warn_last: AtomicU64 = AtomicU64::new(0); + pub async fn insert_item( item: InsertItem, ttl_index: Duration, @@ -423,8 +428,24 @@ pub async fn insert_item( I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?, - String(_) => warn!("TODO string insert"), - Bool(_v) => warn!("TODO bool insert"), + String(val) => { + let ts = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |x| x.as_secs()); + if ts > warn_last.load(atomic::Ordering::Acquire) + 10 { + warn_last.store(ts, atomic::Ordering::Release); + warn!("TODO string insert {val}"); + } + } + Bool(val) => { + let ts = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |x| x.as_secs()); + if ts > warn_last.load(atomic::Ordering::Acquire) + 10 { + warn_last.store(ts, atomic::Ordering::Release); + warn!("TODO bool insert {val}"); + } + } } } Array(val) => {