From ba4b44477846cf3ce89abb4d4942a7139a19d799 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 Mar 2025 17:59:29 +0100 Subject: [PATCH] WIP refactor metrics --- daqingest/src/daemon.rs | 15 ++++ netfetch/src/ca/conn.rs | 28 +++---- netfetch/src/ca/connset.rs | 26 ++++--- netfetch/src/daemon_common.rs | 5 +- netfetch/src/metrics.rs | 30 +++++++- netfetch/src/metrics/types.rs | 140 ++++++++++++++++++++++++++++++++++ 6 files changed, 217 insertions(+), 27 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 9bf67d2..99b0070 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -16,6 +16,7 @@ use netfetch::daemon_common::ChannelName; use netfetch::daemon_common::DaemonEvent; use netfetch::metrics::RoutesResources; use netfetch::metrics::StatsSet; +use netfetch::metrics::types::DaemonMetrics; use netfetch::throttletrace::ThrottleTrace; use netpod::Database; use netpod::ttl::RetentionTime; @@ -85,6 +86,7 @@ pub struct Daemon { // TODO series_conf_by_id_tx: Sender<()>, iqtx: Option, + daemon_metrics: DaemonMetrics, } impl Daemon { @@ -385,6 +387,7 @@ impl Daemon { channel_info_query_tx, series_conf_by_id_tx, iqtx: Some(iqtx2), + daemon_metrics: DaemonMetrics::new(), }; Ok(ret) } @@ -561,6 +564,9 @@ impl Daemon { error!("error from CaConnSet: {e}"); self.handle_shutdown().await?; } + Metrics(metrics) => { + self.daemon_metrics.ingest_ca_conn_set(metrics); + } } Ok(()) } @@ -695,6 +701,15 @@ impl Daemon { CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await, Shutdown => self.handle_shutdown().await, ConfigReload(tx) => self.handle_config_reload(tx).await, + GetMetrics(tx) => { + match tx.send((&self.daemon_metrics).into()).await { + Ok(()) => {} + Err(e) => { + error!("can not send metrics into channel"); + } + } + Ok(()) + } }; let dt = ts1.elapsed(); if dt > Duration::from_millis(200) { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index a00bec1..54994e1 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1112,7 +1112,7 @@ pub struct CaConn { read_ioids: HashMap, handler_by_ioid: HashMap>>>, trace_channel_poll: bool, - ca_msg_recv_count: u64, + ca_msg_recv_cnt: u64, ca_version_recv_count: u64, ts_channel_status_pong_last: Instant, } @@ -1179,7 +1179,7 @@ impl CaConn { read_ioids: HashMap::new(), handler_by_ioid: HashMap::new(), trace_channel_poll: false, - ca_msg_recv_count: 0, + ca_msg_recv_cnt: 0, ca_version_recv_count: 0, ts_channel_status_pong_last: tsnow, } @@ -2810,9 +2810,9 @@ impl CaConn { } } CaMsgTy::VersionRes(_) => { - if self.ca_msg_recv_count != 0 { + // TODO must check that version is only ever the first message. + if false { self.stats.ca_proto_version_later().inc(); - // TODO emit log or count stats } } CaMsgTy::ChannelCloseRes(x) => { @@ -2825,12 +2825,11 @@ impl CaConn { } CaItem::Empty => {} } - if self.ca_msg_recv_count == 0 { - if self.ca_version_recv_count == 0 { - self.stats.ca_proto_no_version_as_first().inc(); - } + if false { + // TODO use flags to keep track of whether we have seen version or other msgs. + self.stats.ca_proto_no_version_as_first().inc(); } - self.ca_msg_recv_count += 1; + self.ca_msg_recv_cnt = self.ca_msg_recv_cnt.saturating_add(1); Ready(Some(Ok(()))) } Ready(Some(Err(e))) => { @@ -3184,17 +3183,14 @@ impl CaConn { Ok(()) } - fn housekeeping_self(&mut self) { - let cnt_max = 0xfffffff000000000; - if self.ca_msg_recv_count > cnt_max { - let mask = !cnt_max; - self.ca_msg_recv_count &= mask; - } - } + fn housekeeping_self(&mut self) {} fn metrics_emit(&mut self) { + let ca_msg_recv_cnt = self.ca_msg_recv_cnt; + self.ca_msg_recv_cnt = 0; let item = CaConnMetrics { ca_conn_event_out_queue_len: self.ca_conn_event_out_queue.len(), + ca_msg_recv_cnt, }; let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item)); self.ca_conn_event_out_queue.push_back(item); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 7e6a203..a029bdd 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -59,6 +59,8 @@ use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; +use crate::metrics::types::CaConnMetricsAgg; +use crate::metrics::types::CaConnSetMetrics; use crate::queueset::QueueSet; use netpod::OnDrop; use netpod::TsNano; @@ -257,6 +259,7 @@ impl CaConnSetEvent { pub enum CaConnSetItem { Error(Error), Healthy, + Metrics(crate::metrics::types::CaConnSetMetrics), } pub struct CaConnSetCtrl { @@ -457,6 +460,7 @@ pub struct CaConnSet { rogue_channel_count: u64, connect_fail_count: usize, cssid_latency_max: Duration, + ca_connset_metrics: CaConnSetMetrics, } impl CaConnSet { @@ -530,6 +534,7 @@ impl CaConnSet { rogue_channel_count: 0, connect_fail_count: 0, cssid_latency_max: Duration::from_millis(2000), + ca_connset_metrics: CaConnSetMetrics::new(), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -765,7 +770,7 @@ impl CaConnSet { CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason), CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name), CaConnEventValue::Metrics(v) => { - // TODO aggregate metrics and stats + self.ca_connset_metrics.ca_conn_agg.ingest(v); Ok(()) } } @@ -1440,7 +1445,8 @@ impl CaConnSet { | CaConnEventValue::EchoTimeout | CaConnEventValue::ConnCommandResult(..) | CaConnEventValue::ChannelCreateFail(..) - | CaConnEventValue::ChannelStatus(..) => { + | CaConnEventValue::ChannelStatus(..) + | CaConnEventValue::Metrics(..) => { if let Err(e) = tx1.send((addr, item)).await { error!("channel send {:?}", e); return Err(e.into()); @@ -1456,9 +1462,6 @@ impl CaConnSet { return Err(e.into()); } } - CaConnEventValue::Metrics(_) => { - // TODO merge metrics - } } } if let Some(x) = eos_reason { @@ -1878,6 +1881,11 @@ impl CaConnSet { self.storage_insert_lt_qu.push_back(a); } } + { + let metrics = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new()); + let item = CaConnSetItem::Metrics(metrics); + self.connset_out_queue.push_back(item); + } Ok(()) } } @@ -2009,10 +2017,6 @@ impl Stream for CaConnSet { break Ready(Some(CaConnSetItem::Error(e))); } - if let Some(item) = self.connset_out_queue.pop_front() { - break Ready(Some(item)); - } - match self.ticker.poll_unpin(cx) { Ready(()) => match self.as_mut().handle_own_ticker_tick(cx) { Ok(()) => { @@ -2028,6 +2032,10 @@ impl Stream for CaConnSet { } } + if let Some(item) = self.connset_out_queue.pop_front() { + break Ready(Some(item)); + } + if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() { match jh.poll_unpin(cx) { Ready(x) => { diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index ace5211..4a22423 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -1,5 +1,6 @@ use crate::ca::connset::CaConnSetItem; use crate::conf::ChannelConfig; +use crate::metrics::types::MetricsPrometheusShort; use async_channel::Sender; use serde::Serialize; @@ -26,7 +27,8 @@ pub enum DaemonEvent { ChannelCommand(crate::ca::connset::ChannelCommand), CaConnSetItem(CaConnSetItem), Shutdown, - ConfigReload(async_channel::Sender), + ConfigReload(Sender), + GetMetrics(Sender), } impl DaemonEvent { @@ -40,6 +42,7 @@ impl DaemonEvent { CaConnSetItem(_) => format!("CaConnSetItem"), Shutdown => format!("Shutdown"), ConfigReload(..) => format!("ConfigReload"), + GetMetrics(..) => format!("GetMetrics"), } } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index f6d313d..12bd67b 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -202,6 +202,18 @@ async fn config_reload(dcom: Arc) -> Result) -> Result { + let (tx, rx) = async_channel::bounded(1); + let item = DaemonEvent::GetMetrics(tx); + dcom.tx.send(item).await; + match rx.recv().await { + Ok(x) => Ok(x.prometheus()), + Err(e) => Err(Error::with_public_msg_no_trace(e.to_string()) + .to_public_err_msg() + .into_response()), + } +} + async fn find_channel( params: HashMap, dcom: Arc, @@ -426,8 +438,24 @@ fn make_routes( Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route( "/", get({ + let dcom = dcom.clone(); let stats_set = stats_set.clone(); - || async move { metrics(&stats_set) } + || async move { + let prom2 = metrics2(dcom).await.unwrap_or(String::new()); + let mut s = metrics(&stats_set); + s.push_str(&prom2); + s + } + }), + ), + ) + .nest( + "/metrics2", + Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route( + "/", + get({ + let dcom = dcom.clone(); + || metrics2(dcom) }), ), ) diff --git a/netfetch/src/metrics/types.rs b/netfetch/src/metrics/types.rs index f8332a5..a60f543 100644 --- a/netfetch/src/metrics/types.rs +++ b/netfetch/src/metrics/types.rs @@ -3,7 +3,98 @@ use serde::Serialize; #[derive(Debug, Serialize)] pub struct CaConnMetrics { + // a value pub ca_conn_event_out_queue_len: usize, + // a term of a running counter sum + pub ca_msg_recv_cnt: u64, +} + +#[derive(Debug, Serialize)] +pub struct CaConnMetricsAgg { + // derived from values + pub ca_conn_event_out_queue_len_max: usize, + // derived from counter terms + pub ca_msg_recv_cnt_all: u64, +} + +impl CaConnMetricsAgg { + pub fn new() -> Self { + Self { + ca_conn_event_out_queue_len_max: 0, + ca_msg_recv_cnt_all: 0, + } + } + + pub fn ingest(&mut self, inp: CaConnMetrics) { + self.ca_conn_event_out_queue_len_max = self + .ca_conn_event_out_queue_len_max + .max(inp.ca_conn_event_out_queue_len); + self.ca_msg_recv_cnt_all += inp.ca_msg_recv_cnt; + } +} + +#[derive(Debug, Serialize)] +pub struct CaConnMetricsAggAgg { + // derived from values + pub ca_conn_event_out_queue_len_max: usize, + // derived from counter terms + pub ca_msg_recv_cnt_all: u64, +} + +impl CaConnMetricsAggAgg { + pub fn new() -> Self { + Self { + ca_conn_event_out_queue_len_max: 0, + ca_msg_recv_cnt_all: 0, + } + } + + pub fn ingest(&mut self, inp: CaConnMetricsAgg) { + // take again the max of the maxs + self.ca_conn_event_out_queue_len_max = self + .ca_conn_event_out_queue_len_max + .max(inp.ca_conn_event_out_queue_len_max); + // sum up again to a total + self.ca_msg_recv_cnt_all += inp.ca_msg_recv_cnt_all; + } +} + +#[derive(Debug, Serialize)] +pub struct CaConnSetMetrics { + pub ca_conn_agg: CaConnMetricsAgg, +} + +impl CaConnSetMetrics { + pub fn new() -> Self { + Self { + ca_conn_agg: CaConnMetricsAgg::new(), + } + } +} + +#[derive(Debug, Serialize)] +pub struct CaConnSetAggMetrics { + pub ca_conn_agg_agg: CaConnMetricsAggAgg, +} + +impl CaConnSetAggMetrics { + pub fn new() -> Self { + Self { + ca_conn_agg_agg: CaConnMetricsAggAgg::new(), + } + } + + pub fn ingest(&mut self, inp: CaConnSetMetrics) { + { + let src = inp.ca_conn_agg; + let dst = &mut self.ca_conn_agg_agg; + // take again the max of the maxs + dst.ca_conn_event_out_queue_len_max = dst + .ca_conn_event_out_queue_len_max + .max(src.ca_conn_event_out_queue_len_max); + dst.ca_msg_recv_cnt_all += src.ca_msg_recv_cnt_all; + } + } } pub struct InsertQueuesTxMetrics { @@ -25,3 +116,52 @@ impl From<&InsertQueuesTx> for InsertQueuesTxMetrics { } } } + +#[derive(Debug, Serialize)] +pub struct DaemonMetrics { + pub ca_conn_set_agg: CaConnSetAggMetrics, +} + +impl DaemonMetrics { + pub fn new() -> Self { + Self { + ca_conn_set_agg: CaConnSetAggMetrics::new(), + } + } + + pub fn ingest_ca_conn_set(&mut self, inp: CaConnSetMetrics) { + self.ca_conn_set_agg.ingest(inp); + } +} + +#[derive(Debug, Serialize)] +pub struct MetricsPrometheusShort { + // derived from values + pub ca_conn_event_out_queue_len_max: usize, + // derived from counter terms + pub ca_msg_recv_cnt_all: u64, +} + +impl MetricsPrometheusShort { + pub fn prometheus(&self) -> String { + use std::fmt::Write; + let mut s = String::new(); + write!( + &mut s, + "ca_conn_event_out_queue_len_max {}\n", + self.ca_conn_event_out_queue_len_max + ) + .unwrap(); + write!(&mut s, "ca_msg_recv_all_sum {}\n", self.ca_msg_recv_cnt_all).unwrap(); + s + } +} + +impl From<&DaemonMetrics> for MetricsPrometheusShort { + fn from(value: &DaemonMetrics) -> Self { + Self { + ca_conn_event_out_queue_len_max: value.ca_conn_set_agg.ca_conn_agg_agg.ca_conn_event_out_queue_len_max, + ca_msg_recv_cnt_all: value.ca_conn_set_agg.ca_conn_agg_agg.ca_msg_recv_cnt_all, + } + } +}