diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 415d251..9f9bc4e 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -15,6 +15,7 @@ use netfetch::daemon_common::Channel; use netfetch::daemon_common::DaemonEvent; use netfetch::metrics::ExtraInsertsConf; use netfetch::metrics::StatsSet; +use netfetch::throttletrace::ThrottleTrace; use netpod::Database; use netpod::ScyllaConfig; use scywr::insertworker::InsertWorkerOpts; @@ -592,16 +593,19 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> let tx = daemon.tx.clone(); let daemon_stats = daemon.stats().clone(); + let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8); + let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); let metrics_jh = { let stats_set = StatsSet::new(daemon_stats); - let fut = netfetch::metrics::start_metrics_service(opts.api_bind(), dcom, stats_set); + let fut = netfetch::metrics::metrics_service(opts.api_bind(), dcom, stats_set, metrics_shutdown_rx); tokio::task::spawn(fut) }; let daemon_jh = taskrun::spawn(daemon.daemon()); debug!("will configure {} channels", channels.len()); + let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000)); let mut i = 0; for s in &channels { let ch = Channel::new(s.into()); @@ -609,15 +613,17 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> Ok(()) => {} Err(_) => break, } + thr_msg.trigger_fmt("sent ChannelAdd", &[&i as &_]); i += 1; - if i % 100 == 0 { - debug!("sent {} ChannelAdd", i); - } + // if i % 100 == 0 { + // debug!("sent {} ChannelAdd", i); + // } } debug!("{} configured channels applied", channels.len()); daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; - if false { - metrics_jh.await.unwrap(); - } + info!("Daemon joined."); + metrics_shutdown_tx.send(1).await?; + metrics_jh.await.unwrap(); + info!("Metrics joined."); Ok(()) } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 4f58e09..3519be1 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,6 +1,7 @@ use super::proto; use super::ExtraInsertsConf; use crate::senderpolling::SenderPolling; +use crate::throttletrace::ThrottleTrace; use crate::timebin::ConnTimeBin; use async_channel::Sender; use core::fmt; @@ -477,6 +478,7 @@ pub struct CaConn { channel_info_query_queue: VecDeque, channel_info_query_sending: SenderPolling, time_binners: BTreeMap, + thr_msg_poll: ThrottleTrace, } impl Drop for CaConn { @@ -525,6 +527,7 @@ impl CaConn { channel_info_query_queue: VecDeque::new(), channel_info_query_sending: SenderPolling::new(channel_info_query_tx), time_binners: BTreeMap::new(), + thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), } } @@ -1708,7 +1711,7 @@ impl CaConn { } fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { - debug!("tick CaConn {}", self.remote_addr_dbg); + // debug!("tick CaConn {}", self.remote_addr_dbg); let this = self.get_mut(); if false { for (_, tb) in this.time_binners.iter_mut() { @@ -1759,6 +1762,7 @@ impl Stream for CaConn { self.stats.caconn_poll_count.inc(); let poll_ts1 = Instant::now(); let ret = loop { + self.thr_msg_poll.trigger("CaConn::poll_next"); break if let CaConnState::EndOfStream = self.state { Ready(None) } else if let Err(e) = self.as_mut().handle_own_ticker(cx) { @@ -1819,7 +1823,7 @@ impl Stream for CaConn { warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) } else if dt > Duration::from_millis(40) { info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) - } else if dt > Duration::from_millis(5) { + } else if false && dt > Duration::from_millis(5) { debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) } ret diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 217bc64..2b5a46b 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,4 +1,3 @@ -use super::connset_input_merge::InputMerge; use super::findioc::FindIocRes; use super::statemap; use super::statemap::ChannelState; @@ -16,6 +15,7 @@ use crate::errconv::ErrConv; use crate::rt::JoinHandle; use crate::rt::TokMx; use crate::senderpolling::SenderPolling; +use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; use atomic::AtomicUsize; @@ -260,6 +260,8 @@ pub struct CaConnSet { stats: CaConnSetStats, ioc_finder_jh: JoinHandle>, await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, + thr_msg_poll_1: ThrottleTrace, + thr_msg_storage_len: ThrottleTrace, } impl CaConnSet { @@ -304,6 +306,8 @@ impl CaConnSet { // connset_out_sender: SenderPolling::new(connset_out_tx), ioc_finder_jh, await_ca_conn_jhs: VecDeque::new(), + thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)), + thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -555,6 +559,8 @@ impl CaConnSet { if self.shutdown_stopping { return Ok(()); } + self.thr_msg_storage_len + .trigger_fmt("msg", &[&self.storage_insert_sender.len()]); debug!("TODO handle_check_health"); let ts2 = Instant::now(); let item = CaConnSetItem::Healthy(ts1, ts2); @@ -636,7 +642,7 @@ impl CaConnSet { tx: Sender<(SocketAddr, CaConnEvent)>, addr: SocketAddr, ) -> Result<(), Error> { - debug!("ca_conn_consumer begin {}", addr); + trace2!("ca_conn_consumer begin {}", addr); let stats = conn.stats(); let mut conn = conn; let mut ret = Ok(()); @@ -652,7 +658,7 @@ impl CaConnSet { } } } - debug!("ca_conn_consumer ended {}", addr); + trace2!("ca_conn_consumer ended {}", addr); tx.send(( addr, CaConnEvent { @@ -661,7 +667,7 @@ impl CaConnSet { }, )) .await?; - debug!("ca_conn_consumer signaled {}", addr); + trace!("ca_conn_consumer signaled {}", addr); ret } @@ -980,8 +986,9 @@ impl Stream for CaConnSet { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - debug!("CaConnSet::poll"); loop { + self.thr_msg_poll_1.trigger("CaConnSet"); + let mut have_pending = false; if let Some(item) = self.connset_out_queue.pop_front() { diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 000e888..e101dbf 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -12,4 +12,5 @@ pub mod rt; pub mod senderpolling; #[cfg(test)] pub mod test; +pub mod throttletrace; pub mod timebin; diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index aa6a126..7511424 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,5 +1,6 @@ use crate::ca::METRICS; use crate::daemon_common::DaemonEvent; +use async_channel::Receiver; use async_channel::Sender; use async_channel::WeakSender; use axum::extract::Query; @@ -244,9 +245,19 @@ fn make_routes(dcom: Arc, stats_set: StatsSet) -> axum::Router { ) } -pub async fn start_metrics_service(bind_to: String, dcom: Arc, stats_set: StatsSet) { - axum::Server::bind(&bind_to.parse().unwrap()) - .serve(make_routes(dcom, stats_set).into_make_service()) +pub async fn metrics_service( + bind_to: String, + dcom: Arc, + stats_set: StatsSet, + shutdown_signal: Receiver, +) { + let addr = bind_to.parse().unwrap(); + let router = make_routes(dcom, stats_set).into_make_service(); + axum::Server::bind(&addr) + .serve(router) + .with_graceful_shutdown(async move { + let _ = shutdown_signal.recv().await; + }) .await .unwrap() } diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs index 39c8246..2568524 100644 --- a/netfetch/src/senderpolling.rs +++ b/netfetch/src/senderpolling.rs @@ -75,6 +75,10 @@ impl SenderPolling { self.sender = None; self.fut = None; } + + pub fn len(&self) -> Option { + self.sender.as_ref().map(|x| x.len()) + } } impl Future for SenderPolling { diff --git a/netfetch/src/throttletrace.rs b/netfetch/src/throttletrace.rs new file mode 100644 index 0000000..49d3ab8 --- /dev/null +++ b/netfetch/src/throttletrace.rs @@ -0,0 +1,44 @@ +use core::fmt; +use log::*; +use std::time::Duration; +use std::time::Instant; + +pub struct ThrottleTrace { + ivl: Duration, + next: Instant, + count: u64, +} + +impl ThrottleTrace { + pub fn new(ivl: Duration) -> Self { + Self { + ivl, + next: Instant::now(), + count: 0, + } + } + + pub fn trigger(&mut self, msg: &str) { + self.count += 1; + let tsnow = Instant::now(); + if self.next <= tsnow { + self.next = tsnow + self.ivl; + debug!("{} (count {})", msg, self.count); + } + } + + pub fn trigger_fmt(&mut self, msg: &str, params: &[&dyn fmt::Debug]) { + self.count += 1; + let tsnow = Instant::now(); + if self.next <= tsnow { + self.next = tsnow + self.ivl; + if params.len() == 1 { + debug!("{} {:?} (count {})", msg, params[0], self.count); + } else if params.len() == 2 { + debug!("{} {:?} {:?} (count {})", msg, params[0], params[1], self.count); + } else { + debug!("{} (count {})", msg, self.count); + } + } + } +}