WIP refactor metrics

This commit is contained in:
Dominik Werder
2025-03-26 17:59:29 +01:00
parent 228eea4f90
commit ba4b444778
6 changed files with 217 additions and 27 deletions

View File

@@ -16,6 +16,7 @@ use netfetch::daemon_common::ChannelName;
use netfetch::daemon_common::DaemonEvent;
use netfetch::metrics::RoutesResources;
use netfetch::metrics::StatsSet;
use netfetch::metrics::types::DaemonMetrics;
use netfetch::throttletrace::ThrottleTrace;
use netpod::Database;
use netpod::ttl::RetentionTime;
@@ -85,6 +86,7 @@ pub struct Daemon {
// TODO
series_conf_by_id_tx: Sender<()>,
iqtx: Option<InsertQueuesTx>,
daemon_metrics: DaemonMetrics,
}
impl Daemon {
@@ -385,6 +387,7 @@ impl Daemon {
channel_info_query_tx,
series_conf_by_id_tx,
iqtx: Some(iqtx2),
daemon_metrics: DaemonMetrics::new(),
};
Ok(ret)
}
@@ -561,6 +564,9 @@ impl Daemon {
error!("error from CaConnSet: {e}");
self.handle_shutdown().await?;
}
Metrics(metrics) => {
self.daemon_metrics.ingest_ca_conn_set(metrics);
}
}
Ok(())
}
@@ -695,6 +701,15 @@ impl Daemon {
CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await,
Shutdown => self.handle_shutdown().await,
ConfigReload(tx) => self.handle_config_reload(tx).await,
GetMetrics(tx) => {
    // Answer the request with a snapshot converted from the aggregated daemon metrics.
    // A failed send is not fatal for the daemon, so the failure is only logged,
    // now including the cause instead of discarding it.
    if let Err(e) = tx.send((&self.daemon_metrics).into()).await {
        error!("can not send metrics into channel: {e}");
    }
    Ok(())
}
};
let dt = ts1.elapsed();
if dt > Duration::from_millis(200) {

View File

@@ -1112,7 +1112,7 @@ pub struct CaConn {
read_ioids: HashMap<Ioid, Cid>,
handler_by_ioid: HashMap<Ioid, Option<Pin<Box<dyn ConnFuture>>>>,
trace_channel_poll: bool,
ca_msg_recv_count: u64,
ca_msg_recv_cnt: u64,
ca_version_recv_count: u64,
ts_channel_status_pong_last: Instant,
}
@@ -1179,7 +1179,7 @@ impl CaConn {
read_ioids: HashMap::new(),
handler_by_ioid: HashMap::new(),
trace_channel_poll: false,
ca_msg_recv_count: 0,
ca_msg_recv_cnt: 0,
ca_version_recv_count: 0,
ts_channel_status_pong_last: tsnow,
}
@@ -2810,9 +2810,9 @@ impl CaConn {
}
}
CaMsgTy::VersionRes(_) => {
if self.ca_msg_recv_count != 0 {
// TODO must check that version is only ever the first message.
if false {
self.stats.ca_proto_version_later().inc();
// TODO emit log or count stats
}
}
CaMsgTy::ChannelCloseRes(x) => {
@@ -2825,12 +2825,11 @@ impl CaConn {
}
CaItem::Empty => {}
}
if self.ca_msg_recv_count == 0 {
if self.ca_version_recv_count == 0 {
self.stats.ca_proto_no_version_as_first().inc();
}
if false {
// TODO use flags to keep track of whether we have seen version or other msgs.
self.stats.ca_proto_no_version_as_first().inc();
}
self.ca_msg_recv_count += 1;
self.ca_msg_recv_cnt = self.ca_msg_recv_cnt.saturating_add(1);
Ready(Some(Ok(())))
}
Ready(Some(Err(e))) => {
@@ -3184,17 +3183,14 @@ impl CaConn {
Ok(())
}
fn housekeeping_self(&mut self) {
let cnt_max = 0xfffffff000000000;
if self.ca_msg_recv_count > cnt_max {
let mask = !cnt_max;
self.ca_msg_recv_count &= mask;
}
}
fn housekeeping_self(&mut self) {}
fn metrics_emit(&mut self) {
let ca_msg_recv_cnt = self.ca_msg_recv_cnt;
self.ca_msg_recv_cnt = 0;
let item = CaConnMetrics {
ca_conn_event_out_queue_len: self.ca_conn_event_out_queue.len(),
ca_msg_recv_cnt,
};
let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item));
self.ca_conn_event_out_queue.push_back(item);

View File

@@ -59,6 +59,8 @@ use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use crate::metrics::types::CaConnMetricsAgg;
use crate::metrics::types::CaConnSetMetrics;
use crate::queueset::QueueSet;
use netpod::OnDrop;
use netpod::TsNano;
@@ -257,6 +259,7 @@ impl CaConnSetEvent {
/// Items emitted by `CaConnSet` on its output stream.
pub enum CaConnSetItem {
/// An error raised inside the connection set.
Error(Error),
// NOTE(review): presumably a periodic liveness signal; confirm against the producer.
Healthy,
/// A snapshot of the metrics aggregated by the connection set.
Metrics(crate::metrics::types::CaConnSetMetrics),
}
pub struct CaConnSetCtrl {
@@ -457,6 +460,7 @@ pub struct CaConnSet {
rogue_channel_count: u64,
connect_fail_count: usize,
cssid_latency_max: Duration,
ca_connset_metrics: CaConnSetMetrics,
}
impl CaConnSet {
@@ -530,6 +534,7 @@ impl CaConnSet {
rogue_channel_count: 0,
connect_fail_count: 0,
cssid_latency_max: Duration::from_millis(2000),
ca_connset_metrics: CaConnSetMetrics::new(),
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -765,7 +770,7 @@ impl CaConnSet {
CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason),
CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name),
CaConnEventValue::Metrics(v) => {
// TODO aggregate metrics and stats
self.ca_connset_metrics.ca_conn_agg.ingest(v);
Ok(())
}
}
@@ -1440,7 +1445,8 @@ impl CaConnSet {
| CaConnEventValue::EchoTimeout
| CaConnEventValue::ConnCommandResult(..)
| CaConnEventValue::ChannelCreateFail(..)
| CaConnEventValue::ChannelStatus(..) => {
| CaConnEventValue::ChannelStatus(..)
| CaConnEventValue::Metrics(..) => {
if let Err(e) = tx1.send((addr, item)).await {
error!("channel send {:?}", e);
return Err(e.into());
@@ -1456,9 +1462,6 @@ impl CaConnSet {
return Err(e.into());
}
}
CaConnEventValue::Metrics(_) => {
// TODO merge metrics
}
}
}
if let Some(x) = eos_reason {
@@ -1878,6 +1881,11 @@ impl CaConnSet {
self.storage_insert_lt_qu.push_back(a);
}
}
{
let metrics = std::mem::replace(&mut self.ca_connset_metrics, CaConnSetMetrics::new());
let item = CaConnSetItem::Metrics(metrics);
self.connset_out_queue.push_back(item);
}
Ok(())
}
}
@@ -2009,10 +2017,6 @@ impl Stream for CaConnSet {
break Ready(Some(CaConnSetItem::Error(e)));
}
if let Some(item) = self.connset_out_queue.pop_front() {
break Ready(Some(item));
}
match self.ticker.poll_unpin(cx) {
Ready(()) => match self.as_mut().handle_own_ticker_tick(cx) {
Ok(()) => {
@@ -2028,6 +2032,10 @@ impl Stream for CaConnSet {
}
}
if let Some(item) = self.connset_out_queue.pop_front() {
break Ready(Some(item));
}
if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() {
match jh.poll_unpin(cx) {
Ready(x) => {

View File

@@ -1,5 +1,6 @@
use crate::ca::connset::CaConnSetItem;
use crate::conf::ChannelConfig;
use crate::metrics::types::MetricsPrometheusShort;
use async_channel::Sender;
use serde::Serialize;
@@ -26,7 +27,8 @@ pub enum DaemonEvent {
ChannelCommand(crate::ca::connset::ChannelCommand),
CaConnSetItem(CaConnSetItem),
Shutdown,
ConfigReload(async_channel::Sender<u64>),
ConfigReload(Sender<u64>),
GetMetrics(Sender<MetricsPrometheusShort>),
}
impl DaemonEvent {
@@ -40,6 +42,7 @@ impl DaemonEvent {
CaConnSetItem(_) => format!("CaConnSetItem"),
Shutdown => format!("Shutdown"),
ConfigReload(..) => format!("ConfigReload"),
GetMetrics(..) => format!("GetMetrics"),
}
}
}

View File

@@ -202,6 +202,18 @@ async fn config_reload(dcom: Arc<DaemonComm>) -> Result<axum::Json<serde_json::V
}
}
async fn metrics2(dcom: Arc<DaemonComm>) -> Result<String, Response> {
let (tx, rx) = async_channel::bounded(1);
let item = DaemonEvent::GetMetrics(tx);
dcom.tx.send(item).await;
match rx.recv().await {
Ok(x) => Ok(x.prometheus()),
Err(e) => Err(Error::with_public_msg_no_trace(e.to_string())
.to_public_err_msg()
.into_response()),
}
}
async fn find_channel(
params: HashMap<String, String>,
dcom: Arc<DaemonComm>,
@@ -426,8 +438,24 @@ fn make_routes(
Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route(
"/",
get({
let dcom = dcom.clone();
let stats_set = stats_set.clone();
|| async move { metrics(&stats_set) }
|| async move {
let prom2 = metrics2(dcom).await.unwrap_or(String::new());
let mut s = metrics(&stats_set);
s.push_str(&prom2);
s
}
}),
),
)
.nest(
"/metrics2",
Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route(
"/",
get({
let dcom = dcom.clone();
|| metrics2(dcom)
}),
),
)

View File

@@ -3,7 +3,98 @@ use serde::Serialize;
/// Metrics item describing a single connection.
///
/// The two fields have different aggregation semantics: one is a sampled value,
/// the other is a term that gets added into a running counter sum.
#[derive(Debug, Serialize)]
pub struct CaConnMetrics {
// a value
/// Sampled value: length of the connection's event output queue.
pub ca_conn_event_out_queue_len: usize,
// a term of a running counter sum
/// Counter term: number of received messages to add to a running sum.
pub ca_msg_recv_cnt: u64,
}
/// Aggregate over the `CaConnMetrics` items ingested so far.
#[derive(Debug, Serialize)]
pub struct CaConnMetricsAgg {
    /// Derived from values: the largest `ca_conn_event_out_queue_len` ingested.
    pub ca_conn_event_out_queue_len_max: usize,
    /// Derived from counter terms: the sum of all ingested `ca_msg_recv_cnt`.
    pub ca_msg_recv_cnt_all: u64,
}

impl CaConnMetricsAgg {
    /// Creates an empty aggregate with all fields at zero.
    pub fn new() -> Self {
        Self {
            ca_conn_event_out_queue_len_max: 0,
            ca_msg_recv_cnt_all: 0,
        }
    }

    /// Folds one connection's metrics into this aggregate.
    ///
    /// Values are combined with `max`, counter terms are summed.
    pub fn ingest(&mut self, inp: CaConnMetrics) {
        self.ca_conn_event_out_queue_len_max = self
            .ca_conn_event_out_queue_len_max
            .max(inp.ca_conn_event_out_queue_len);
        // Saturate instead of overflowing: a plain `+=` panics in debug builds
        // and wraps silently in release builds.
        self.ca_msg_recv_cnt_all = self.ca_msg_recv_cnt_all.saturating_add(inp.ca_msg_recv_cnt);
    }
}

impl Default for CaConnMetricsAgg {
    /// Same as [`CaConnMetricsAgg::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Second-level aggregate: combines several `CaConnMetricsAgg` into one.
#[derive(Debug, Serialize)]
pub struct CaConnMetricsAggAgg {
    /// Derived from values: the maximum over the ingested maxima.
    pub ca_conn_event_out_queue_len_max: usize,
    /// Derived from counter terms: the total over the ingested sums.
    pub ca_msg_recv_cnt_all: u64,
}

impl CaConnMetricsAggAgg {
    /// Creates an empty aggregate with all fields at zero.
    pub fn new() -> Self {
        Self {
            ca_conn_event_out_queue_len_max: 0,
            ca_msg_recv_cnt_all: 0,
        }
    }

    /// Folds a first-level aggregate into this one.
    pub fn ingest(&mut self, inp: CaConnMetricsAgg) {
        // take again the max of the maxs
        self.ca_conn_event_out_queue_len_max = self
            .ca_conn_event_out_queue_len_max
            .max(inp.ca_conn_event_out_queue_len_max);
        // sum up again to a total; saturate rather than overflow, because a plain
        // `+=` panics in debug builds and wraps silently in release builds
        self.ca_msg_recv_cnt_all = self.ca_msg_recv_cnt_all.saturating_add(inp.ca_msg_recv_cnt_all);
    }
}

impl Default for CaConnMetricsAggAgg {
    /// Same as [`CaConnMetricsAggAgg::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Metrics of a connection set: currently only the aggregate over its connections.
#[derive(Debug, Serialize)]
pub struct CaConnSetMetrics {
    /// Aggregate over the metrics of the individual connections.
    pub ca_conn_agg: CaConnMetricsAgg,
}

impl CaConnSetMetrics {
    /// Creates an empty metrics value.
    pub fn new() -> Self {
        Self {
            ca_conn_agg: CaConnMetricsAgg::new(),
        }
    }
}

impl Default for CaConnSetMetrics {
    /// Same as [`CaConnSetMetrics::new`]; lets callers reset the value with `std::mem::take`.
    fn default() -> Self {
        Self::new()
    }
}
/// Aggregate over the `CaConnSetMetrics` items ingested so far.
#[derive(Debug, Serialize)]
pub struct CaConnSetAggMetrics {
    /// Second-level aggregate over the per-connection aggregates.
    pub ca_conn_agg_agg: CaConnMetricsAggAgg,
}

impl CaConnSetAggMetrics {
    /// Creates an empty aggregate.
    pub fn new() -> Self {
        Self {
            ca_conn_agg_agg: CaConnMetricsAggAgg::new(),
        }
    }

    /// Folds one connection-set metrics item into this aggregate.
    pub fn ingest(&mut self, inp: CaConnSetMetrics) {
        // Delegate to the aggregate's own `ingest` so the max/sum rules live in
        // exactly one place instead of being duplicated field by field here.
        self.ca_conn_agg_agg.ingest(inp.ca_conn_agg);
    }
}

impl Default for CaConnSetAggMetrics {
    /// Same as [`CaConnSetAggMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
pub struct InsertQueuesTxMetrics {
@@ -25,3 +116,52 @@ impl From<&InsertQueuesTx> for InsertQueuesTxMetrics {
}
}
}
/// Top-level metrics container: aggregates connection-set metrics via `ingest_ca_conn_set`.
#[derive(Debug, Serialize)]
pub struct DaemonMetrics {
    /// Aggregate over the connection-set metrics ingested so far.
    pub ca_conn_set_agg: CaConnSetAggMetrics,
}

impl DaemonMetrics {
    /// Creates an empty metrics container.
    pub fn new() -> Self {
        Self {
            ca_conn_set_agg: CaConnSetAggMetrics::new(),
        }
    }

    /// Folds one connection-set metrics item into the running aggregate.
    pub fn ingest_ca_conn_set(&mut self, inp: CaConnSetMetrics) {
        self.ca_conn_set_agg.ingest(inp);
    }
}

impl Default for DaemonMetrics {
    /// Same as [`DaemonMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Flat selection of metrics that can be rendered as text via `prometheus`.
#[derive(Debug, Serialize)]
pub struct MetricsPrometheusShort {
    // derived from values
    pub ca_conn_event_out_queue_len_max: usize,
    // derived from counter terms
    pub ca_msg_recv_cnt_all: u64,
}

impl MetricsPrometheusShort {
    /// Renders the metrics as text: one `name value` pair per line, each line
    /// terminated by a newline.
    pub fn prometheus(&self) -> String {
        let queue_len_line = format!(
            "ca_conn_event_out_queue_len_max {}\n",
            self.ca_conn_event_out_queue_len_max
        );
        let recv_cnt_line = format!("ca_msg_recv_all_sum {}\n", self.ca_msg_recv_cnt_all);
        queue_len_line + &recv_cnt_line
    }
}
impl From<&DaemonMetrics> for MetricsPrometheusShort {
    /// Copies the two exported numbers out of the daemon's nested aggregate.
    fn from(value: &DaemonMetrics) -> Self {
        // Both fields come from the same innermost aggregate; borrow it once.
        let agg = &value.ca_conn_set_agg.ca_conn_agg_agg;
        let ca_conn_event_out_queue_len_max = agg.ca_conn_event_out_queue_len_max;
        let ca_msg_recv_cnt_all = agg.ca_msg_recv_cnt_all;
        Self {
            ca_conn_event_out_queue_len_max,
            ca_msg_recv_cnt_all,
        }
    }
}