From 71fa333f755428d563382f24f2725eb32bbf1938 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 28 Nov 2023 15:55:43 +0100 Subject: [PATCH] Publish metrics also for metricbeat --- daqingest/src/daemon.rs | 4 ++- netfetch/src/conf.rs | 5 --- netfetch/src/metrics.rs | 65 ++++++++++++++++++++-------------- stats_proc/src/stats_proc.rs | 32 +++++++++++++++++ stats_types/Cargo.toml | 3 +- stats_types/src/stats_types.rs | 6 ++++ 6 files changed, 82 insertions(+), 33 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 4e0df3e..32bb82b 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -103,9 +103,10 @@ impl Daemon { // Insert queue hook // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); + let local_epics_hostname = ingest_linux::net::local_hostname(); let conn_set_ctrl = CaConnSet::start( ingest_opts.backend().into(), - ingest_opts.local_epics_hostname(), + local_epics_hostname, query_item_tx, channel_info_query_tx, ingest_opts.clone(), @@ -557,6 +558,7 @@ static SIGTERM: AtomicUsize = AtomicUsize::new(0); static SHUTDOWN_SENT: AtomicUsize = AtomicUsize::new(0); fn handler_sigint(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { + std::process::exit(13); SIGINT.store(1, atomic::Ordering::Release); let _ = ingest_linux::signal::unset_signal_handler(libc::SIGINT); } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index aa20044..4631d0d 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -31,7 +31,6 @@ pub struct CaIngestOpts { insert_worker_concurrency: Option, insert_scylla_sessions: Option, insert_item_queue_cap: Option, - local_epics_hostname: Option, store_workers_rate: Option, insert_frac: Option, use_rate_limit_queue: Option, @@ -95,10 +94,6 @@ impl CaIngestOpts { self.insert_item_queue_cap.unwrap_or(80000) } - pub fn local_epics_hostname(&self) -> String { - self.local_epics_hostname.clone().unwrap_or_else(local_hostname) - } - pub fn store_workers_rate(&self) -> u64 { self.store_workers_rate.unwrap_or(5000) } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index b8e71a4..d00fb6e 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -72,6 +72,7 @@ impl IntoResponse for CustomErrorResponse { } } +#[derive(Clone)] pub struct StatsSet { daemon: Arc, ca_conn_set: Arc, @@ -230,6 +231,26 @@ impl DaemonComm { } } +fn metricbeat(stats_set: &StatsSet) -> axum::Json { + let mut map = serde_json::Map::new(); + map.insert("daemon".to_string(), stats_set.daemon.json()); + map.insert("insert_worker_stats".to_string(), stats_set.insert_worker_stats.json()); + let mut ret = serde_json::Map::new(); + ret.insert("daqingest".to_string(), serde_json::Value::Object(map)); + axum::Json(serde_json::Value::Object(ret)) +} + +fn metrics(stats_set: &StatsSet) -> String { + let s1 = stats_set.daemon.prometheus(); + let s2 = stats_set.ca_conn_set.prometheus(); + let s3 = stats_set.insert_worker_stats.prometheus(); + let s4 = stats_set.ca_conn.prometheus(); + let s5 = stats_set.series_by_channel_stats.prometheus(); + let s6 = stats_set.ca_proto.prometheus(); + let s7 = stats_set.ioc_finder_stats.prometheus(); + [s1, s2, s3, s4, s5, s6, s7].join("") +} + fn make_routes(dcom: Arc, connset_cmd_tx: Sender, stats_set: StatsSet) -> axum::Router { use axum::extract; use axum::routing::get; @@ -255,37 +276,29 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st .route( "/metrics", get({ - // - || async move { - let s1 = stats_set.daemon.prometheus(); - let s2 = stats_set.ca_conn_set.prometheus(); - let s3 = stats_set.insert_worker_stats.prometheus(); - let s4 = stats_set.ca_conn.prometheus(); - let s5 = stats_set.series_by_channel_stats.prometheus(); - let s6 = stats_set.ca_proto.prometheus(); - let s7 = stats_set.ioc_finder_stats.prometheus(); - [s1, s2, s3, s4, s5, s6, s7].join("") - } + let stats_set = stats_set.clone(); + || async move { metrics(&stats_set) } + }), + ) + .route( + "/daqingest/metrics", + get({ + let stats_set = stats_set.clone(); + || async move { metrics(&stats_set) } + }), + ) + .route( + "/daqingest/metricbeat", + get({ + let stats_set = stats_set.clone(); + || async move { metricbeat(&stats_set) } }), ) .route( "/metricbeat", get({ - // - || async move { - axum::Json(serde_json::json!({ - "v1": 42_u32, - "o1": { - "v2": 56, - "o2": { - "v3": "test", - }, - }, - "o5": { - "v6": 89, - }, - })) - } + let stats_set = stats_set.clone(); + || async move { metricbeat(&stats_set) } }), ) .route( diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index b88a775..2467281 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -152,6 +152,36 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { " ) }; + let fn_json = { + let mut buf = String::new(); + for x in &st.counters { + buf.push_str(&format!( + "ret.insert(\"{x}\".to_string(), Value::Number(Number::from(self.{x}.load())));\n" + )); + } + for x in &st.values { + buf.push_str(&format!( + "ret.insert(\"{x}\".to_string(), Value::Number(Number::from(self.{x}.load())));\n" + )); + } + for x in &st.histolog2s { + buf.push_str(&format!("let v = self.{x}.to_json(\"__dummyname__\");\n")); + buf.push_str(&format!("ret.insert(\"{x}\".to_string(), v);\n")); + } + format!( + " + pub fn json(&self) -> stats_types::serde_json::Value {{ + use serde_json::Map; + use serde_json::Number; + use serde_json::Value; + use stats_types::serde_json; + let mut ret = Map::new(); + // {buf} + Value::Object(ret) + }} + " + ) + }; let fn_snapshot = { let mut init_counters = String::new(); for x in &st.counters { @@ -193,6 +223,8 @@ impl {name} {{ {fn_prometheus} + {fn_json} + {fn_snapshot} }} diff --git a/stats_types/Cargo.toml b/stats_types/Cargo.toml index c7e011b..f468dda 100644 --- a/stats_types/Cargo.toml +++ b/stats_types/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" [lib] path = "src/stats_types.rs" -#[dependencies] +[dependencies] +serde_json = "1" diff --git a/stats_types/src/stats_types.rs b/stats_types/src/stats_types.rs index d52fa83..4eb8487 100644 --- a/stats_types/src/stats_types.rs +++ b/stats_types/src/stats_types.rs @@ -1,3 +1,5 @@ +pub use serde_json; + use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::AcqRel; use std::sync::atomic::Ordering::Acquire; @@ -192,6 +194,10 @@ impl HistoLog2 { ret.push_str("\n"); ret } + + pub fn to_json(&self, _name: &str) -> serde_json::Value { + serde_json::Value::Null + } } #[test]