From a653816ed200c3885fa3c34cbaf776d0504e5ff6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 15 Apr 2025 11:49:43 +0200 Subject: [PATCH] Export cpu and memory usage --- daqingest/Cargo.toml | 1 + daqingest/src/daemon.rs | 94 ++++++++++++++++++++++++++++++++++++++++- netfetch/src/ca/conn.rs | 7 +-- stats/mettdecl.rs | 8 +++- stats/src/stats.rs | 3 +- 5 files changed, 103 insertions(+), 10 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 31a83e9..e1ad923 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -8,6 +8,7 @@ edition = "2024" default = [] # default = ["bsread"] bsread = [] +DISABLED = [] [dependencies] clap = { version = "4.5.28", features = ["derive", "cargo"] } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index a1c3730..a55d7db 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -84,6 +84,8 @@ pub struct Daemon { series_conf_by_id_tx: Sender<()>, iqtx: Option, daemon_metrics: stats::mett::DaemonMetrics, + cpu_latest: u64, + rss_latest: u64, } impl Daemon { @@ -417,6 +419,8 @@ impl Daemon { series_conf_by_id_tx, iqtx: Some(iqtx2), daemon_metrics: stats::mett::DaemonMetrics::new(), + cpu_latest: 0, + rss_latest: 0, }; Ok(ret) } @@ -442,10 +446,93 @@ impl Daemon { Ok(()) } + fn get_cpu_usage() -> u64 { + let ret = match std::fs::read("/proc/self/stat") { + Ok(buf) => { + let line = String::from_utf8_lossy(&buf); + let a: Vec<_> = line.split(" ").collect(); + let utime = if let Some(s) = a.get(13) { + match s.parse::() { + Ok(n) => Some(n), + Err(_) => None, + } + } else { + None + }; + let stime = if let Some(s) = a.get(14) { + match s.parse::() { + Ok(n) => Some(n), + Err(_) => None, + } + } else { + None + }; + match (utime, stime) { + (Some(utime), Some(stime)) => Some(utime + stime), + _ => None, + } + } + Err(_) => None, + }; + ret.unwrap_or(0) + } + + fn update_cpu_usage(&mut self) { + let cpu = Self::get_cpu_usage(); + if cpu > self.cpu_latest { + let diff = cpu - self.cpu_latest; + self.cpu_latest = cpu; + self.daemon_metrics.proc_cpu_v0_inc().add(diff as u32); + } else if cpu < self.cpu_latest { + let diff = self.cpu_latest - cpu; + self.cpu_latest = cpu; + self.daemon_metrics.proc_cpu_v0_dec().add(diff as u32); + } + } + + fn get_memory_usage() -> u64 { + let ret = match std::fs::read("/proc/self/statm") { + Ok(statm_line) => { + let statm_str = String::from_utf8_lossy(&statm_line); + let mut it = statm_str.split(" "); + if let Some(_) = it.next() { + if let Some(s) = it.next() { + match s.parse::() { + Ok(n) => { + let ps = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64; + Some(n * ps) + } + Err(_) => None, + } + } else { + None + } + } else { + None + } + } + Err(_) => None, + }; + ret.unwrap_or(0) + } + + fn update_memory_usage(&mut self) { + let rss = Self::get_memory_usage(); + if rss > self.rss_latest { + let diff = rss - self.rss_latest; + self.rss_latest = rss; + self.daemon_metrics.proc_mem_rss_inc().add(diff as u32); + } else if rss < self.rss_latest { + let diff = self.rss_latest - rss; + self.rss_latest = rss; + self.daemon_metrics.proc_mem_rss_dec().add(diff as u32); + } + } + async fn handle_timer_tick(&mut self) -> Result<(), Error> { if self.shutting_down { let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire); - #[cfg(target_abi = "x32")] + #[cfg(feature = "DISABLED")] { let nitems = self .query_item_tx_weak @@ -496,6 +583,7 @@ impl Daemon { .as_ref() .map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x)); if let Some(iqtxm) = iqtxm { + // TODO metrics self.stats().iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _); self.stats().iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _); self.stats().iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _); @@ -508,6 +596,8 @@ impl Daemon { self.stats().iqtx_len_lt_rf3().set(2); self.stats().iqtx_len_lt_rf3_lat5().set(2); } + self.update_cpu_usage(); + self.update_memory_usage(); Ok(()) } @@ -733,7 +823,7 @@ impl Daemon { GetMetrics(tx) => { match tx.send((&self.daemon_metrics).into()).await { Ok(()) => {} - Err(e) => { + Err(_) => { error!("can not send metrics into channel"); } } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 741563a..04e815c 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2738,7 +2738,6 @@ impl CaConn { } if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow { not_alive_count += 1; - self.mett.channel_not_alive_no_activity().inc(); } else { alive_count += 1; } @@ -2828,14 +2827,12 @@ impl CaConn { } CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, camsg.ts, tsnow)?, CaMsgTy::Echo => { - // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { let dt = started.elapsed(); - // TODO STATS - // self.stats.pong_recv_lat().ingest_dur_dms(dt); + self.mett.pong_recv_lat().push_dur_100us(dt); } else { let addr = &self.remote_addr_dbg; - warn!("received Echo even though we didn't asked for it {addr:?}"); + warn!("received Echo even though we didn't asked for it {}", addr); } self.ioc_pong_last = Some(self.tmp_ts_poll); self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng); diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index 5520b39..f89e9b1 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -36,7 +36,6 @@ mod Metrics { ca_msg_recv, event_add_res_recv, time_check_channels_state_init, - channel_not_alive_no_activity, ping_no_proto, ping_start, pong_timeout, @@ -65,6 +64,7 @@ mod Metrics { poll_reloops, poll_all_dt, iiq_batch_len, + pong_recv_lat, } mod Compose { type Input = ca_proto::mett::CaProtoMetrics; @@ -90,4 +90,10 @@ mod Metrics { type Input = ScyllaInsertWorker; type Name = scy_inswork; } + enum counters { + proc_cpu_v0_inc, + proc_cpu_v0_dec, + proc_mem_rss_inc, + proc_mem_rss_dec, + } } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 5f96356..282f024 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -394,14 +394,13 @@ stats_proc::stats_struct!(( recv_read_notify_but_no_longer_ready, recv_read_notify_while_enabling_monitoring, recv_read_notify_while_polling_idle, - channel_not_alive_no_activity, monitor_stale_read_begin, monitor_stale_read_timeout, ca_proto_no_version_as_first, ca_proto_version_later, no_cid_for_subid, ), - histolog2s(poll_op3_dt, pong_recv_lat, ca_ts_off,), + histolog2s(poll_op3_dt, ca_ts_off,), ), agg(name(CaConnStatsAgg), parent(CaConnStats)), diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),