diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index d65b173..7210c88 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -4,17 +4,12 @@ pub mod inserthook;
 use async_channel::Receiver;
 use async_channel::Sender;
 use async_channel::WeakReceiver;
-use dbpg::seriesbychannel::ChannelInfoQuery;
 use err::Error;
 use log::*;
-use netfetch::ca::conn::CaConnEvent;
-use netfetch::ca::conn::ConnCommand;
 use netfetch::ca::connset::CaConnSet;
 use netfetch::ca::connset::CaConnSetCtrl;
 use netfetch::ca::connset::CaConnSetItem;
-use netfetch::ca::findioc::FindIocRes;
 use netfetch::ca::IngestCommons;
-use netfetch::ca::SlowWarnable;
 use netfetch::conf::CaIngestOpts;
 use netfetch::daemon_common::Channel;
 use netfetch::daemon_common::DaemonEvent;
@@ -25,14 +20,8 @@ use netpod::ScyllaConfig;
 use scywr::insertworker::Ttls;
 use scywr::iteminsertqueue as scywriiq;
 use scywr::store::DataStore;
-use scywriiq::ChannelStatus;
-use scywriiq::ChannelStatusItem;
-use scywriiq::CommonInsertItemQueue;
-use scywriiq::ConnectionStatus;
-use scywriiq::ConnectionStatusItem;
 use scywriiq::QueryItem;
 use serde::Serialize;
-use series::series::Existence;
 use series::ChannelStatusSeriesId;
 use series::SeriesId;
 use stats::DaemonStats;
@@ -49,8 +38,6 @@ use std::time::Instant;
 use std::time::SystemTime;
 use taskrun::tokio;
 use tokio::task::JoinHandle;
-use tracing::info_span;
-use tracing::Instrument;
 
 const CA_CONN_INSERT_QUEUE_MAX: usize = 256;
 
@@ -90,7 +77,7 @@ pub struct Daemon {
     count_unassigned: usize,
     count_assigned: usize,
     last_status_print: SystemTime,
-    insert_workers_jh: Vec<JoinHandle<()>>,
+    insert_workers_jh: Vec<JoinHandle<Result<(), Error>>>,
     ingest_commons: Arc<IngestCommons>,
     caconn_last_channel_check: Instant,
     stats: Arc<DaemonStats>,
@@ -98,7 +85,6 @@ pub struct Daemon {
     insert_rx_weak: WeakReceiver<QueryItem>,
     connset_ctrl: CaConnSetCtrl,
     connset_status_last: Instant,
-    query_item_tx: Sender<QueryItem>,
 }
 
 impl Daemon {
@@ -114,33 +100,38 @@ impl Daemon {
         .await
         .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
 
-        let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
+        let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap);
         let insert_queue_counter = Arc::new(AtomicUsize::new(0));
 
         // Insert queue hook
-        let rx = inserthook::active_channel_insert_hook(common_insert_item_queue.receiver().unwrap());
-        let common_insert_item_queue_2 = rx;
+        let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
 
         let conn_set_ctrl = CaConnSet::start(
             opts.backend.clone(),
             opts.local_epics_hostname.clone(),
-            common_insert_item_queue.sender().unwrap().inner().clone(),
+            query_item_tx,
             channel_info_query_tx,
             opts.pgconf.clone(),
         );
 
         // TODO remove
         tokio::spawn({
-            let rx = conn_set_ctrl.rx.clone();
+            let rx = conn_set_ctrl.receiver().clone();
             let tx = daemon_ev_tx.clone();
             async move {
                 loop {
                     match rx.recv().await {
                         Ok(item) => {
                             let item = DaemonEvent::CaConnSetItem(item);
-                            tx.send(item).await;
+                            if let Err(_) = tx.send(item).await {
+                                debug!("CaConnSet to Daemon adapter: tx closed, break");
+                                break;
+                            }
+                        }
+                        Err(_) => {
+                            debug!("CaConnSet to Daemon adapter: rx done, break");
+                            break;
+                        }
-                        Err(e) => break,
                     }
                 }
             }
@@ -150,7 +141,6 @@ impl Daemon {
             pgconf: Arc::new(opts.pgconf.clone()),
             backend: opts.backend().into(),
             local_epics_hostname: opts.local_epics_hostname.clone(),
-            insert_item_queue: common_insert_item_queue.clone(),
             data_store: datastore.clone(),
             insert_ivl_min: Arc::new(AtomicU64::new(0)),
             extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()),
@@ -166,11 +156,11 @@ impl Daemon {
         let store_stats = Arc::new(stats::CaConnStats::new());
         let ttls = opts.ttls.clone();
         let insert_worker_opts = Arc::new(ingest_commons.as_ref().into());
-        let jh_insert_workers = scywr::insertworker::spawn_scylla_insert_workers(
+        let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers(
             opts.scyconf.clone(),
             opts.insert_scylla_sessions,
             opts.insert_worker_count,
-            common_insert_item_queue_2.clone(),
+            query_item_rx.clone(),
             insert_worker_opts,
             store_stats.clone(),
             use_rate_limit_queue,
@@ -223,15 +213,14 @@ impl Daemon {
             count_unassigned: 0,
             count_assigned: 0,
             last_status_print: SystemTime::now(),
-            insert_workers_jh: jh_insert_workers,
+            insert_workers_jh,
             ingest_commons,
             caconn_last_channel_check: Instant::now(),
             stats: Arc::new(DaemonStats::new()),
             shutting_down: false,
-            insert_rx_weak: common_insert_item_queue_2.downgrade(),
+            insert_rx_weak: query_item_rx.downgrade(),
             connset_ctrl: conn_set_ctrl,
             connset_status_last: Instant::now(),
-            query_item_tx: common_insert_item_queue.sender().unwrap().inner().clone(),
         };
         Ok(ret)
     }
@@ -240,10 +229,6 @@ impl Daemon {
         &self.stats
     }
 
-    fn allow_create_new_connections(&self) -> bool {
-        !self.shutting_down
-    }
-
     async fn check_caconn_chans(&mut self) -> Result<(), Error> {
         if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL {
             self.connset_ctrl.check_health().await?;
@@ -252,24 +237,17 @@ impl Daemon {
         Ok(())
     }
 
-    async fn ca_conn_send_shutdown(&mut self) -> Result<(), Error> {
-        self.connset_ctrl.shutdown().await?;
-        Ok(())
-    }
-
     async fn handle_timer_tick(&mut self) -> Result<(), Error> {
         if self.shutting_down {
-            let sa1 = self.ingest_commons.insert_item_queue.sender_count();
-            let sa2 = self.ingest_commons.insert_item_queue.sender_count_2();
             let nworkers = self
                 .ingest_commons
                 .insert_workers_running
                 .load(atomic::Ordering::Acquire);
-            let nitems = self.insert_rx_weak.upgrade().map(|x| x.len());
-            info!(
-                "qu senders A {:?} {:?} nworkers {} nitems {:?}",
-                sa1, sa2, nworkers, nitems
-            );
+            let nitems = self
+                .insert_rx_weak
+                .upgrade()
+                .map(|x| (x.sender_count(), x.receiver_count(), x.len()));
+            info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems);
             if nworkers == 0 {
                 info!("goodbye");
                 std::process::exit(0);
@@ -330,163 +308,7 @@ impl Daemon {
     }
 
     async fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> {
-        warn!("TODO handle_channel_remove");
-        #[cfg(DISABLED)]
-        if let Some(k) = self.channel_states.get_mut(&ch) {
-            match &k.value {
-                ChannelStateValue::Active(j) => match j {
-                    ActiveChannelState::Init { .. } => {
-                        k.value = ChannelStateValue::ToRemove { addr: None };
-                    }
-                    ActiveChannelState::WaitForStatusSeriesId { .. } => {
-                        k.value = ChannelStateValue::ToRemove { addr: None };
-                    }
-                    ActiveChannelState::WithStatusSeriesId {
-                        status_series_id: _,
-                        state,
-                    } => match state.inner {
-                        WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
-                            k.value = ChannelStateValue::ToRemove { addr: None };
-                        }
-                        WithStatusSeriesIdStateInner::SearchPending { .. } => {
-                            k.value = ChannelStateValue::ToRemove { addr: None };
-                        }
-                        WithStatusSeriesIdStateInner::WithAddress { addr, .. } => {
-                            k.value = ChannelStateValue::ToRemove {
-                                addr: Some(addr.clone()),
-                            };
-                        }
-                        WithStatusSeriesIdStateInner::NoAddress { .. } => {
-                            k.value = ChannelStateValue::ToRemove { addr: None };
-                        }
-                    },
-                },
-                ChannelStateValue::ToRemove { .. } => {}
-            }
-        }
-        Ok(())
-    }
-
-    async fn handle_search_done(&mut self, item: Result<VecDeque<FindIocRes>, Error>) -> Result<(), Error> {
-        warn!("TODO handle_search_done");
-        //debug!("handle SearchDone: {res:?}");
-        // let allow_create_new_connections = self.allow_create_new_connections();
-        // let tsnow = SystemTime::now();
-        #[cfg(DISABLED)]
-        match item {
-            Ok(ress) => {
-                SEARCH_ANS_COUNT.fetch_add(ress.len(), atomic::Ordering::AcqRel);
-                for res in ress {
-                    if let Some(addr) = &res.addr {
-                        self.stats.ioc_search_some_inc();
-
-                        let ch = Channel::new(res.channel);
-                        if let Some(st) = self.channel_states.get_mut(&ch) {
-                            match &st.value {
-                                ChannelStateValue::Active(st2) => match st2 {
-                                    ActiveChannelState::Init { .. } => {}
-                                    ActiveChannelState::WaitForStatusSeriesId { .. } => {}
-                                    ActiveChannelState::WithStatusSeriesId {
-                                        status_series_id,
-                                        state,
-                                    } => match state.inner {
-                                        WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => {
-                                            if allow_create_new_connections {
-                                                let dt = tsnow.duration_since(since).unwrap();
-                                                if dt > SEARCH_PENDING_TIMEOUT_WARN {
-                                                    warn!(
-                                                        "  FOUND {:5.0} {:5.0} {addr}",
-                                                        1e3 * dt.as_secs_f32(),
-                                                        1e3 * res.dt.as_secs_f32()
-                                                    );
-                                                }
-                                                let stnew =
-                                                    ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
-                                                        status_series_id: status_series_id.clone(),
-                                                        state: WithStatusSeriesIdState {
-                                                            inner: WithStatusSeriesIdStateInner::WithAddress {
-                                                                addr: addr.clone(),
-                                                                state: WithAddressState::Unassigned {
-                                                                    assign_at: tsnow,
-                                                                },
-                                                            },
-                                                        },
-                                                    });
-                                                st.value = stnew;
-                                            } else {
-                                                // Emit something here?
-                                            }
-                                        }
-                                        _ => {
-                                            warn!(
-                                                "address found, but state for {ch:?} is not SearchPending: {:?}",
-                                                st.value
-                                            );
-                                        }
-                                    },
-                                },
-                                ChannelStateValue::ToRemove { addr: _ } => {}
-                            }
-                        } else {
-                            warn!("can not find channel state for {ch:?}");
-                        }
-                    } else {
-                        //debug!("no addr from search in {res:?}");
-                        let ch = Channel::new(res.channel);
-                        if let Some(st) = self.channel_states.get_mut(&ch) {
-                            let mut unexpected_state = true;
-                            match &st.value {
-                                ChannelStateValue::Active(st2) => match st2 {
-                                    ActiveChannelState::Init { .. } => {}
-                                    ActiveChannelState::WaitForStatusSeriesId { .. } => {}
-                                    ActiveChannelState::WithStatusSeriesId {
-                                        status_series_id,
-                                        state: st3,
-                                    } => match &st3.inner {
-                                        WithStatusSeriesIdStateInner::UnknownAddress { .. } => {}
-                                        WithStatusSeriesIdStateInner::SearchPending { since, .. } => {
-                                            unexpected_state = false;
-                                            let dt = tsnow.duration_since(*since).unwrap();
-                                            if dt > SEARCH_PENDING_TIMEOUT_WARN {
-                                                warn!(
-                                                    "NOT FOUND {:5.0} {:5.0}",
-                                                    1e3 * dt.as_secs_f32(),
-                                                    1e3 * res.dt.as_secs_f32()
-                                                );
-                                            }
-                                            st.value =
-                                                ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
-                                                    status_series_id: status_series_id.clone(),
-                                                    state: WithStatusSeriesIdState {
-                                                        inner: WithStatusSeriesIdStateInner::NoAddress { since: tsnow },
-                                                    },
-                                                });
-                                        }
-                                        WithStatusSeriesIdStateInner::WithAddress { .. } => {}
-                                        WithStatusSeriesIdStateInner::NoAddress { .. } => {}
-                                    },
-                                },
-                                ChannelStateValue::ToRemove { .. } => {}
-                            }
-                            if unexpected_state {
-                                warn!("no address, but state for {ch:?} is not SearchPending: {:?}", st.value);
-                            }
-                        } else {
-                            warn!("can not find channel state for {ch:?}");
-                        }
-                    }
-                }
-            }
-            Err(e) => {
-                self.stats.ioc_search_err_inc();
-                error!("error from search: {e}");
-            }
-        }
-        Ok(())
-    }
-
-    async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> {
-        warn!("TODO handle_ca_conn_done {conn_addr:?}");
+        self.connset_ctrl.remove_channel(ch.id().into()).await?;
         Ok(())
     }
@@ -541,48 +363,6 @@ impl Daemon {
         Ok(())
     }
 
-    async fn handle_ca_conn_event(&mut self, addr: SocketAddrV4, item: CaConnEvent) -> Result<(), Error> {
-        self.stats.event_ca_conn_inc();
-        use netfetch::ca::conn::CaConnEventValue::*;
-        match item.value {
-            None => {
-                // TODO count, maybe reduce.
-                Ok(())
-            }
-            EchoTimeout => {
-                self.stats.ca_echo_timeout_total_inc();
-                error!("TODO on EchoTimeout remove the CaConn and reset channels");
-                Ok(())
-            }
-            ConnCommandResult(item) => {
-                self.stats.todo_mark_inc();
-                use netfetch::ca::conn::ConnCommandResultKind::*;
-                match &item.kind {
-                    CheckHealth => {
-                        todo!("TODO collect the CaConn health check in CaConnSet");
-                        #[cfg(DISABLED)]
-                        if let Some(st) = self.connection_states.get_mut(&addr) {
-                            self.stats.ca_conn_status_feedback_recv_inc();
-                            st.last_feedback = Instant::now();
-                            Ok(())
-                        } else {
-                            self.stats.ca_conn_status_feedback_no_dst_inc();
-                            Ok(())
-                        }
-                    }
-                }
-            }
-            QueryItem(item) => {
-                self.query_item_tx.send(item).await?;
-                Ok(())
-            }
-            EndOfStream => {
-                self.stats.ca_conn_status_done_inc();
-                self.handle_ca_conn_done(addr).await
-            }
-        }
-    }
-
     async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> {
         use CaConnSetItem::*;
         match item {
@@ -647,8 +427,6 @@ impl Daemon {
             }
             ChannelAdd(ch) => self.handle_channel_add(ch).await,
             ChannelRemove(ch) => self.handle_channel_remove(ch).await,
-            SearchDone(item) => self.handle_search_done(item).await,
-            CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await,
             CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await,
             Shutdown => self.handle_shutdown().await,
         };
@@ -696,14 +474,17 @@ impl Daemon {
         taskrun::spawn(ticker);
     }
 
-    pub async fn daemon(&mut self) -> Result<(), Error> {
+    pub async fn daemon(mut self) -> Result<(), Error> {
         Self::spawn_ticker(self.tx.clone(), self.stats.clone());
         loop {
+            if self.shutting_down {
+                break;
+            }
             match self.rx.recv().await {
-                Ok(item) => match self.handle_event(item).await {
-                    Ok(_) => {}
+                Ok(item) => match self.handle_event(item.clone()).await {
+                    Ok(()) => {}
                     Err(e) => {
-                        error!("daemon: {e}");
+                        error!("fn daemon: error from handle_event {item:?} {e}");
                         break;
                     }
                 },
@@ -713,8 +494,23 @@ impl Daemon {
                 }
             }
         }
+        warn!("TODO should not have to close the channel");
         warn!("TODO wait for insert workers");
-        let _ = &self.insert_workers_jh;
+        while let Some(jh) = self.insert_workers_jh.pop() {
+            match jh.await.map_err(Error::from_string) {
+                Ok(x) => match x {
+                    Ok(()) => {
+                        debug!("joined insert worker");
+                    }
+                    Err(e) => {
+                        error!("joined insert worker, error {e}");
+                    }
+                },
+                Err(e) => {
+                    error!("insert worker join error {e}");
+                }
+            }
+        }
         info!("daemon done");
         Ok(())
     }
@@ -778,7 +574,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
         insert_worker_count: opts.insert_worker_count(),
         insert_scylla_sessions: opts.insert_scylla_sessions(),
     };
-    let mut daemon = Daemon::new(opts2).await?;
+    let daemon = Daemon::new(opts2).await?;
     let tx = daemon.tx.clone();
     let daemon_stats = daemon.stats().clone();
@@ -789,16 +585,13 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
         tokio::task::spawn(fut)
     };
 
-    let daemon_jh = taskrun::spawn(async move {
-        // TODO handle Err
-        daemon.daemon().await.unwrap();
-    });
+    let daemon_jh = taskrun::spawn(daemon.daemon());
 
     for s in &channels {
         let ch = Channel::new(s.into());
         tx.send(DaemonEvent::ChannelAdd(ch)).await?;
     }
-    info!("configured channels applied");
-    daemon_jh.await.unwrap();
+    debug!("{} configured channels applied", channels.len());
+    daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
     if false {
         metrics_jh.await.unwrap();
     }
diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs
index 810b5c2..f1a127d 100644
--- a/netfetch/src/ca.rs
+++ b/netfetch/src/ca.rs
@@ -6,8 +6,6 @@ pub mod proto;
 pub mod search;
 pub mod statemap;
 
-use self::connset::CaConnSetCtrl;
-use crate::ca::connset::CaConnSet;
 use crate::metrics::ExtraInsertsConf;
 use crate::rt::TokMx;
 use futures_util::Future;
@@ -15,7 +13,6 @@ use futures_util::FutureExt;
 use log::*;
 use netpod::Database;
 use scywr::insertworker::InsertWorkerOpts;
-use scywr::iteminsertqueue::CommonInsertItemQueue;
 use scywr::store::DataStore;
 use stats::CaConnStatsAgg;
 use std::pin::Pin;
@@ -39,7 +36,6 @@ pub struct IngestCommons {
     pub pgconf: Arc<Database>,
     pub backend: String,
     pub local_epics_hostname: String,
-    pub insert_item_queue: Arc<CommonInsertItemQueue>,
     pub data_store: Arc<DataStore>,
     pub insert_ivl_min: Arc<AtomicU64>,
     pub extra_inserts_conf: TokMx<ExtraInsertsConf>,
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index e24fbd2..281e38b 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -262,6 +262,7 @@ enum CaConnState {
     PeerReady,
     Wait(Pin<Box<dyn Future<Output = ()> + Send>>),
     Shutdown,
+    EndOfStream,
 }
 
 fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
@@ -852,11 +853,11 @@ impl CaConn {
     fn check_channels_alive(&mut self) -> Result<(), Error> {
         let tsnow = Instant::now();
-        trace!("CheckChannelsAlive {addr:?}", addr = &self.remote_addr_dbg);
+        trace!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
         if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) {
             if let Some(started) = self.ioc_ping_start {
                 if started.elapsed() > Duration::from_millis(4000) {
-                    warn!("Echo timeout {addr:?}", addr = self.remote_addr_dbg);
+                    warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
                     let item = CaConnEvent {
                         ts: Instant::now(),
                         value: CaConnEventValue::EchoTimeout,
@@ -867,11 +868,11 @@ impl CaConn {
         } else {
             self.ioc_ping_start = Some(Instant::now());
             if let Some(proto) = &mut self.proto {
-                debug!("push echo to {}", self.remote_addr_dbg);
+                debug!("ping to {}", self.remote_addr_dbg);
                 let msg = CaMsg { ty: CaMsgTy::Echo };
                 proto.push_out(msg);
             } else {
-                warn!("can not push echo, no proto {}", self.remote_addr_dbg);
+                warn!("can not ping {} no proto", self.remote_addr_dbg);
                 self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol);
             }
         }
@@ -1630,6 +1631,7 @@ impl CaConn {
             Pending => Ok(Some(Pending)),
         },
         CaConnState::Shutdown => Ok(None),
+        CaConnState::EndOfStream => Ok(None),
     }
 }
 
@@ -1725,8 +1727,8 @@ impl CaConn {
         Ok(())
     }
 
-    fn outgoing_queues_empty(&self) -> bool {
-        self.channel_info_query_queue.is_empty() && !self.channel_info_query_sending.is_sending()
+    fn queues_async_out_flushed(&self) -> bool {
+        self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle()
     }
 
     fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
@@ -1741,7 +1743,9 @@ impl CaConn {
             }
         } else if let Some(item) = self.channel_info_query_queue.pop_front() {
             let sd = &mut self.channel_info_query_sending;
-            sd.send2(item);
+            sd.send(item);
             continue;
         } else {
             Ok(())
@@ -1758,7 +1760,9 @@ impl Stream for CaConn {
         self.stats.caconn_poll_count_inc();
         loop {
             let mut have_pending = false;
-            break if let Err(e) = self.as_mut().handle_own_ticker(cx) {
+            break if let CaConnState::EndOfStream = self.state {
+                Ready(None)
+            } else if let Err(e) = self.as_mut().handle_own_ticker(cx) {
                 Ready(Some(Err(e)))
             } else if let Some(item) = self.cmd_res_queue.pop_front() {
                 let item = CaConnEvent {
@@ -1779,21 +1783,17 @@ impl Stream for CaConn {
             } else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) {
                 Ready(Some(Err(e)))
             } else if let Some(item) = {
-                if self.is_shutdown() {
-                    None
-                } else {
-                    match self.loop_inner(cx) {
-                        // TODO what does this mean: should we re-loop or yield something?
-                        Ok(Some(Ready(()))) => None,
-                        // This is the last step, so we yield Pending.
-                        // But in general, this does not compose well when we would add another step.
-                        Ok(Some(Pending)) => {
-                            have_pending = true;
-                            None
-                        }
-                        Ok(None) => None,
-                        Err(e) => Some(Err(e)),
+                match self.loop_inner(cx) {
+                    // TODO what does this mean: should we re-loop or yield something?
+                    Ok(Some(Ready(()))) => None,
+                    // This is the last step, so we yield Pending.
+                    // But in general, this does not compose well when we would add another step.
+                    Ok(Some(Pending)) => {
+                        have_pending = true;
+                        None
                     }
+                    Ok(None) => None,
+                    Err(e) => Some(Err(e)),
                 }
             } {
                 Ready(Some(item))
@@ -1804,7 +1804,10 @@ impl Stream for CaConn {
                     ts: Instant::now(),
                     value: CaConnEventValue::None,
                 };
-                if have_pending {
+                if self.is_shutdown() && self.queues_async_out_flushed() {
+                    self.state = CaConnState::EndOfStream;
+                    Ready(None)
+                } else if have_pending {
                     Pending
                 } else {
                     continue;
diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs
index ea4ef75..dfaea0d 100644
--- a/netfetch/src/ca/connset.rs
+++ b/netfetch/src/ca/connset.rs
@@ -109,12 +109,18 @@ pub struct ChannelAdd {
     local_epics_hostname: String,
 }
 
+#[derive(Debug, Clone)]
+pub struct ChannelRemove {
+    name: String,
+}
+
 #[derive(Debug)]
 pub enum ConnSetCmd {
     SeriesLookupResult(Result),
     ChannelAdd(ChannelAdd),
     ChannelAddWithStatusId(ChannelAddWithStatusId),
     ChannelAddWithAddr(ChannelAddWithAddr),
+    ChannelRemove(ChannelRemove),
     IocAddrQueryResult(VecDeque<FindIocRes>),
     CheckHealth,
     Shutdown,
@@ -126,18 +132,22 @@ pub enum CaConnSetEvent {
     CaConnEvent((SocketAddr, CaConnEvent)),
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum CaConnSetItem {
     Healthy,
 }
 
-#[derive(Clone)]
 pub struct CaConnSetCtrl {
     tx: Sender<CaConnSetEvent>,
-    pub rx: Receiver<CaConnSetItem>,
+    rx: Receiver<CaConnSetItem>,
+    jh: JoinHandle<Result<(), Error>>,
 }
 
 impl CaConnSetCtrl {
+    pub fn receiver(&self) -> Receiver<CaConnSetItem> {
+        self.rx.clone()
+    }
+
     pub async fn add_channel(&self, backend: String, name: String, local_epics_hostname: String) -> Result<(), Error> {
         let cmd = ChannelAdd {
             backend,
@@ -149,6 +159,13 @@ impl CaConnSetCtrl {
         Ok(())
     }
 
+    pub async fn remove_channel(&self, name: String) -> Result<(), Error> {
+        let cmd = ChannelRemove { name };
+        let cmd = ConnSetCmd::ChannelRemove(cmd);
+        self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
+        Ok(())
+    }
+
     pub async fn shutdown(&self) -> Result<(), Error> {
         let cmd = ConnSetCmd::Shutdown;
         self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
@@ -160,6 +177,11 @@ impl CaConnSetCtrl {
         self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
         Ok(())
     }
+
+    pub async fn join(self) -> Result<(), Error> {
+        self.jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
+        Ok(())
+    }
 }
 
 #[derive(Debug)]
@@ -198,6 +220,7 @@ pub struct CaConnSet {
     chan_check_next: Option<Channel>,
     stats: CaConnSetStats,
     connset_out_tx: Sender<CaConnSetItem>,
+    ioc_finder_jh: JoinHandle<Result<(), Error>>,
 }
 
 impl CaConnSet {
@@ -226,12 +249,14 @@ impl CaConnSet {
             chan_check_next: None,
             stats: CaConnSetStats::new(),
             connset_out_tx,
+            ioc_finder_jh,
         };
         // TODO await on jh
         let jh = tokio::spawn(CaConnSet::run(connset));
         CaConnSetCtrl {
             tx: connset_tx,
             rx: connset_out_rx,
+            jh,
         }
     }
 
@@ -241,15 +266,31 @@ impl CaConnSet {
             match x {
                 Ok(ev) => this.handle_event(ev).await?,
                 Err(_) => {
-                    if this.shutdown_done {
+                    if this.shutdown_stopping {
                         // all fine
-                        break Ok(());
+                        break;
                     } else {
-                        error!("channel closed without shutdown_done");
+                        error!("channel closed without shutdown_stopping");
                     }
                 }
             }
+            if this.shutdown_stopping {
+                break;
+            }
         }
+        debug!(
+            "search_tx sender {} receiver {}",
+            this.search_tx.sender_count(),
+            this.search_tx.receiver_count()
+        );
+        this.ioc_finder_jh
+            .await
+            .map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
+        debug!("joined ioc_finder_jh");
+        this.connset_out_tx.close();
+        this.connset_rx.close();
+        this.shutdown_done = true;
+        Ok(())
     }
 
     async fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> {
@@ -258,6 +299,7 @@ impl CaConnSet {
             ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await,
             ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await,
             ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
+            ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x).await,
             ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
             ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
             ConnSetCmd::CheckHealth => self.handle_check_health().await,
@@ -301,6 +343,10 @@ impl CaConnSet {
     }
 
     async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> {
+        if self.shutdown_stopping {
+            debug!("handle_add_channel but shutdown_stopping");
+            return Ok(());
+        }
         // TODO should I add the transition through ActiveChannelState::Init as well?
         let ch = Channel::new(add.name.clone());
         let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState {
@@ -322,6 +368,10 @@ impl CaConnSet {
     }
 
     async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> {
+        if self.shutdown_stopping {
+            debug!("handle_add_channel but shutdown_stopping");
+            return Ok(());
+        }
         debug!("handle_add_channel_with_status_id {add:?}");
         let ch = Channel::new(add.name.clone());
         if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
@@ -350,6 +400,10 @@ impl CaConnSet {
     }
 
     async fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> {
+        if self.shutdown_stopping {
+            debug!("handle_add_channel but shutdown_stopping");
+            return Ok(());
+        }
         if !self.ca_conn_ress.contains_key(&add.addr) {
             let c = self.create_ca_conn(add.clone())?;
             self.ca_conn_ress.insert(add.addr, c);
@@ -360,6 +414,43 @@ impl CaConnSet {
         Ok(())
     }
 
+    async fn handle_remove_channel(&mut self, add: ChannelRemove) -> Result<(), Error> {
+        let ch = Channel::new(add.name);
+        if let Some(k) = self.channel_states.inner().get_mut(&ch) {
+            match &k.value {
+                ChannelStateValue::Active(j) => match j {
+                    ActiveChannelState::Init { .. } => {
+                        k.value = ChannelStateValue::ToRemove { addr: None };
+                    }
+                    ActiveChannelState::WaitForStatusSeriesId { .. } => {
+                        k.value = ChannelStateValue::ToRemove { addr: None };
+                    }
+                    ActiveChannelState::WithStatusSeriesId {
+                        status_series_id: _,
+                        state,
+                    } => match &state.inner {
+                        WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
+                            k.value = ChannelStateValue::ToRemove { addr: None };
+                        }
+                        WithStatusSeriesIdStateInner::SearchPending { .. } => {
+                            k.value = ChannelStateValue::ToRemove { addr: None };
+                        }
+                        WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } => {
+                            k.value = ChannelStateValue::ToRemove {
+                                addr: Some(addr.clone()),
+                            };
+                        }
+                        WithStatusSeriesIdStateInner::NoAddress { .. } => {
+                            k.value = ChannelStateValue::ToRemove { addr: None };
+                        }
+                    },
+                },
+                ChannelStateValue::ToRemove { .. } => {}
+            }
+        }
+        Ok(())
+    }
+
     async fn handle_ioc_query_result(&mut self, res: VecDeque<FindIocRes>) -> Result<(), Error> {
         for e in res {
             let ch = Channel::new(e.channel.clone());
@@ -416,6 +507,7 @@ impl CaConnSet {
         debug!("TODO handle_shutdown");
         debug!("shutdown received");
         self.shutdown_stopping = true;
+        self.search_tx.close();
         for (addr, res) in self.ca_conn_ress.iter() {
             let item = ConnCommand::shutdown();
             res.sender.send(item).await?;
@@ -428,16 +520,20 @@ impl CaConnSet {
         if let Some(e) = self.ca_conn_ress.remove(&addr) {
             match e.jh.await {
                 Ok(Ok(())) => {
+                    self.stats.ca_conn_task_join_done_ok_inc();
                     debug!("CaConn {addr} finished well");
                 }
                 Ok(Err(e)) => {
+                    self.stats.ca_conn_task_join_done_err_inc();
                     error!("CaConn {addr} task error: {e}");
                 }
                 Err(e) => {
+                    self.stats.ca_conn_task_join_err_inc();
                    error!("CaConn {addr} join error: {e}");
                 }
             }
         } else {
+            self.stats.ca_conn_task_eos_non_exist_inc();
             warn!("end-of-stream received for non-existent CaConn {addr}");
         }
         Ok(())
diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs
index 822ff06..7ca0783 100644
--- a/netfetch/src/ca/finder.rs
+++ b/netfetch/src/ca/finder.rs
@@ -338,14 +338,9 @@ fn start_finder_ca(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<
     taskrun::spawn({
         async move {
             while let Ok(item) = arx.recv().await {
-                match tx.send(DaemonEvent::SearchDone(item)).await {
-                    Ok(_) => {}
-                    Err(e) => {
-                        error!("search res fwd {e}");
-                    }
-                }
+                todo!("send the result item");
             }
-            warn!("search res fwd nput broken");
+            warn!("search res fwd inp closed");
         }
     });
     (qtx, ioc_finder_jh)
diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs
index 65eaf0d..ccd0055 100644
--- a/netfetch/src/daemon_common.rs
+++ b/netfetch/src/daemon_common.rs
@@ -1,11 +1,6 @@
-use crate::ca::conn::CaConnEvent;
 use crate::ca::connset::CaConnSetItem;
-use crate::ca::findioc::FindIocRes;
 use async_channel::Sender;
-use err::Error;
 use serde::Serialize;
-use std::collections::VecDeque;
-use std::net::SocketAddrV4;
 
 #[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
 pub struct Channel {
@@ -22,13 +17,11 @@ impl Channel {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum DaemonEvent {
     TimerTick(u32, Sender),
     ChannelAdd(Channel),
     ChannelRemove(Channel),
-    SearchDone(Result<VecDeque<FindIocRes>, Error>),
-    CaConnEvent(SocketAddrV4, CaConnEvent),
     CaConnSetItem(CaConnSetItem),
     Shutdown,
 }
@@ -40,17 +33,6 @@ impl DaemonEvent {
             TimerTick(_, _) => format!("TimerTick"),
             ChannelAdd(x) => format!("ChannelAdd {x:?}"),
             ChannelRemove(x) => format!("ChannelRemove {x:?}"),
-            SearchDone(_x) => format!("SearchDone"),
-            CaConnEvent(_a, b) => {
-                use crate::ca::conn::CaConnEventValue::*;
-                match &b.value {
-                    None => format!("CaConnEvent/None"),
-                    EchoTimeout => format!("CaConnEvent/EchoTimeout"),
-                    ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"),
-                    QueryItem(_) => format!("CaConnEvent/QueryItem"),
-                    EndOfStream => format!("CaConnEvent/EndOfStream"),
-                }
-            }
             CaConnSetItem(_) => format!("CaConnSetItem"),
             Shutdown => format!("Shutdown"),
         }
diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs
index 2049075..6b64e07 100644
--- a/netfetch/src/metrics.rs
+++ b/netfetch/src/metrics.rs
@@ -270,11 +270,10 @@ pub async fn metrics_agg_task(
         }
     }
     {
-        let val = ingest_commons
-            .insert_item_queue
-            .receiver()
-            .map_or(0, |x| x.len() as u64);
-        agg.store_worker_recv_queue_len.store(val, Ordering::Release);
+        warn!("TODO provide metrics with a weak ref to the query_item_channel");
+        let nitems = 0;
+        // let nitems = weak.upgrade().map(|x| x.len());
+        agg.store_worker_recv_queue_len.store(nitems, Ordering::Release);
     }
     let mut m = METRICS.lock().unwrap();
     *m = Some(agg.clone());
diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs
index 8ec9bba..26d8660 100644
--- a/netfetch/src/senderpolling.rs
+++ b/netfetch/src/senderpolling.rs
@@ -42,11 +42,15 @@ impl<T> SenderPolling<T> {
         ret
     }
 
+    pub fn is_idle(&self) -> bool {
+        self.fut.is_none()
+    }
+
     pub fn is_sending(&self) -> bool {
         self.fut.is_some()
     }
 
-    pub fn send(self: Pin<&mut Self>, item: T) {
+    pub fn send_pin(self: Pin<&mut Self>, item: T) {
         let (tx, fut) = unsafe {
             let x = Pin::get_unchecked_mut(self);
             (x.sender_ptr.as_mut(), &mut x.fut)
         };
         let s = tx.send(item);
         *fut = Some(s);
     }
 
@@ -55,7 +59,7 @@ impl<T> SenderPolling<T> {
-    pub fn send2(&mut self, item: T) {
+    pub fn send(&mut self, item: T) {
         let sender = unsafe { self.sender_ptr.as_mut() };
         let s = sender.send(item);
         self.fut = Some(s);
diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs
index 89ff55b..2c0e918 100644
--- a/scywr/src/insertworker.rs
+++ b/scywr/src/insertworker.rs
@@ -10,6 +10,7 @@ use log::*;
 use netpod::timeunits::MS;
 use netpod::timeunits::SEC;
 use netpod::ScyllaConfig;
+use stats::CaConnStats;
 use std::sync::atomic;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
@@ -129,6 +130,175 @@ fn rate_limiter(
     rx
 }
 
+async fn worker(
+    worker_ix: usize,
+    item_inp: Receiver<QueryItem>,
+    ttls: Ttls,
+    insert_worker_opts: Arc<InsertWorkerOpts>,
+    data_store: Arc<DataStore>,
+    stats: Arc<CaConnStats>,
+) -> Result<(), Error> {
+    insert_worker_opts
+        .insert_workers_running
+        .fetch_add(1, atomic::Ordering::AcqRel);
+    let backoff_0 = Duration::from_millis(10);
+    let mut backoff = backoff_0.clone();
+    let mut i1 = 0;
+    loop {
+        let item = if let Ok(item) = item_inp.recv().await {
+            stats.store_worker_item_recv_inc();
+            item
+        } else {
+            break;
+        };
+        match item {
+            QueryItem::ConnectionStatus(item) => {
+                match insert_connection_status(item, ttls.index, &data_store, &stats).await {
+                    Ok(_) => {
+                        stats.connection_status_insert_done_inc();
+                        backoff = backoff_0;
+                    }
+                    Err(e) => {
+                        stats_inc_for_err(&stats, &e);
+                        back_off_sleep(&mut backoff).await;
+                    }
+                }
+            }
+            QueryItem::ChannelStatus(item) => {
+                match insert_channel_status(item, ttls.index, &data_store, &stats).await {
+                    Ok(_) => {
+                        stats.channel_status_insert_done_inc();
+                        backoff = backoff_0;
+                    }
+                    Err(e) => {
+                        stats_inc_for_err(&stats, &e);
+                        back_off_sleep(&mut backoff).await;
+                    }
+                }
+            }
+            QueryItem::Insert(item) => {
+                let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
+                if i1 % 1000 < insert_frac {
+                    match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
+                        Ok(_) => {
+                            stats.store_worker_insert_done_inc();
+                            backoff = backoff_0;
+                        }
+                        Err(e) => {
+                            stats_inc_for_err(&stats, &e);
+                            back_off_sleep(&mut backoff).await;
+                        }
+                    }
+                } else {
+                    stats.store_worker_fraction_drop_inc();
+                }
+                i1 += 1;
+            }
+            QueryItem::Mute(item) => {
+                let values = (
+                    (item.series.id() & 0xff) as i32,
+                    item.series.id() as i64,
+                    item.ts as i64,
+                    item.ema,
+                    item.emd,
+                    ttls.index.as_secs() as i32,
+                );
+                let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
+                match qres {
+                    Ok(_) => {
+                        stats.mute_insert_done_inc();
+                        backoff = backoff_0;
+                    }
+                    Err(e) => {
+                        stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                        back_off_sleep(&mut backoff).await;
+                    }
+                }
+            }
+            QueryItem::Ivl(item) => {
+                let values = (
+                    (item.series.id() & 0xff) as i32,
+                    item.series.id() as i64,
+                    item.ts as i64,
+                    item.ema,
+                    item.emd,
+                    ttls.index.as_secs() as i32,
+                );
+                let qres = data_store
+                    .scy
+                    .execute(&data_store.qu_insert_item_recv_ivl, values)
+                    .await;
+                match qres {
+                    Ok(_) => {
+                        stats.ivl_insert_done_inc();
+                        backoff = backoff_0;
+                    }
+                    Err(e) => {
+                        stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                        back_off_sleep(&mut backoff).await;
+                    }
+                }
+            }
+            QueryItem::ChannelInfo(item) => {
+                let params = (
+                    (item.series.id() & 0xff) as i32,
+                    item.ts_msp as i32,
+                    item.series.id() as i64,
+                    item.ivl,
+                    item.interest,
+                    item.evsize as i32,
+                    ttls.index.as_secs() as i32,
+                );
+                let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
+                match qres {
+                    Ok(_) => {
+                        stats.channel_info_insert_done_inc();
+                        backoff = backoff_0;
+                    }
+                    Err(e) => {
+                        stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                        back_off_sleep(&mut backoff).await;
+                    }
+                }
+            }
+            QueryItem::TimeBinPatchSimpleF32(item) => {
+                info!("have time bin patch to insert: {item:?}");
+                let params = (
+                    item.series.id() as i64,
+                    item.bin_len_sec as i32,
+                    item.bin_count as i32,
+                    item.off_msp as i32,
+                    item.off_lsp as i32,
+                    item.counts,
+                    item.mins,
+                    item.maxs,
+                    item.avgs,
+                    ttls.binned.as_secs() as i32,
+                );
+                let qres = data_store
+                    .scy
+                    .execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
+                    .await;
+                match qres {
+                    Ok(_) => {
+                        stats.store_worker_insert_binned_done_inc();
+                        backoff = backoff_0;
+                    }
+                    Err(e) => {
+                        stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                        back_off_sleep(&mut backoff).await;
+                    }
+                }
+            }
+        }
+    }
+    insert_worker_opts
+        .insert_workers_running
+        .fetch_sub(1, atomic::Ordering::AcqRel);
+    trace!("insert worker {worker_ix} done");
+    Ok(())
+}
+
 pub async fn spawn_scylla_insert_workers(
     scyconf: ScyllaConfig,
     insert_scylla_sessions: usize,
@@ -138,7 +308,7 @@ pub async fn spawn_scylla_insert_workers(
     store_stats: Arc<CaConnStats>,
     use_rate_limit_queue: bool,
     ttls: Ttls,
-) -> Result<Vec<JoinHandle<()>>, Error> {
+) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
     let item_inp = if use_rate_limit_queue {
         rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone())
     } else {
@@ -151,171 +321,15 @@ pub async fn spawn_scylla_insert_workers(
         data_stores.push(data_store);
     }
     for worker_ix in 0..insert_worker_count {
-        let item_inp = item_inp.clone();
         let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
-        let stats = store_stats.clone();
-        let insert_worker_opts = insert_worker_opts.clone();
-        let fut = async move {
-            insert_worker_opts
-                .insert_workers_running
-                .fetch_add(1, atomic::Ordering::AcqRel);
-            let backoff_0 = Duration::from_millis(10);
-            let mut backoff = backoff_0.clone();
-            let mut i1 = 0;
-            loop {
-                let item = if let Ok(item) = item_inp.recv().await {
-                    stats.store_worker_item_recv_inc();
-                    item
-                } else {
-                    break;
-                };
-                match item {
-                    QueryItem::ConnectionStatus(item) => {
-                        match insert_connection_status(item, ttls.index, &data_store, &stats).await {
-                            Ok(_) => {
-                                stats.connection_status_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::ChannelStatus(item) => {
-                        match insert_channel_status(item, ttls.index, &data_store, &stats).await {
-                            Ok(_) => {
-                                stats.channel_status_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::Insert(item) => {
-                        let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
-                        if i1 % 1000 < insert_frac {
-                            match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
-                                Ok(_) => {
-                                    stats.store_worker_insert_done_inc();
-                                    backoff = backoff_0;
-                                }
-                                Err(e) => {
-                                    stats_inc_for_err(&stats, &e);
-                                    back_off_sleep(&mut backoff).await;
-                                }
-                            }
-                        } else {
-                            stats.store_worker_fraction_drop_inc();
-                        }
-                        i1 += 1;
-                    }
-                    QueryItem::Mute(item) => {
-                        let values = (
-                            (item.series.id() & 0xff) as i32,
-                            item.series.id() as i64,
-                            item.ts as i64,
-                            item.ema,
-                            item.emd,
-                            ttls.index.as_secs() as i32,
-                        );
-                        let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
-                        match qres {
-                            Ok(_) => {
-                                stats.mute_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::Ivl(item) => {
-                        let values = (
-                            (item.series.id() & 0xff) as i32,
-                            item.series.id() as i64,
-                            item.ts as i64,
-                            item.ema,
-                            item.emd,
-                            ttls.index.as_secs() as i32,
-                        );
-                        let qres = data_store
-                            .scy
-                            .execute(&data_store.qu_insert_item_recv_ivl, values)
-                            .await;
-                        match qres {
-                            Ok(_) => {
-                                stats.ivl_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::ChannelInfo(item) => {
-                        let params = (
-                            (item.series.id() & 0xff) as i32,
-                            item.ts_msp as i32,
-                            item.series.id() as i64,
-                            item.ivl,
-                            item.interest,
-                            item.evsize as i32,
-                            ttls.index.as_secs() as i32,
-                        );
-                        let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
-                        match qres {
-                            Ok(_) => {
-                                stats.channel_info_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::TimeBinPatchSimpleF32(item) => {
-                        info!("have time bin patch to insert: {item:?}");
-                        let params = (
-                            item.series.id() as i64,
-                            item.bin_len_sec as i32,
-                            item.bin_count as i32,
-                            item.off_msp as i32,
-                            item.off_lsp as i32,
-                            item.counts,
-                            item.mins,
-                            item.maxs,
-                            item.avgs,
-                            ttls.binned.as_secs() as i32,
-                        );
-                        let qres = data_store
-                            .scy
-                            .execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
-                            .await;
-                        match qres {
-                            Ok(_) => {
-                                stats.store_worker_insert_binned_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                }
-            }
-            insert_worker_opts
-                .insert_workers_running
-                .fetch_sub(1, atomic::Ordering::AcqRel);
-            trace!("insert worker {worker_ix} done");
-        };
-        let jh = tokio::spawn(fut);
+        let jh = tokio::spawn(worker(
+            worker_ix,
+            item_inp.clone(),
+            ttls.clone(),
+            insert_worker_opts.clone(),
+            data_store,
+            store_stats.clone(),
+        ));
         jhs.push(jh);
     }
     Ok(jhs)
 }
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index b19d02d..d92b754 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -217,6 +217,10 @@ stats_proc::stats_struct!((
     channel_with_address,
     channel_unassigned,
     channel_assigned,
+    ca_conn_task_join_done_ok,
+    ca_conn_task_join_done_err,
+    ca_conn_task_join_err,
+    ca_conn_task_eos_non_exist,
     ),
 ),
 // agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),