Export cpu and memory usage

This commit is contained in:
Dominik Werder
2025-04-15 11:49:43 +02:00
parent 5aa184266b
commit a653816ed2
5 changed files with 103 additions and 10 deletions

View File

@@ -8,6 +8,7 @@ edition = "2024"
default = []
# default = ["bsread"]
bsread = []
DISABLED = []
[dependencies]
clap = { version = "4.5.28", features = ["derive", "cargo"] }

View File

@@ -84,6 +84,8 @@ pub struct Daemon {
series_conf_by_id_tx: Sender<()>,
iqtx: Option<InsertQueuesTx>,
daemon_metrics: stats::mett::DaemonMetrics,
cpu_latest: u64,
rss_latest: u64,
}
impl Daemon {
@@ -417,6 +419,8 @@ impl Daemon {
series_conf_by_id_tx,
iqtx: Some(iqtx2),
daemon_metrics: stats::mett::DaemonMetrics::new(),
cpu_latest: 0,
rss_latest: 0,
};
Ok(ret)
}
@@ -442,10 +446,93 @@ impl Daemon {
Ok(())
}
/// Returns the accumulated CPU time of the current process.
///
/// Reads `/proc/self/stat` and returns the sum of the `utime` and `stime`
/// fields (fields 14 and 15 in proc(5) numbering; proc(5) states these are
/// measured in clock ticks). Returns 0 when the file cannot be read or the
/// fields cannot be parsed.
fn get_cpu_usage() -> u64 {
    // Extracts `utime + stime` from the text of a `/proc/<pid>/stat` file.
    fn parse_ticks(stat: &str) -> Option<u64> {
        // Field 2 is `(comm)` and may itself contain spaces or parentheses, so
        // counting space-separated tokens from the start is unreliable. All
        // fields after the LAST ')' are plain whitespace-separated values.
        let (_, rest) = stat.rsplit_once(')')?;
        let mut fields = rest.split_ascii_whitespace();
        // `rest` begins at field 3 (state), hence utime (field 14) sits at
        // offset 11 and stime (field 15) is the token right after it.
        let utime = fields.nth(11)?.parse::<u64>().ok()?;
        let stime = fields.next()?.parse::<u64>().ok()?;
        utime.checked_add(stime)
    }

    std::fs::read("/proc/self/stat")
        .ok()
        .and_then(|buf| parse_ticks(&String::from_utf8_lossy(&buf)))
        .unwrap_or(0)
}
/// Samples the process CPU usage and records the change since the last sample.
///
/// A higher reading adds the difference to the `proc_cpu_v0_inc` counter, a
/// lower reading adds it to `proc_cpu_v0_dec`, and an unchanged reading
/// records nothing. `cpu_latest` is updated to the new reading.
fn update_cpu_usage(&mut self) {
    let now = Self::get_cpu_usage();
    let before = self.cpu_latest;
    match now.cmp(&before) {
        std::cmp::Ordering::Greater => {
            self.cpu_latest = now;
            // The counter takes a u32; the delta is narrowed with `as`.
            self.daemon_metrics.proc_cpu_v0_inc().add((now - before) as u32);
        }
        std::cmp::Ordering::Less => {
            self.cpu_latest = now;
            self.daemon_metrics.proc_cpu_v0_dec().add((before - now) as u32);
        }
        std::cmp::Ordering::Equal => {}
    }
}
/// Returns the memory usage of the current process in bytes.
///
/// Reads the second field of `/proc/self/statm` (the resident page count
/// according to proc(5)) and multiplies it by the system page size obtained
/// from `sysconf(_SC_PAGESIZE)`. Returns 0 when the file cannot be read, the
/// field cannot be parsed, the page size is unavailable, or the product does
/// not fit into a `u64`.
fn get_memory_usage() -> u64 {
    std::fs::read("/proc/self/statm")
        .ok()
        .and_then(|buf| {
            let text = String::from_utf8_lossy(&buf);
            let pages = text.split_ascii_whitespace().nth(1)?.parse::<u64>().ok()?;
            // SAFETY: `sysconf` only inspects its integer argument and has no
            // memory-safety preconditions.
            let raw_page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
            // `sysconf` reports failure as -1; a plain `as u64` cast would turn
            // that into a huge bogus page size, so convert with a check.
            let page_size = u64::try_from(raw_page_size).ok()?;
            pages.checked_mul(page_size)
        })
        .unwrap_or(0)
}
/// Samples the process memory usage and records the change since the last sample.
///
/// Growth is added to the `proc_mem_rss_inc` counter and shrinkage to
/// `proc_mem_rss_dec`; an unchanged reading records nothing. `rss_latest` is
/// updated to the new reading.
fn update_memory_usage(&mut self) {
    let rss = Self::get_memory_usage();
    let prev = self.rss_latest;
    if rss == prev {
        return;
    }
    self.rss_latest = rss;
    let grew = rss > prev;
    // The reading is in bytes, so a single delta can exceed `u32::MAX` (e.g.
    // the first sample, where `prev` is 0). Casting with `as u32` would
    // silently drop the high bits, so feed the delta to the counter in
    // u32-sized pieces whose sum equals the real change.
    let mut remaining = rss.abs_diff(prev);
    while remaining > 0 {
        let chunk = u32::try_from(remaining).unwrap_or(u32::MAX);
        if grew {
            self.daemon_metrics.proc_mem_rss_inc().add(chunk);
        } else {
            self.daemon_metrics.proc_mem_rss_dec().add(chunk);
        }
        remaining -= u64::from(chunk);
    }
}
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
if self.shutting_down {
let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire);
#[cfg(target_abi = "x32")]
#[cfg(feature = "DISABLED")]
{
let nitems = self
.query_item_tx_weak
@@ -496,6 +583,7 @@ impl Daemon {
.as_ref()
.map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x));
if let Some(iqtxm) = iqtxm {
// TODO metrics
self.stats().iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _);
self.stats().iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _);
self.stats().iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _);
@@ -508,6 +596,8 @@ impl Daemon {
self.stats().iqtx_len_lt_rf3().set(2);
self.stats().iqtx_len_lt_rf3_lat5().set(2);
}
self.update_cpu_usage();
self.update_memory_usage();
Ok(())
}
@@ -733,7 +823,7 @@ impl Daemon {
GetMetrics(tx) => {
match tx.send((&self.daemon_metrics).into()).await {
Ok(()) => {}
Err(e) => {
Err(_) => {
error!("can not send metrics into channel");
}
}

View File

@@ -2738,7 +2738,6 @@ impl CaConn {
}
if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow {
not_alive_count += 1;
self.mett.channel_not_alive_no_activity().inc();
} else {
alive_count += 1;
}
@@ -2828,14 +2827,12 @@ impl CaConn {
}
CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, camsg.ts, tsnow)?,
CaMsgTy::Echo => {
// let addr = &self.remote_addr_dbg;
if let Some(started) = self.ioc_ping_start {
let dt = started.elapsed();
// TODO STATS
// self.stats.pong_recv_lat().ingest_dur_dms(dt);
self.mett.pong_recv_lat().push_dur_100us(dt);
} else {
let addr = &self.remote_addr_dbg;
warn!("received Echo even though we didn't asked for it {addr:?}");
warn!("received Echo even though we didn't asked for it {}", addr);
}
self.ioc_pong_last = Some(self.tmp_ts_poll);
self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng);

View File

@@ -36,7 +36,6 @@ mod Metrics {
ca_msg_recv,
event_add_res_recv,
time_check_channels_state_init,
channel_not_alive_no_activity,
ping_no_proto,
ping_start,
pong_timeout,
@@ -65,6 +64,7 @@ mod Metrics {
poll_reloops,
poll_all_dt,
iiq_batch_len,
pong_recv_lat,
}
mod Compose {
type Input = ca_proto::mett::CaProtoMetrics;
@@ -90,4 +90,10 @@ mod Metrics {
type Input = ScyllaInsertWorker;
type Name = scy_inswork;
}
enum counters {
proc_cpu_v0_inc,
proc_cpu_v0_dec,
proc_mem_rss_inc,
proc_mem_rss_dec,
}
}

View File

@@ -394,14 +394,13 @@ stats_proc::stats_struct!((
recv_read_notify_but_no_longer_ready,
recv_read_notify_while_enabling_monitoring,
recv_read_notify_while_polling_idle,
channel_not_alive_no_activity,
monitor_stale_read_begin,
monitor_stale_read_timeout,
ca_proto_no_version_as_first,
ca_proto_version_later,
no_cid_for_subid,
),
histolog2s(poll_op3_dt, pong_recv_lat, ca_ts_off,),
histolog2s(poll_op3_dt, ca_ts_off,),
),
agg(name(CaConnStatsAgg), parent(CaConnStats)),
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),