diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 1560380..96bd0e4 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -87,9 +87,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.61" +version = "0.1.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282" +checksum = "689894c2db1ea643a50834b999abf1c110887402542955ff5451dab8f861f9ed" dependencies = [ "proc-macro2", "quote", @@ -209,9 +209,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" [[package]] name = "byteorder" @@ -400,9 +400,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d1075c37807dcf850c379432f0df05ba52cc30f279c5cfc43cc221ce7f8579" +checksum = "b61a7545f753a88bcbe0a70de1fcc0221e10bfc752f576754fa91e663db1622e" dependencies = [ "cc", "cxxbridge-flags", @@ -412,9 +412,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5044281f61b27bc598f2f6647d480aed48d2bf52d6eb0b627d84c0361b17aa70" +checksum = "f464457d494b5ed6905c63b0c4704842aba319084a0a3561cdc1359536b53200" dependencies = [ "cc", "codespan-reporting", @@ -427,15 +427,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61b50bc93ba22c27b0d31128d2d130a0a6b3d267ae27ef7e4fae2167dfe8781c" +checksum = "43c7119ce3a3701ed81aca8410b9acf6fc399d2629d057b87e2efa4e63a3aaea" [[package]] name = "cxxbridge-macro" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e61fda7e62115119469c7b3591fd913ecca96fb766cfd3f2e2502ab7bc87a5" +checksum = "65e07508b90551e610910fa648a1878991d367064997a596135b86df30daf07e" dependencies = [ "proc-macro2", "quote", @@ -1412,9 +1412,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" dependencies = [ "unicode-ident", ] @@ -1556,9 +1556,9 @@ checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" [[package]] name = "rustix" -version = "0.36.6" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549" +checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" dependencies = [ "bitflags", "errno", @@ -1719,9 +1719,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.16" +version = "0.9.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92b5b431e8907b50339b51223b97d102db8d987ced36f6e4d03621db9316c834" +checksum = "8fb06d4b6cdaef0e0c51fa881acb721bed3c924cfaa71d9c94a3b771dfdf6567" dependencies = [ "indexmap", "itoa", @@ -1984,9 +1984,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.24.1" +version = "1.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae" +checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" dependencies = [ "autocfg", "bytes", @@ -2073,9 +2073,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1333c76748e868a4d9d1017b5ab53171dfd095f70c712fdb4653a406547f598f" +checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" dependencies = [ "serde", ] @@ -2261,9 +2261,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-bidi" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +checksum = "0046be40136ef78dc325e0edefccf84ccddacd0afcc1ca54103fa3c61bbdab1d" [[package]] name = "unicode-ident" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 6769bf5..75cad77 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -4,6 +4,7 @@ use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; +use netfetch::ca::conn::CaConnEvent; use netfetch::ca::conn::ConnCommand; use netfetch::ca::connset::CaConnSet; use netfetch::ca::findioc::FindIocRes; @@ -17,10 +18,12 @@ use netfetch::daemon_common::DaemonEvent; use netfetch::errconv::ErrConv; use netfetch::insertworker::Ttls; use netfetch::metrics::ExtraInsertsConf; +use netfetch::metrics::StatsSet; use netfetch::store::CommonInsertItemQueue; use netpod::Database; use netpod::ScyllaConfig; use serde::Serialize; +use stats::DaemonStats; use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::VecDeque; @@ -153,17 +156,19 @@ pub struct ChannelState { value: ChannelStateValue, } -#[derive(Debug, Serialize)] -pub enum CaConnStateValue {} +#[derive(Debug)] +pub enum CaConnStateValue { + Fresh, + HadFeedback, + Shutdown { since: Instant }, +} #[derive(Debug)] pub struct CaConnState { last_feedback: Instant, - #[allow(unused)] value: CaConnStateValue, } -#[derive(Debug, Clone)] pub struct DaemonOpts { backend: String, local_epics_hostname: String, @@ -248,6 +253,7 @@ pub struct Daemon { pg_client: Arc, ingest_commons: Arc, caconn_last_channel_check: Instant, + stats: Arc, } impl Daemon { @@ -409,6 +415,7 @@ impl Daemon { pg_client, ingest_commons, caconn_last_channel_check: Instant::now(), + stats: Arc::new(DaemonStats::new()), }; Ok(ret) } @@ -662,23 +669,77 @@ impl Daemon { (qtx, ioc_finder_jh) } + fn stats(&self) -> &Arc { + &self.stats + } + async fn check_chans(&mut self) -> Result<(), Error> { { let tsnow = Instant::now(); - for (k, v) in &self.connection_states { - // TODO check for delta t since last issued status command. - if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { - error!("CaConn without status feedback {k:?}"); + for (k, v) in &mut self.connection_states { + match v.value { + CaConnStateValue::Fresh => { + // TODO check for delta t since last issued status command. + if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { + error!("TODO send connection-close for {k:?}"); + self.stats.ca_conn_status_feedback_timeout_inc(); + v.value = CaConnStateValue::Shutdown { since: tsnow }; + } + } + CaConnStateValue::HadFeedback => { + // TODO check for delta t since last issued status command. + if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { + error!("TODO send connection-close for {k:?}"); + self.stats.ca_conn_status_feedback_timeout_inc(); + v.value = CaConnStateValue::Shutdown { since: tsnow }; + } + } + CaConnStateValue::Shutdown { since } => { + if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) { + self.stats.critical_error_inc(); + error!("Shutdown of CaConn to {} failed", k); + } + } } } } let mut currently_search_pending = 0; { + let mut with_address_count = 0; + let mut without_address_count = 0; for (_ch, st) in &self.channel_states { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value { - currently_search_pending += 1; + match &st.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::UnknownAddress { since: _ } => { + without_address_count += 1; + } + ActiveChannelState::SearchPending { since: _, did_send: _ } => { + currently_search_pending += 1; + without_address_count += 1; + } + ActiveChannelState::WithAddress { addr: _, state } => match state { + WithAddressState::Unassigned { assign_at: _ } => { + with_address_count += 1; + } + WithAddressState::Assigned(_) => { + with_address_count += 1; + } + }, + ActiveChannelState::NoAddress { since: _ } => { + without_address_count += 1; + } + }, + ChannelStateValue::ToRemove { addr: _ } => { + with_address_count += 1; + } } } + self.stats + .channel_with_address + .store(with_address_count, atomic::Ordering::Release); + self.stats + .channel_without_address + .store(without_address_count, atomic::Ordering::Release); } let k = self.chan_check_next.take(); trace!("------------ check_chans start at {:?}", k); @@ -745,6 +806,13 @@ impl Daemon { value: ConnectionStateValue::Unconnected, }; *state = WithAddressState::Assigned(cs); + self.connection_states.entry(*addr).or_insert_with(|| { + let t = CaConnState { + last_feedback: Instant::now(), + value: CaConnStateValue::Fresh, + }; + t + }); } } Assigned(_) => { @@ -838,7 +906,17 @@ impl Daemon { Ok(()) } + async fn ca_conn_send_shutdown(&mut self) -> Result<(), Error> { + warn!("send shutdown to all ca connections"); + self.ingest_commons + .ca_conn_set + .enqueue_command_to_all(|| ConnCommand::shutdown()) + .await?; + Ok(()) + } + async fn handle_timer_tick(&mut self) -> Result<(), Error> { + self.stats.handle_timer_tick_count_inc(); let ts1 = Instant::now(); let tsnow = SystemTime::now(); if SIGINT.load(atomic::Ordering::Acquire) == 1 { @@ -936,6 +1014,7 @@ impl Daemon { SEARCH_ANS_COUNT.fetch_add(ress.len(), atomic::Ordering::AcqRel); for res in ress { if let Some(addr) = &res.addr { + self.stats.ioc_search_some_inc(); let ch = Channel::new(res.channel); if let Some(st) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since, did_send: _ }) = @@ -989,6 +1068,7 @@ impl Daemon { } } Err(e) => { + self.stats.ioc_search_err_inc(); error!("error from search: {e}"); } } @@ -998,7 +1078,7 @@ impl Daemon { async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> { info!("handle_ca_conn_done {conn_addr:?}"); self.connection_states.remove(&conn_addr); - for (k, v) in self.channel_states.iter_mut() { + for (_k, v) in self.channel_states.iter_mut() { match &v.value { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::UnknownAddress { .. } => {} @@ -1022,8 +1102,45 @@ impl Daemon { Ok(()) } + async fn handle_ca_conn_event(&mut self, addr: SocketAddrV4, item: CaConnEvent) -> Result<(), Error> { + self.stats.event_ca_conn_inc(); + use netfetch::ca::conn::CaConnEventValue::*; + match item.value { + None => { + // TODO count, maybe reduce. + Ok(()) + } + EchoTimeout => { + self.stats.ca_echo_timeout_total_inc(); + error!("TODO on EchoTimeout remove the CaConn and reset channels"); + Ok(()) + } + ConnCommandResult(item) => { + self.stats.todo_mark_inc(); + use netfetch::ca::conn::ConnCommandResultKind::*; + match &item.kind { + CheckHealth => { + if let Some(st) = self.connection_states.get_mut(&addr) { + self.stats.ca_conn_status_feedback_recv_inc(); + st.last_feedback = Instant::now(); + Ok(()) + } else { + self.stats.ca_conn_status_feedback_no_dst_inc(); + Ok(()) + } + } + } + } + EndOfStream => { + self.stats.ca_conn_status_done_inc(); + self.handle_ca_conn_done(addr).await + } + } + } + async fn handle_event(&mut self, item: DaemonEvent, ticker_inp_tx: &Sender) -> Result<(), Error> { use DaemonEvent::*; + self.stats.events_inc(); let ts1 = Instant::now(); let item_summary = item.summary(); let ret = match item { @@ -1032,6 +1149,7 @@ impl Daemon { match ticker_inp_tx.send(42).await { Ok(_) => {} Err(_) => { + self.stats.ticker_token_release_error_inc(); error!("can not send ticker token"); return Err(Error::with_msg_no_trace("can not send ticker token")); } @@ -1041,34 +1159,7 @@ impl Daemon { ChannelAdd(ch) => self.handle_channel_add(ch), ChannelRemove(ch) => self.handle_channel_remove(ch), SearchDone(item) => self.handle_search_done(item).await, - CaConnEvent(addr, item) => { - use netfetch::ca::conn::CaConnEventValue::*; - match item.value { - None => { - // TODO count, maybe reduce. - Ok(()) - } - EchoTimeout => { - error!("TODO on EchoTimeout remove the CaConn and reset channels"); - Ok(()) - } - HealthCheckDone => { - if let Some(st) = self.connection_states.get_mut(&addr) { - st.last_feedback = Instant::now(); - Ok(()) - } else { - error!("received HealthCheckDone for unknown CaConn"); - // TODO - Ok(()) - } - } - ConnCommandResult(item) => { - info!("TODO handle ConnCommandResult {item:?}"); - Ok(()) - } - EndOfStream => self.handle_ca_conn_done(addr).await, - } - } + CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await, }; let dt = ts1.elapsed(); if dt > Duration::from_millis(200) { @@ -1081,6 +1172,7 @@ impl Daemon { let (ticker_inp_tx, ticker_inp_rx) = async_channel::bounded::(1); let ticker = { let tx = self.tx.clone(); + let stats = self.stats.clone(); async move { loop { tokio::time::sleep(Duration::from_millis(100)).await; @@ -1092,7 +1184,10 @@ impl Daemon { for _ in 0..c { match ticker_inp_rx.recv().await { Ok(_) => {} - Err(_) => break, + Err(_) => { + stats.ticker_token_acquire_error_inc(); + break; + } } } } @@ -1161,9 +1256,14 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> }; let mut daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); + let daemon_stats = daemon.stats().clone(); - let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy()); - netfetch::metrics::start_metrics_service(opts.api_bind(), dcom); + let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); + let metrics_jh = { + let stats_set = StatsSet::new(daemon_stats); + let fut = netfetch::metrics::start_metrics_service(opts.api_bind(), dcom, stats_set); + tokio::task::spawn(fut) + }; let daemon_jh = taskrun::spawn(async move { // TODO handle Err @@ -1175,5 +1275,8 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> } info!("all channels sent to daemon"); daemon_jh.await.unwrap(); + if false { + metrics_jh.await.unwrap(); + } Ok(()) } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index b9f54e0..3039aeb 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -338,15 +338,14 @@ pub enum ConnCommandResultKind { #[derive(Debug)] pub struct ConnCommandResult { - id: usize, - kind: ConnCommandResultKind, + pub id: usize, + pub kind: ConnCommandResultKind, } #[derive(Debug)] pub enum CaConnEventValue { None, EchoTimeout, - HealthCheckDone, ConnCommandResult(ConnCommandResult), EndOfStream, } @@ -423,6 +422,7 @@ pub struct CaConn { ioc_ping_last: Instant, ioc_ping_start: Option, cmd_res_queue: VecDeque, + ca_conn_event_out_queue: VecDeque, channel_set_ops: Arc, channel_info_query_tx: Sender, series_lookup_schedule: BTreeMap, @@ -474,6 +474,7 @@ impl CaConn { ioc_ping_last: Instant::now(), ioc_ping_start: None, cmd_res_queue: VecDeque::new(), + ca_conn_event_out_queue: VecDeque::new(), channel_set_ops: Arc::new(ChannelSetOps { ops: StdMutex::new(BTreeMap::new()), flag: AtomicUsize::new(0), @@ -528,8 +529,13 @@ impl CaConn { self.trigger_shutdown(); } } - //self.stats.caconn_command_can_not_reply_inc(); // TODO return the result + let res = ConnCommandResult { + id: 0, + kind: ConnCommandResultKind::CheckHealth, + }; + self.cmd_res_queue.push_back(res); + //self.stats.caconn_command_can_not_reply_inc(); } fn cmd_find_channel(&self, pattern: &str) { @@ -811,6 +817,11 @@ impl CaConn { if let Some(started) = self.ioc_ping_start { if started.elapsed() > Duration::from_millis(4000) { warn!("Echo timeout {addr:?}", addr = self.remote_addr_dbg); + let item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::EchoTimeout, + }; + self.ca_conn_event_out_queue.push_back(item); self.trigger_shutdown(); } } else { @@ -1703,6 +1714,8 @@ impl Stream for CaConn { ts: Instant::now(), value: CaConnEventValue::ConnCommandResult(item), }))) + } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { + Ready(Some(Ok(item))) } else { let mut i1 = 0; let ret = loop { diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index f9c4502..1d5d6ff 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -11,12 +11,23 @@ use serde::Serialize; use stats::CaConnStats; use stats::CaConnStatsAgg; use stats::CaConnStatsAggDiff; +use stats::DaemonStats; use std::collections::HashMap; use std::net::SocketAddrV4; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +pub struct StatsSet { + daemon: Arc, +} + +impl StatsSet { + pub fn new(daemon: Arc) -> Self { + Self { daemon } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ExtraInsertsConf { pub copies: Vec<(u64, u64)>, @@ -112,19 +123,17 @@ struct DummyQuery { age: usize, } -pub struct DaemonComm {} +pub struct DaemonComm { + tx: Sender, +} impl DaemonComm { pub fn new(tx: Sender) -> Self { - Self {} - } - - pub fn dummy() -> Self { - Self {} + Self { tx } } } -fn make_routes(dcom: Arc) -> axum::Router { +fn make_routes(dcom: Arc, stats_set: StatsSet) -> axum::Router { use axum::extract; use axum::routing::get; use axum::routing::put; @@ -147,17 +156,12 @@ fn make_routes(dcom: Arc) -> axum::Router { ) .route( "/metrics", - get(|| async { - let stats = crate::ca::METRICS.lock().unwrap(); - match stats.as_ref() { - Some(s) => { - trace!("Metrics"); - s.prometheus() - } - None => { - trace!("Metrics empty"); - String::new() - } + get({ + // + || async move { + info!("metrics"); + let s1 = stats_set.daemon.prometheus(); + s1 } }), ) @@ -238,9 +242,9 @@ fn make_routes(dcom: Arc) -> axum::Router { ) } -pub async fn start_metrics_service(bind_to: String, dcom: Arc) { +pub async fn start_metrics_service(bind_to: String, dcom: Arc, stats_set: StatsSet) { axum::Server::bind(&bind_to.parse().unwrap()) - .serve(make_routes(dcom).into_make_service()) + .serve(make_routes(dcom, stats_set).into_make_service()) .await .unwrap() } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 723b1b6..e9b9436 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -266,7 +266,29 @@ stats_proc::stats_struct!(( )); stats_proc::stats_struct!(( - stats_struct(name(DaemonStats), counters(main_lookupaddr_ok)), + stats_struct( + name(DaemonStats), + prefix(daemon), + counters( + critical_error, + todo_mark, + ticker_token_acquire_error, + ticker_token_release_error, + handle_timer_tick_count, + ioc_search_err, + ioc_search_some, + ioc_search_none, + lookupaddr_ok, + events, + event_ca_conn, + ca_conn_status_done, + ca_conn_status_feedback_timeout, + ca_conn_status_feedback_recv, + ca_conn_status_feedback_no_dst, + ca_echo_timeout_total, + ), + values(channel_without_address, channel_with_address), + ), agg(name(DaemonStatsAgg), parent(DaemonStats)), diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)), )); diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index 6340ddb..6025309 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -13,7 +13,9 @@ struct FuncCallWithArgs { #[derive(Clone, Debug)] struct StatsStructDef { name: syn::Ident, + prefix: Option, counters: Vec, + values: Vec, } #[derive(Debug)] @@ -36,18 +38,22 @@ fn extend_str(mut a: String, x: impl AsRef) -> String { } fn stats_struct_impl(st: &StatsStructDef) -> String { + use std::fmt::Write; let name = &st.name; - let inits: Vec<_> = st + let inits1 = st .counters .iter() - .map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string())) - .collect(); + .map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string())); + let inits2 = st + .values + .iter() + .map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string())); + let inits: Vec<_> = inits1.into_iter().chain(inits2).collect(); let inits = inits.join(",\n"); let incers: String = st .counters .iter() - .map(|x| { - let nn = x.to_string(); + .map(|nn| { format!( " pub fn {nn}_inc(&self) {{ @@ -62,7 +68,61 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { " ) }) - .fold(String::new(), |a, x| format!("{}{}", a, x)); + .fold(String::new(), |mut a, x| { + a.push_str(&x); + a + }); + let values = { + let mut buf = String::new(); + for nn in &st.values { + write!( + buf, + " + pub fn {nn}_set(&self, v: u64) {{ + self.{nn}.store(v, Ordering::Release); + }} +" + ) + .unwrap(); + } + buf + }; + let fn_prometheus = { + let mut buf = String::new(); + for x in &st.counters { + let n = x.to_string(); + let pre = if let Some(x) = &st.prefix { + format!("_{}", x) + } else { + String::new() + }; + buf.push_str(&format!( + "ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", + pre, n, n + )); + } + for x in &st.values { + let n = x.to_string(); + let nn = if let Some(pre) = &st.prefix { + format!("{pre}_{n}") + } else { + n.to_string() + }; + buf.push_str(&format!( + "ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", + nn, n + )); + } + format!( + " + pub fn prometheus(&self) -> String {{ + let mut ret = String::new(); + {buf} + ret + }} + " + ) + }; format!( " impl {name} {{ @@ -74,6 +134,10 @@ impl {name} {{ }} {incers} + + {values} + + {fn_prometheus} }} " ) @@ -86,11 +150,17 @@ fn stats_struct_decl_impl(st: &StatsStructDef) -> String { .iter() .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) .fold(String::new(), extend_str); + let values_decl = st + .values + .iter() + .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .fold(String::new(), extend_str); let structt = format!( " pub struct {name} {{ pub ts_create: Instant, {counters_decl} +{values_decl} }} " @@ -350,27 +420,40 @@ impl StatsStructDef { fn empty() -> Self { Self { name: syn::parse_str("__empty").unwrap(), - counters: vec![], + prefix: syn::parse_str("__empty").unwrap(), + counters: Vec::new(), + values: Vec::new(), } } fn from_args(inp: PunctExpr) -> syn::Result { let mut name = None; + let mut prefix = None; let mut counters = None; + let mut values = None; for k in inp { let fa = FuncCallWithArgs::from_expr(k)?; if fa.name == "name" { let ident = ident_from_expr(fa.args[0].clone())?; name = Some(ident); - } - if fa.name == "counters" { + } else if fa.name == "prefix" { + let ident = ident_from_expr(fa.args[0].clone())?; + prefix = Some(ident); + } else if fa.name == "counters" { let idents = idents_from_exprs(fa.args)?; counters = Some(idents); + } else if fa.name == "values" { + let idents = idents_from_exprs(fa.args)?; + values = Some(idents); + } else { + panic!("fa.name: {:?}", fa.name); } } let ret = StatsStructDef { name: name.expect("Expect name for StatsStructDef"), - counters: counters.unwrap_or(vec![]), + prefix, + counters: counters.unwrap_or(Vec::new()), + values: values.unwrap_or(Vec::new()), }; Ok(ret) } @@ -476,7 +559,9 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream { for j in &def.agg_defs { let h = StatsStructDef { name: j.name.clone(), + prefix: None, counters: j.stats.counters.clone(), + values: Vec::new(), }; def.stats_struct_defs.push(h); } @@ -505,7 +590,11 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream { // TODO currently, "j.stats" describes the input to the "agg", so that contains the wrong name. let p = StatsStructDef { name: k.input.clone(), + // TODO refactor + prefix: None, counters: j.stats.counters.clone(), + // TODO compute values + values: Vec::new(), }; let s = diff_decl_impl(k, &p); code.push_str(&s);