Improve stats histo, maintain recv stats per channel

This commit is contained in:
Dominik Werder
2023-10-01 22:52:28 +02:00
parent 9a62d17566
commit e66d37706e
3 changed files with 74 additions and 36 deletions

View File

@@ -113,6 +113,7 @@ pub struct ChannelStateInfo {
// #[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "ser_instant")]
pub ts_event_last: Option<Instant>,
pub recv_count: Option<u64>,
// #[serde(skip_serializing_if = "Option::is_none")]
pub item_recv_ivl_ema: Option<f32>,
pub interest_score: f32,
@@ -156,6 +157,8 @@ enum ChannelError {
#[derive(Clone, Debug)]
struct EventedState {
ts_last: Instant,
recv_count: u64,
recv_bytes: u64,
}
#[derive(Clone, Debug)]
@@ -234,6 +237,13 @@ impl ChannelState {
},
_ => None,
};
let recv_count = match self {
ChannelState::Created(_series, s) => match &s.state {
MonitoringState::Evented(_, s) => Some(s.recv_count),
_ => None,
},
_ => None,
};
let item_recv_ivl_ema = match self {
ChannelState::Created(_series, s) => {
let ema = s.item_recv_ivl_ema.ema();
@@ -259,6 +269,7 @@ impl ChannelState {
shape,
ts_created,
ts_event_last,
recv_count,
item_recv_ivl_ema,
interest_score,
}
@@ -1253,13 +1264,20 @@ impl CaConn {
st.item_recv_ivl_ema.tick(tsnow);
let scalar_type = st.scalar_type.clone();
let shape = st.shape.clone();
match st.state {
MonitoringState::AddingEvent(ref series) => {
match &mut st.state {
MonitoringState::AddingEvent(series) => {
let series = series.clone();
series_2 = Some(series.clone());
st.state = MonitoringState::Evented(series, EventedState { ts_last: tsnow });
st.state = MonitoringState::Evented(
series,
EventedState {
ts_last: tsnow,
recv_count: 0,
recv_bytes: 0,
},
);
}
MonitoringState::Evented(ref series, ref mut st) => {
MonitoringState::Evented(series, st) => {
series_2 = Some(series.clone());
st.ts_last = tsnow;
}
@@ -1267,6 +1285,10 @@ impl CaConn {
error!("unexpected state: EventAddRes while having {:?}", st.state);
}
}
if let MonitoringState::Evented(_, st2) = &mut st.state {
st2.recv_count += 1;
st2.recv_bytes += ev.payload_len as u64;
}
let series = match series_2 {
Some(k) => k,
None => {

View File

@@ -118,6 +118,7 @@ pub struct EventAddRes {
pub status: u32,
pub subid: u32,
pub value: CaEventValue,
pub payload_len: u32,
}
#[derive(Debug)]
@@ -849,6 +850,7 @@ impl CaMsg {
status: hi.param1,
subid: hi.param2,
value,
payload_len: hi.payload_len() as u32,
};
let ty = CaMsgTy::EventAddRes(d);
CaMsg::from_ty_ts(ty, tsnow)

View File

@@ -118,13 +118,16 @@ struct StatsAReader {
impl StatsAReader {}
pub struct HistoLog2 {
histo: [AtomicU64; 16],
histo: [AtomicU64; 20],
sum: AtomicU64,
sub: u16,
}
macro_rules! rep16 {
([$x:expr]) => {
[$x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x]
[
$x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x,
]
};
}
@@ -132,12 +135,14 @@ impl HistoLog2 {
pub fn new(sub: u16) -> Self {
Self {
histo: rep16!([AtomicU64::new(0)]),
sum: AtomicU64::new(0),
sub,
}
}
#[inline]
pub fn ingest(&self, mut v: u32) {
self.sum.fetch_add(v as u64, AcqRel);
v >>= self.sub;
let mut po = 0;
while v != 0 && po < self.histo.len() - 1 {
@@ -148,6 +153,7 @@ impl HistoLog2 {
}
pub fn to_prometheus(&self, name: &str) -> String {
let base: u32 = 2;
let mut ret = String::with_capacity(2048);
ret.push_str("# HELP ");
ret.push_str(name);
@@ -156,29 +162,30 @@ impl HistoLog2 {
ret.push_str(name);
ret.push_str(" histogram\n");
let mut cnt = 0;
let mut sum = 0;
let lastix = (self.histo.len() - 1) as u32;
for (i, a) in self.histo.iter().enumerate() {
use std::ops::Sub;
let i = i as u32;
let le = 2u32.pow(i).sub(1);
let le = base.pow(i).sub(1);
let v = a.load(Acquire);
cnt += v;
sum += v * le as u64;
ret.push_str(name);
ret.push_str("_bucket{le=\"");
ret.push_str(&le.to_string());
if i == lastix {
ret.push_str("+Inf");
} else {
ret.push_str(&le.to_string());
}
ret.push_str("\"} ");
ret.push_str(&cnt.to_string());
ret.push_str("\n");
}
ret.push_str(name);
ret.push_str("_bucket{le=\"+Inf\"} ");
ret.push_str(&cnt.to_string());
ret.push_str("\n");
ret.push_str(name);
ret.push_str("_count ");
ret.push_str(&cnt.to_string());
ret.push_str("\n");
let sum = self.sum.load(Acquire);
ret.push_str(name);
ret.push_str("_sum ");
ret.push_str(&sum.to_string());
@@ -195,27 +202,34 @@ fn histo_00() {
// histo.ingest(2);
histo.ingest(3);
histo.ingest(4);
let exp = r##"# HELP latA help-text-missing
# TYPE latA histogram
latA_bucket{le="0"} 0
latA_bucket{le="1"} 0
latA_bucket{le="3"} 1
latA_bucket{le="7"} 2
latA_bucket{le="15"} 2
latA_bucket{le="31"} 2
latA_bucket{le="63"} 2
latA_bucket{le="127"} 2
latA_bucket{le="255"} 2
latA_bucket{le="511"} 2
latA_bucket{le="1023"} 2
latA_bucket{le="2047"} 2
latA_bucket{le="4095"} 2
latA_bucket{le="8191"} 2
latA_bucket{le="16383"} 2
latA_bucket{le="32767"} 2
latA_bucket{le="+Inf"} 2
latA_count 2
latA_sum 10
histo.ingest(262143);
histo.ingest(262144);
let s = histo.to_prometheus("the_metric");
// eprintln!("{s}");
let exp = r##"# HELP the_metric help-text-missing
# TYPE the_metric histogram
the_metric_bucket{le="0"} 0
the_metric_bucket{le="1"} 0
the_metric_bucket{le="3"} 1
the_metric_bucket{le="7"} 2
the_metric_bucket{le="15"} 2
the_metric_bucket{le="31"} 2
the_metric_bucket{le="63"} 2
the_metric_bucket{le="127"} 2
the_metric_bucket{le="255"} 2
the_metric_bucket{le="511"} 2
the_metric_bucket{le="1023"} 2
the_metric_bucket{le="2047"} 2
the_metric_bucket{le="4095"} 2
the_metric_bucket{le="8191"} 2
the_metric_bucket{le="16383"} 2
the_metric_bucket{le="32767"} 2
the_metric_bucket{le="65535"} 2
the_metric_bucket{le="131071"} 2
the_metric_bucket{le="262143"} 3
the_metric_bucket{le="+Inf"} 4
the_metric_count 4
the_metric_sum 524294
"##;
assert_eq!(histo.to_prometheus("latA"), exp);
assert_eq!(s, exp);
}