diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index be17755..c5992de 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -880,10 +880,8 @@ impl Daemon { ); let rres = Arc::new(rres); let metrics_jh = { - let conn_set_stats = self.connset_ctrl.stats().clone(); let stats_set = StatsSet::new( daemon_stats, - conn_set_stats, self.insert_worker_stats.clone(), self.series_by_channel_stats.clone(), self.connset_ctrl.ioc_finder_stats().clone(), diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index e043a56..20bf1ef 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -44,7 +44,6 @@ use statemap::ConnectionState; use statemap::ConnectionStateValue; use statemap::WithStatusSeriesIdState; use statemap::WithStatusSeriesIdStateInner; -use stats::CaConnSetStats; use stats::IocFinderStats; use std::collections::BTreeMap; use std::collections::VecDeque; @@ -58,6 +57,7 @@ use netpod::OnDrop; use netpod::TsNano; use scywr::insertqueues::InsertQueuesTx; use series::SeriesId; +use stats::mett::CaConnSetMetrics; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -247,7 +247,6 @@ pub enum CaConnSetItem { pub struct CaConnSetCtrl { tx: Sender, rx: Receiver, - stats: Arc, ioc_finder_stats: Arc, jh: JoinHandle>, } @@ -307,10 +306,6 @@ impl CaConnSetCtrl { Ok(()) } - pub fn stats(&self) -> &Arc { - &self.stats - } - pub fn ioc_finder_stats(&self) -> &Arc { &self.ioc_finder_stats } @@ -362,11 +357,11 @@ impl CanSendChannelInfoResult for SeriesLookupSender { struct StateTransRes<'a> { backend: &'a str, - stats: &'a CaConnSetStats, ca_conn_ress: &'a mut HashMap, channel_info_query_qu: &'a mut VecDeque, channel_info_res_tx: Pin<&'a mut Sender>>, chst: &'a mut ChannelState, + mett: &'a mut CaConnSetMetrics, } impl<'a> StateTransRes<'a> { @@ -374,11 +369,11 @@ impl<'a> StateTransRes<'a> { let chst = value.channel_states.get_mut_or_dummy_init(&chname); Self { backend: &value.backend, - stats: &value.stats, ca_conn_ress: &mut value.ca_conn_ress, channel_info_query_qu: &mut value.channel_info_query_qu, channel_info_res_tx: value.channel_info_res_tx.as_mut(), chst, + mett: &mut value.mett, } } } @@ -414,13 +409,11 @@ pub struct CaConnSet { shutdown_stopping: bool, shutdown_done: bool, chan_check_next: Option, - stats: Arc, ioc_finder_jh: JoinHandle>, await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, thr_msg_storage_len: ThrottleTrace, - rogue_channel_count: u64, cssid_latency_max: Duration, - ca_connset_metrics: stats::mett::CaConnSetMetrics, + mett: stats::mett::CaConnSetMetrics, } impl CaConnSet { @@ -448,7 +441,6 @@ impl CaConnSet { ) .unwrap(); let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400); - let stats = Arc::new(CaConnSetStats::new()); let connset = Self { ticker: Self::new_self_ticker(), backend, @@ -479,23 +471,20 @@ impl CaConnSet { shutdown_stopping: false, shutdown_done: false, chan_check_next: None, - stats: stats.clone(), connset_out_tx: Box::pin(connset_out_tx), connset_out_queue: VecDeque::new(), // connset_out_sender: SenderPolling::new(connset_out_tx), ioc_finder_jh, await_ca_conn_jhs: VecDeque::new(), thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), - rogue_channel_count: 0, cssid_latency_max: Duration::from_millis(2000), - ca_connset_metrics: stats::mett::CaConnSetMetrics::new(), + mett: stats::mett::CaConnSetMetrics::new(), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); CaConnSetCtrl { tx: connset_inp_tx, rx: connset_out_rx, - stats, ioc_finder_stats, jh, } @@ -577,7 +566,7 @@ impl CaConnSet { ress.channel_info_query_qu.push_back(item); } if let Err(_) = cmd.restx.try_send(Ok(())) { - ress.stats.command_reply_fail().inc(); + ress.mett.cmd_res_send_err().inc(); } Ok(()) } @@ -588,7 +577,7 @@ impl CaConnSet { if cmd.ch_cfg == ress.chst.config { debug!("handle_add_channel_existing config same {}", cmd.name()); if let Err(_) = cmd.restx.try_send(Ok(())) { - ress.stats.command_reply_fail().inc(); + ress.mett.cmd_res_send_err().inc(); } Ok(()) } else { @@ -652,7 +641,7 @@ impl CaConnSet { } } if let Err(_) = cmd.restx.try_send(Ok(())) { - ress.stats.command_reply_fail().inc(); + ress.mett.cmd_res_send_err().inc(); } Ok(()) } @@ -663,7 +652,7 @@ impl CaConnSet { chst.1.touched = 0; } if let Err(_) = cmd.restx.try_send(Ok(())) { - self.stats.command_reply_fail().inc(); + self.mett.cmd_res_send_err().inc(); } Ok(()) } @@ -683,7 +672,7 @@ impl CaConnSet { self.handle_remove_channel(cmd)?; } if let Err(_) = cmd.restx.try_send(Ok(())) { - self.stats.command_reply_fail().inc(); + self.mett.cmd_res_send_err().inc(); } Ok(()) } @@ -697,7 +686,6 @@ impl CaConnSet { info!("handle_add_channel {:?}", cmd); } trace_channel_state!("handle_add_channel {:?}", cmd); - self.stats.channel_add().inc(); // TODO should I add the transition through ActiveChannelState::Init as well? let chname = ChannelName::new(cmd.name().into()); let ress = StateTransRes::init(self, &chname); @@ -720,7 +708,7 @@ impl CaConnSet { CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason), CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name), CaConnEventValue::Metrics(v) => { - self.ca_connset_metrics.ca_conn().ingest(v); + self.mett.ca_conn().ingest(v); Ok(()) } } @@ -766,7 +754,7 @@ impl CaConnSet { debug!("handle_add_channel but shutdown_stopping"); return Ok(()); } - self.stats.channel_status_series_found().inc(); + self.mett.channel_status_series_found().inc(); if series::dbg::dbg_chn(&name) { info!("handle_add_channel_with_status_id {cmd:?}"); } @@ -805,17 +793,17 @@ impl CaConnSet { }); let qu = IocAddrQuery::cached(name.into()); self.find_ioc_query_queue.push_back(qu); - self.stats.ioc_search_start().inc(); + self.mett.ioc_search_start().inc(); } else { - self.stats.logic_issue().inc(); + self.mett.logic_err().inc(); trace!("TODO have a status series id but no more channel"); } } else { - self.stats.logic_issue().inc(); + self.mett.logic_err().inc(); trace!("TODO have a status series id but no more channel"); } } else { - self.stats.logic_issue().inc(); + self.mett.logic_err().inc(); trace!("TODO have a status series id but no more channel"); } Ok(()) @@ -842,7 +830,7 @@ impl CaConnSet { if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId(st3) = ast { trace!("handle_add_channel_with_addr INNER {cmd:?}"); - self.stats.handle_add_channel_with_addr().inc(); + self.mett.handle_add_channel_with_addr().inc(); let tsnow = SystemTime::now(); let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?; let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( @@ -958,7 +946,7 @@ impl CaConnSet { if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId(st2) = ast { if let Some(addr) = res.addr { - self.stats.ioc_addr_found().inc(); + self.mett.ioc_addr_found().inc(); trace!("ioc found {res:?}"); let cmd = ChannelAddWithAddr { ch_cfg: chst.config.clone(), @@ -967,21 +955,21 @@ impl CaConnSet { }; self.handle_add_channel_with_addr(cmd)?; } else { - self.stats.ioc_addr_not_found().inc(); + self.mett.ioc_addr_not_found().inc(); trace!("ioc not found {res:?}"); let since = SystemTime::now(); st2.inner = WithStatusSeriesIdStateInner::UnknownAddress { since }; } } else { - self.stats.ioc_addr_result_for_unknown_channel().inc(); + self.mett.ioc_addr_result_for_unknown_channel().inc(); warn!("TODO got address but no longer active"); } } else { - self.stats.ioc_addr_result_for_unknown_channel().inc(); + self.mett.ioc_addr_result_for_unknown_channel().inc(); warn!("TODO got address but no longer active"); } } else { - self.stats.ioc_addr_result_for_unknown_channel().inc(); + self.mett.ioc_addr_result_for_unknown_channel().inc(); warn!("ioc addr lookup done but channel no longer here"); } } @@ -1021,7 +1009,7 @@ impl CaConnSet { .collect(); let item = ChannelStatusesResponse { channels_ca_conn_set }; if req.tx.try_send(item).is_err() { - self.stats.response_tx_fail.inc(); + self.mett.chan_send_err().inc(); } Ok(()) } @@ -1085,9 +1073,9 @@ impl CaConnSet { fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: ChannelStatusPartial) -> Result<(), Error> { trace_health_update!("apply_ca_conn_health_update {addr}"); let tsnow = SystemTime::now(); - self.rogue_channel_count = 0; + let mut rogue_channel_count = 0; for (k, v) in res.channel_statuses { - trace_health_update!("self.rogue_channel_count {}", self.rogue_channel_count); + trace_health_update!("self.rogue_channel_count {}", rogue_channel_count); trace_health_update!("apply_ca_conn_health_update {k:?} {v:?}"); let ch = if let Some(x) = self.channel_by_cssid.get(&k) { x @@ -1103,30 +1091,30 @@ impl CaConnSet { } = &mut st3.inner { if SocketAddr::V4(*conn_addr) != addr { - self.rogue_channel_count += 1; + rogue_channel_count += 1; } if let WithAddressState::Assigned(st5) = st4 { st5.updated = tsnow; st5.health_update_count += 1; st5.value = ConnectionStateValue::ChannelStateInfo(v); } else { - self.rogue_channel_count += 1; + rogue_channel_count += 1; } } else { - self.rogue_channel_count += 1; + rogue_channel_count += 1; } } else { - self.rogue_channel_count += 1; + rogue_channel_count += 1; } } else { - self.rogue_channel_count += 1; + rogue_channel_count += 1; } } else { - self.rogue_channel_count += 1; + rogue_channel_count += 1; } } - trace_health_update!("self.rogue_channel_count {}", self.rogue_channel_count); - self.stats.channel_rogue.set(self.rogue_channel_count); + trace_health_update!("rogue_channel_count {}", rogue_channel_count); + self.mett.channel_rogue().set(rogue_channel_count); Ok(()) } @@ -1167,10 +1155,10 @@ impl CaConnSet { fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> { debug!("handle_ca_conn_eos {addr} {reason:?}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { - self.stats.ca_conn_eos_ok().inc(); + self.mett.ca_conn_eos_ok().inc(); self.await_ca_conn_jhs.push_back((addr, e.jh)); } else { - self.stats.ca_conn_eos_unexpected().inc(); + self.mett.ca_conn_eos_unexpected().inc(); warn!("end-of-stream received for non-existent CaConn {addr}"); } { @@ -1299,7 +1287,7 @@ impl CaConnSet { } } - fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result { + fn create_ca_conn(&mut self, add: ChannelAddWithAddr) -> Result { // TODO should we save this as event? let opts = CaConnOpts::default(); let addr = add.addr; @@ -1308,7 +1296,7 @@ impl CaConnSet { } else { return Err(Error::ExpectIpv4); }; - self.stats.create_ca_conn().inc(); + self.mett.create_ca_conn().inc(); let conn = CaConn::new( opts, self.backend.clone(), @@ -1412,6 +1400,8 @@ impl CaConnSet { fn check_channel_states(&mut self, tsnow: Instant, stnow: SystemTime) -> Result<(), Error> { let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts(); + let mut channel_health_timeout_soon = 0; + let mut channel_health_timeout_reached = 0; let mut cmd_remove_channel = Vec::new(); let mut cmd_add_channel = Vec::new(); let k = self.chan_check_next.take(); @@ -1429,7 +1419,7 @@ impl CaConnSet { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::Init { since: _ } => { // TODO no longer used? remove? - self.stats.logic_error().inc(); + self.mett.logic_err().inc(); } ActiveChannelState::WaitForStatusSeriesId { since } => { let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO); @@ -1474,10 +1464,10 @@ impl CaConnSet { } Assigned(st4) => { if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < stnow { - self.stats.channel_health_timeout_soon().inc(); + channel_health_timeout_soon += 1; } if st4.updated + CHANNEL_HEALTH_TIMEOUT < stnow { - self.stats.channel_health_timeout().inc(); + channel_health_timeout_reached += 1; trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); // TODO error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); @@ -1555,6 +1545,10 @@ impl CaConnSet { break; } } + self.mett + .channel_health_timeout_reached() + .set(channel_health_timeout_reached); + self.mett.channel_health_timeout_soon().set(channel_health_timeout_soon); loop { break if search_pending_count >= CURRENT_SEARCH_PENDING_MAX as _ { } else { @@ -1572,7 +1566,7 @@ impl CaConnSet { st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; let qu = IocAddrQuery::uncached(ch.name().into()); self.find_ioc_query_queue.push_back(qu); - self.stats.ioc_search_start().inc(); + self.mett.ioc_search_start().inc(); } _ => {} }, @@ -1600,7 +1594,7 @@ impl CaConnSet { } // TODO should use both counters and values - fn update_channel_state_counts(&mut self) -> (u64, u64) { + fn update_channel_state_counts(&mut self) -> (u32, u32) { let mut unknown_address = 0; let mut search_pending = 0; let mut no_address = 0; @@ -1663,16 +1657,16 @@ impl CaConnSet { ChannelStateValue::InitDummy => {} } } - self.stats.channel_unknown_address.set(unknown_address); - self.stats.channel_search_pending.set(search_pending); - self.stats.channel_no_address.set(no_address); - self.stats.channel_unassigned.set(unassigned); - // self.stats.channel_backoff.set(backoff); - self.stats.channel_assigned.set(assigned); - self.stats.channel_connected.set(connected); - self.stats.channel_maybe_wrong_address.set(maybe_wrong_address); - self.stats - .channel_assigned_without_health_update + self.mett.channel_unknown_address().set(unknown_address); + self.mett.channel_search_pending().set(search_pending); + self.mett.channel_no_address().set(no_address); + self.mett.channel_unassigned().set(unassigned); + // self.mett.channel_backoff().set(backoff); + self.mett.channel_assigned().set(assigned); + self.mett.channel_connected().set(connected); + self.mett.channel_maybe_wrong_address().set(maybe_wrong_address); + self.mett + .channel_assigned_without_health_update() .set(assigned_without_health_update); (search_pending, assigned_without_health_update) } @@ -1687,7 +1681,7 @@ impl CaConnSet { break if tx.is_sending() { match tx.poll_unpin(cx) { Ready(Ok(())) => { - self.stats.try_push_ca_conn_cmds_sent.inc(); + self.mett.try_push_ca_conn_cmds_sent().inc(); have_progress = true; continue; } @@ -1702,7 +1696,7 @@ impl CaConnSet { // Should be nothing to do here. // The connection ended, which CaConnSet notices anyway. // self.handle_connect_fail(addr)?; - self.stats.try_push_ca_conn_cmds_closed().inc(); + self.mett.try_push_ca_conn_cmds_closed().inc(); } }, Pending => { @@ -1748,7 +1742,7 @@ impl CaConnSet { } { // let item = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new()); - let item = self.ca_connset_metrics.take_and_reset(); + let item = self.mett.take_and_reset(); let item = CaConnSetItem::Metrics(item); self.connset_out_queue.push_back(item); } @@ -1852,29 +1846,30 @@ impl Stream for CaConnSet { use Poll::*; trace4!("CaConnSet poll begin"); let poll_ts1 = Instant::now(); - self.stats.poll_fn_begin().inc(); + self.mett.poll_fn_begin().inc(); let ret = loop { trace4!("CaConnSet poll loop"); - self.stats.poll_loop_begin().inc(); - - // TODO generalize to all combinations - self.stats.storage_insert_tx_len.set(self.iqtx.st_rf3_tx.len() as _); - self.stats - .storage_insert_queue_len - .set(self.storage_insert_st_qu.len() as _); - self.stats - .channel_info_query_queue_len - .set(self.channel_info_query_qu.len() as _); - self.stats - .channel_info_query_sender_len - .set(self.channel_info_query_sender.len().unwrap_or(0) as _); - self.stats - .channel_info_res_tx_len - .set(self.channel_info_res_tx.len() as _); - self.stats - .find_ioc_query_sender_len - .set(self.find_ioc_query_sender.len().unwrap_or(0) as _); - self.stats.ca_conn_res_tx_len.set(self.ca_conn_res_tx.len() as _); + { + let mut self2 = self.as_mut().get_mut(); + self2.mett.poll_loop_begin().inc(); + self2 + .mett + .channel_info_query_queue_len() + .set(self2.channel_info_query_qu.len() as _); + self2 + .mett + .channel_info_query_sender_len() + .set(self2.channel_info_query_sender.len().unwrap_or(0) as _); + self2 + .mett + .channel_info_res_tx_len() + .set(self2.channel_info_res_tx.len() as _); + self2 + .mett + .find_ioc_query_sender_len() + .set(self2.find_ioc_query_sender.len().unwrap_or(0) as _); + self2.mett.ca_conn_res_tx_len().set(self2.ca_conn_res_tx.len() as _); + } let mut penpro = PendingProgress::new(); @@ -1910,15 +1905,15 @@ impl Stream for CaConnSet { let left = self.await_ca_conn_jhs.len(); match x { Ok(Ok(())) => { - self.stats.ca_conn_task_join_done_ok.inc(); + self.mett.ca_conn_task_join_done_ok().inc(); debug!("CaConn {addr} finished well left {left}"); } Ok(Err(e)) => { - self.stats.ca_conn_task_join_done_err.inc(); + self.mett.ca_conn_task_join_done_err().inc(); error!("CaConn {addr} task error: {e} left {left}"); } Err(e) => { - self.stats.ca_conn_task_join_err.inc(); + self.mett.ca_conn_task_join_err().inc(); error!("CaConn {addr} join error: {e} left {left}"); } } @@ -1934,7 +1929,7 @@ impl Stream for CaConnSet { let this = self.as_mut().get_mut(); let qu = &mut this.storage_insert_st_qu; let tx = this.storage_insert_st_tx.as_mut(); - let counter = this.stats.storage_insert_queue_send(); + let counter = this.mett.storage_insert_queue_send(); let x = sender_polling_send(qu, tx, cx, || { counter.inc(); }); @@ -1946,7 +1941,7 @@ impl Stream for CaConnSet { let this = self.as_mut().get_mut(); let qu = &mut this.storage_insert_lt_qu; let tx = this.storage_insert_lt_tx.as_mut(); - let counter = this.stats.storage_insert_queue_send(); + let counter = this.mett.storage_insert_queue_send(); let x = sender_polling_send(qu, tx, cx, || { counter.inc(); }); @@ -2030,23 +2025,23 @@ impl Stream for CaConnSet { } break if self.ready_for_end_of_stream() { - self.stats.ready_for_end_of_stream().inc(); + self.mett.ready_for_end_of_stream().inc(); if penpro.progress() { - self.stats.ready_for_end_of_stream_with_progress().inc(); + self.mett.ready_for_end_of_stream_with_progress().inc(); continue; } else { Ready(None) } } else { if penpro.progress() { - self.stats.poll_reloop().inc(); + self.mett.poll_reloop().inc(); continue; } else { if penpro.pending() { - self.stats.poll_pending().inc(); + self.mett.poll_pending().inc(); Pending } else { - self.stats.poll_no_progress_no_pending().inc(); + self.mett.poll_no_progress_no_pending().inc(); let e = Error::NoProgressNoPending; Ready(Some(CaConnSetItem::Error(e))) } @@ -2056,7 +2051,7 @@ impl Stream for CaConnSet { trace4!("CaConnSet poll done"); let poll_ts2 = Instant::now(); let dt = poll_ts2.saturating_duration_since(poll_ts1); - self.stats.poll_all_dt().ingest((1e3 * dt.as_secs_f32()) as u32); + self.mett.poll_all_dt().push_dur_100us(dt); ret } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 3e6ff9b..6bb498f 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -33,7 +33,6 @@ use scywr::iteminsertqueue::QueryItem; use serde::Deserialize; use serde::Serialize; use serde_json::json; -use stats::CaConnSetStats; use stats::CaProtoStats; use stats::DaemonStats; use stats::InsertWorkerStats; @@ -130,7 +129,6 @@ impl IntoResponse for CustomErrorResponse { #[derive(Clone)] pub struct StatsSet { daemon: Arc, - ca_conn_set: Arc, insert_worker_stats: Arc, series_by_channel_stats: Arc, ioc_finder_stats: Arc, @@ -140,7 +138,6 @@ pub struct StatsSet { impl StatsSet { pub fn new( daemon: Arc, - ca_conn_set: Arc, insert_worker_stats: Arc, series_by_channel_stats: Arc, ioc_finder_stats: Arc, @@ -148,7 +145,6 @@ impl StatsSet { ) -> Self { Self { daemon, - ca_conn_set, insert_worker_stats, series_by_channel_stats, ioc_finder_stats, @@ -354,7 +350,6 @@ fn metricbeat(stats_set: &StatsSet) -> axum::Json { fn metrics(stats_set: &StatsSet) -> String { let ss = [ stats_set.daemon.prometheus(), - stats_set.ca_conn_set.prometheus(), stats_set.insert_worker_stats.prometheus(), stats_set.series_by_channel_stats.prometheus(), stats_set.ioc_finder_stats.prometheus(), diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 8aac481..5a505d3 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -7,7 +7,6 @@ edition = "2024" [dependencies] futures-util = "0.3.28" async-channel = "2.3.1" -#scylla = "0.15.1" scylla = "1.1" smallvec = "1.11.0" pin-project = "1.1.5" diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index 0f5d56b..a4cdace 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -87,6 +87,54 @@ mod Metrics { type Input = CaConnMetrics; type Name = ca_conn; } + enum counters { + poll_fn_begin, + poll_loop_begin, + ready_for_end_of_stream, + ready_for_end_of_stream_with_progress, + poll_reloop, + poll_pending, + poll_no_progress_no_pending, + ioc_search_start, + chan_send_err, + cmd_res_send_err, + logic_err, + channel_status_series_found, + ioc_addr_found, + ioc_addr_not_found, + ioc_addr_result_for_unknown_channel, + ca_conn_eos_ok, + ca_conn_eos_unexpected, + handle_add_channel_with_addr, + try_push_ca_conn_cmds_sent, + try_push_ca_conn_cmds_closed, + create_ca_conn, + storage_insert_queue_send, + ca_conn_task_join_done_ok, + ca_conn_task_join_done_err, + ca_conn_task_join_err, + } + enum values { + channel_info_query_queue_len, + channel_info_query_sender_len, + channel_info_res_tx_len, + ca_conn_res_tx_len, + find_ioc_query_sender_len, + channel_rogue, + channel_unknown_address, + channel_search_pending, + channel_no_address, + channel_unassigned, + channel_assigned, + channel_connected, + channel_maybe_wrong_address, + channel_assigned_without_health_update, + channel_health_timeout_soon, + channel_health_timeout_reached, + } + enum histolog2s { + poll_all_dt, + } } mod Metrics { diff --git a/stats/src/stats.rs b/stats/src/stats.rs index be36805..2e42c41 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -318,64 +318,6 @@ stats_proc::stats_struct!(( ), histolog2s(payload_size, data_count, outbuf_len,), ), - stats_struct( - name(CaConnSetStats), - prefix(connset), - counters( - channel_add, - channel_status_series_found, - channel_health_timeout_soon, - channel_health_timeout, - ioc_search_start, - ioc_addr_found, - ioc_addr_not_found, - ioc_addr_result_for_unknown_channel, - ca_conn_task_begin, - ca_conn_task_done, - ca_conn_task_join_done_ok, - ca_conn_task_join_done_err, - ca_conn_task_join_err, - ca_conn_eos_ok, - ca_conn_eos_err, - ca_conn_eos_unexpected, - response_tx_fail, - try_push_ca_conn_cmds_sent, - try_push_ca_conn_cmds_closed, - logic_error, - logic_issue, - ready_for_end_of_stream, - ready_for_end_of_stream_with_progress, - poll_fn_begin, - poll_loop_begin, - poll_pending, - poll_reloop, - poll_no_progress_no_pending, - handle_add_channel_with_addr, - create_ca_conn, - command_reply_fail, - storage_insert_queue_send, - ), - values( - storage_insert_queue_len, - storage_insert_tx_len, - channel_info_query_queue_len, - channel_info_query_sender_len, - channel_info_res_tx_len, - find_ioc_query_sender_len, - ca_conn_res_tx_len, - channel_unknown_address, - channel_search_pending, - channel_no_address, - channel_unassigned, - channel_backoff, - channel_assigned, - channel_connected, - channel_maybe_wrong_address, - channel_assigned_without_health_update, - channel_rogue, - ), - histolog2s(poll_all_dt,), - ), stats_struct( name(SeriesByChannelStats), prefix(seriesbychannel),