diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 7ae59e8..a058a83 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -113,6 +113,7 @@ pub struct ChannelStateInfo { // #[serde(skip_serializing_if = "Option::is_none")] #[serde(serialize_with = "ser_instant")] pub ts_event_last: Option, + pub recv_count: Option, // #[serde(skip_serializing_if = "Option::is_none")] pub item_recv_ivl_ema: Option, pub interest_score: f32, @@ -156,6 +157,8 @@ enum ChannelError { #[derive(Clone, Debug)] struct EventedState { ts_last: Instant, + recv_count: u64, + recv_bytes: u64, } #[derive(Clone, Debug)] @@ -234,6 +237,13 @@ impl ChannelState { }, _ => None, }; + let recv_count = match self { + ChannelState::Created(_series, s) => match &s.state { + MonitoringState::Evented(_, s) => Some(s.recv_count), + _ => None, + }, + _ => None, + }; let item_recv_ivl_ema = match self { ChannelState::Created(_series, s) => { let ema = s.item_recv_ivl_ema.ema(); @@ -259,6 +269,7 @@ impl ChannelState { shape, ts_created, ts_event_last, + recv_count, item_recv_ivl_ema, interest_score, } @@ -1253,13 +1264,20 @@ impl CaConn { st.item_recv_ivl_ema.tick(tsnow); let scalar_type = st.scalar_type.clone(); let shape = st.shape.clone(); - match st.state { - MonitoringState::AddingEvent(ref series) => { + match &mut st.state { + MonitoringState::AddingEvent(series) => { let series = series.clone(); series_2 = Some(series.clone()); - st.state = MonitoringState::Evented(series, EventedState { ts_last: tsnow }); + st.state = MonitoringState::Evented( + series, + EventedState { + ts_last: tsnow, + recv_count: 0, + recv_bytes: 0, + }, + ); } - MonitoringState::Evented(ref series, ref mut st) => { + MonitoringState::Evented(series, st) => { series_2 = Some(series.clone()); st.ts_last = tsnow; } @@ -1267,6 +1285,10 @@ impl CaConn { error!("unexpected state: EventAddRes while having {:?}", st.state); } } + if let MonitoringState::Evented(_, st2) = &mut st.state { + st2.recv_count += 1; + st2.recv_bytes += ev.payload_len as u64; + } let series = match series_2 { Some(k) => k, None => { diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 820fa4a..eb4e28d 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -118,6 +118,7 @@ pub struct EventAddRes { pub status: u32, pub subid: u32, pub value: CaEventValue, + pub payload_len: u32, } #[derive(Debug)] @@ -849,6 +850,7 @@ impl CaMsg { status: hi.param1, subid: hi.param2, value, + payload_len: hi.payload_len() as u32, }; let ty = CaMsgTy::EventAddRes(d); CaMsg::from_ty_ts(ty, tsnow) diff --git a/stats_types/src/stats_types.rs b/stats_types/src/stats_types.rs index b524244..d52fa83 100644 --- a/stats_types/src/stats_types.rs +++ b/stats_types/src/stats_types.rs @@ -118,13 +118,16 @@ struct StatsAReader { impl StatsAReader {} pub struct HistoLog2 { - histo: [AtomicU64; 16], + histo: [AtomicU64; 20], + sum: AtomicU64, sub: u16, } macro_rules! rep16 { ([$x:expr]) => { - [$x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x] + [ + $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, + ] }; } @@ -132,12 +135,14 @@ impl HistoLog2 { pub fn new(sub: u16) -> Self { Self { histo: rep16!([AtomicU64::new(0)]), + sum: AtomicU64::new(0), sub, } } #[inline] pub fn ingest(&self, mut v: u32) { + self.sum.fetch_add(v as u64, AcqRel); v >>= self.sub; let mut po = 0; while v != 0 && po < self.histo.len() - 1 { @@ -148,6 +153,7 @@ impl HistoLog2 { } pub fn to_prometheus(&self, name: &str) -> String { + let base: u32 = 2; let mut ret = String::with_capacity(2048); ret.push_str("# HELP "); ret.push_str(name); @@ -156,29 +162,30 @@ impl HistoLog2 { ret.push_str(name); ret.push_str(" histogram\n"); let mut cnt = 0; - let mut sum = 0; + let lastix = (self.histo.len() - 1) as u32; for (i, a) in self.histo.iter().enumerate() { use std::ops::Sub; let i = i as u32; - let le = 2u32.pow(i).sub(1); + let le = base.pow(i).sub(1); let v = a.load(Acquire); cnt += v; - sum += v * le as u64; ret.push_str(name); ret.push_str("_bucket{le=\""); - ret.push_str(&le.to_string()); + if i == lastix { + ret.push_str("+Inf"); + } else { + ret.push_str(&le.to_string()); + } ret.push_str("\"} "); ret.push_str(&cnt.to_string()); ret.push_str("\n"); } ret.push_str(name); - ret.push_str("_bucket{le=\"+Inf\"} "); - ret.push_str(&cnt.to_string()); - ret.push_str("\n"); - ret.push_str(name); ret.push_str("_count "); ret.push_str(&cnt.to_string()); ret.push_str("\n"); + + let sum = self.sum.load(Acquire); ret.push_str(name); ret.push_str("_sum "); ret.push_str(&sum.to_string()); @@ -195,27 +202,34 @@ fn histo_00() { // histo.ingest(2); histo.ingest(3); histo.ingest(4); - let exp = r##"# HELP latA help-text-missing -# TYPE latA histogram -latA_bucket{le="0"} 0 -latA_bucket{le="1"} 0 -latA_bucket{le="3"} 1 -latA_bucket{le="7"} 2 -latA_bucket{le="15"} 2 -latA_bucket{le="31"} 2 -latA_bucket{le="63"} 2 -latA_bucket{le="127"} 2 -latA_bucket{le="255"} 2 -latA_bucket{le="511"} 2 -latA_bucket{le="1023"} 2 -latA_bucket{le="2047"} 2 -latA_bucket{le="4095"} 2 -latA_bucket{le="8191"} 2 -latA_bucket{le="16383"} 2 -latA_bucket{le="32767"} 2 -latA_bucket{le="+Inf"} 2 -latA_count 2 -latA_sum 10 + histo.ingest(262143); + histo.ingest(262144); + let s = histo.to_prometheus("the_metric"); + // eprintln!("{s}"); + let exp = r##"# HELP the_metric help-text-missing +# TYPE the_metric histogram +the_metric_bucket{le="0"} 0 +the_metric_bucket{le="1"} 0 +the_metric_bucket{le="3"} 1 +the_metric_bucket{le="7"} 2 +the_metric_bucket{le="15"} 2 +the_metric_bucket{le="31"} 2 +the_metric_bucket{le="63"} 2 +the_metric_bucket{le="127"} 2 +the_metric_bucket{le="255"} 2 +the_metric_bucket{le="511"} 2 +the_metric_bucket{le="1023"} 2 +the_metric_bucket{le="2047"} 2 +the_metric_bucket{le="4095"} 2 +the_metric_bucket{le="8191"} 2 +the_metric_bucket{le="16383"} 2 +the_metric_bucket{le="32767"} 2 +the_metric_bucket{le="65535"} 2 +the_metric_bucket{le="131071"} 2 +the_metric_bucket{le="262143"} 3 +the_metric_bucket{le="+Inf"} 4 +the_metric_count 4 +the_metric_sum 524294 "##; - assert_eq!(histo.to_prometheus("latA"), exp); + assert_eq!(s, exp); }