From 2f9a4092c8482b942d45d2efcc8cac01fcb17c74 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 17 May 2022 16:03:54 +0200 Subject: [PATCH] Add service for Prometheus, shows up in Grafana --- netfetch/Cargo.toml | 1 + netfetch/src/ca.rs | 76 +++-- netfetch/src/ca/conn.rs | 48 +-- stats/src/stats.rs | 198 +---------- stats_proc/src/stats_proc.rs | 641 ++++++++++++++++++++--------------- 5 files changed, 449 insertions(+), 515 deletions(-) diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index b6e4eeb..4caa3d8 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -27,6 +27,7 @@ md-5 = "0.9" hex = "0.4" libc = "0.2" regex = "1.5.5" +axum = "0.5" log = { path = "../log" } stats = { path = "../stats" } err = { path = "../../daqbuffer/err" } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 888cae2..b963747 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -11,16 +11,31 @@ use log::*; use netpod::Database; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; -use stats::{CaConnStats2, CaConnStats2Agg, CaConnVecStats}; +use stats::{CaConnStats2Agg, CaConnStats2AggDiff}; use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex, Once}; use std::time::{Duration, Instant}; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; +static mut METRICS: Option>> = None; +static METRICS_ONCE: Once = Once::new(); + +fn get_metrics() -> &'static mut Option { + METRICS_ONCE.call_once(|| unsafe { + METRICS = Some(Mutex::new(None)); + }); + let mut g = unsafe { METRICS.as_mut().unwrap().lock().unwrap() }; + //let ret = g.as_mut().unwrap(); + //let ret = g.as_mut(; + let ret: &mut Option = &mut *g; + let ret = unsafe { &mut *(ret as *mut _) }; + ret +} + #[derive(Debug, Serialize, Deserialize)] struct ChannelConfig { channels: Vec, @@ -214,6 +229,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { } pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { + tokio::spawn(start_metrics_service()); let facility = "scylla"; let opts = parse_config(opts.config).await?; let d = Database { @@ -249,7 +265,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let rows = pg_client .query(&qu_find_addr, &[&facility, ch]) .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + .map_err(|e| Error::with_msg_no_trace(format!("PG error: {e:?}")))?; if rows.is_empty() { error!("can not find address of channel {}", ch); } else { @@ -280,8 +296,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { } let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?); let mut conn_jhs = vec![]; - let mut conn_stats_all = vec![]; - let mut conn_stats2 = vec![]; + let mut conn_stats = vec![]; for (host, channels) in channels_by_host { if false && host.ip() != &"172.26.24.76".parse::().unwrap() { continue; @@ -289,10 +304,15 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let data_store = data_store.clone(); debug!("Create TCP connection to {:?}", (host.ip(), host.port())); let addr = SocketAddrV4::new(host.ip().clone(), host.port()); - let tcp = TcpStream::connect(addr).await?; + let tcp = match TcpStream::connect(addr).await { + Ok(k) => k, + Err(e) => { + error!("Can not connect to {addr:?} {e:?}"); + continue; + } + }; let mut conn = CaConn::new(tcp, addr, data_store.clone()); - conn_stats_all.push(conn.stats()); - conn_stats2.push(conn.stats2()); + conn_stats.push(conn.stats2()); for c in channels { conn.channel_add(c); } @@ -314,22 +334,21 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let jh = tokio::spawn(conn_block); conn_jhs.push(jh); } - let mut agg_last = CaConnVecStats::new(Instant::now()); - let mut agg2_last = CaConnStats2Agg::new(); + let mut agg_last = CaConnStats2Agg::new(); loop { tokio::time::sleep(Duration::from_millis(2000)).await; - let mut agg = CaConnVecStats::new(Instant::now()); - for st in &conn_stats_all { - agg.push(&st); + let agg = CaConnStats2Agg::new(); + for g in &conn_stats { + agg.push(&g); } - let mut agg2 = CaConnStats2Agg::new(); - for st in &conn_stats2 { - agg2.push(&st); - } - let diff = agg.diff_against(&agg_last); - info!("{diff}"); + let m = get_metrics(); + *m = Some(agg.clone()); + let diff = CaConnStats2AggDiff::diff_from(&agg_last, &agg); + info!("{}", diff.display()); agg_last = agg; - agg2_last = agg2; + if false { + break; + } } for jh in conn_jhs { match jh.await { @@ -346,3 +365,20 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { } Ok(()) } + +async fn start_metrics_service() { + let app = axum::Router::new().route( + "/metrics", + axum::routing::get(|| async { + let stats = get_metrics(); + match stats { + Some(s) => s.prometheus(), + None => String::new(), + } + }), + ); + axum::Server::bind(&"0.0.0.0:3011".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap() +} diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 3a5d120..3bf2064 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -11,11 +11,11 @@ use libc::c_int; use log::*; use netpod::timeunits::SEC; use netpod::{ScalarType, Shape}; -use stats::{CaConnStats, CaConnStats2, IntervalEma}; +use stats::{CaConnStats2, IntervalEma}; use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; @@ -105,10 +105,10 @@ macro_rules! insert_scalar_impl { ts_msp_changed: bool, st: ScalarType, sh: Shape, - stats2: Arc, + stats: Arc, ) { if futs_queue.len() >= INSERT_FUTS_LIM { - stats2.inserts_discard.fetch_add(1, Ordering::Release); + stats.inserts_discard.fetch_add(1, Ordering::AcqRel); return; } let pulse = 0 as u64; @@ -120,7 +120,7 @@ macro_rules! insert_scalar_impl { val, ); let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); - stats2.inserts_val.fetch_add(1, Ordering::Release); + stats.inserts_val.fetch_add(1, Ordering::AcqRel); let fut = if ts_msp_changed { let fut1 = ScyInsertFut::new( data_store.scy.clone(), @@ -138,7 +138,7 @@ macro_rules! insert_scalar_impl { data_store.qu_insert_ts_msp.clone(), (series.id() as i64, ts_msp as i64), ); - stats2.inserts_msp.fetch_add(1, Ordering::Release); + stats.inserts_msp.fetch_add(1, Ordering::AcqRel); Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ } else { Box::pin(fut3) as _ @@ -162,10 +162,10 @@ macro_rules! insert_array_impl { ts_msp_changed: bool, st: ScalarType, sh: Shape, - stats2: Arc, + stats: Arc, ) { if futs_queue.len() >= INSERT_FUTS_LIM { - stats2.inserts_discard.fetch_add(1, Ordering::Release); + stats.inserts_discard.fetch_add(1, Ordering::AcqRel); return; } let pulse = 0 as u64; @@ -177,7 +177,7 @@ macro_rules! insert_array_impl { val, ); let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); - stats2.inserts_val.fetch_add(1, Ordering::Release); + stats.inserts_val.fetch_add(1, Ordering::AcqRel); let fut = if ts_msp_changed { let fut1 = ScyInsertFut::new( data_store.scy.clone(), @@ -195,7 +195,7 @@ macro_rules! insert_array_impl { data_store.qu_insert_ts_msp.clone(), (series.id() as i64, ts_msp as i64), ); - stats2.inserts_msp.fetch_add(1, Ordering::Release); + stats.inserts_msp.fetch_add(1, Ordering::AcqRel); Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ } else { Box::pin(fut3) as _ @@ -298,7 +298,6 @@ pub struct CaConn { FuturesOrdered), Error>> + Send>>>, value_insert_futs: FuturesOrdered> + Send>>>, remote_addr_dbg: SocketAddrV4, - stats: Arc, stats2: Arc, } @@ -321,15 +320,10 @@ impl CaConn { fut_get_series: FuturesOrdered::new(), value_insert_futs: FuturesOrdered::new(), remote_addr_dbg, - stats: Arc::new(CaConnStats::new()), stats2: Arc::new(CaConnStats2::new()), } } - pub fn stats(&self) -> Arc { - self.stats.clone() - } - pub fn stats2(&self) -> Arc { self.stats2.clone() } @@ -522,7 +516,6 @@ impl CaConn { } } } - self.stats.inserts_started.fetch_add(1, Ordering::Release); Ok(()) } @@ -669,7 +662,7 @@ impl CaConn { let mut msgs_tmp = vec![]; self.check_channels_state_init(&mut msgs_tmp)?; let ts2 = Instant::now(); - self.stats + self.stats2 .time_check_channels_state_init .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); ts1 = ts2; @@ -737,7 +730,7 @@ impl CaConn { CaMsgTy::EventAddRes(k) => { let res = Self::handle_event_add_res(self, k); let ts2 = Instant::now(); - self.stats + self.stats2 .time_handle_event_add_res .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); ts1 = ts2; @@ -781,13 +774,13 @@ impl Stream for CaConn { let ret = loop { self.handle_insert_futs(cx)?; let ts2 = Instant::now(); - self.stats + self.stats2 .poll_time_handle_insert_futs .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); ts1 = ts2; self.handle_get_series_futs(cx)?; let ts2 = Instant::now(); - self.stats + self.stats2 .poll_time_get_series_futs .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); ts1 = ts2; @@ -812,7 +805,7 @@ impl Stream for CaConn { CaConnState::Listen => match { let res = self.handle_conn_listen(cx); let ts2 = Instant::now(); - self.stats + self.stats2 .time_handle_conn_listen .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); ts1 = ts2; @@ -824,9 +817,7 @@ impl Stream for CaConn { CaConnState::PeerReady => { let res = self.handle_peer_ready(cx); let ts2 = Instant::now(); - self.stats - .time_handle_peer_ready - .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + self.stats2.time_handle_peer_ready_dur(ts2.duration_since(ts1)); ts1 = ts2; res } @@ -837,12 +828,9 @@ impl Stream for CaConn { if nn > 1000 { warn!("insert_queue_len {nn}"); } - self.stats.insert_queue_len.store(nn, Ordering::Release); + self.stats2.inserts_queue_len.store(nn, Ordering::Release); let ts_outer_2 = Instant::now(); - self.stats.poll_time_all.fetch_add( - (ts_outer_2.duration_since(ts_outer_1) * 1000000).as_secs(), - Ordering::AcqRel, - ); + self.stats2.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1)); ret } } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 360368c..b71bfa9 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,8 +1,5 @@ -use stats_types::*; -use std::fmt; use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering::{self, AcqRel, Acquire, SeqCst}; -use std::sync::RwLock; +use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; const US: u64 = 1000; @@ -114,192 +111,23 @@ impl IntervalEma { } } -stats_proc::stats_struct2!(( - StatsStruct( +stats_proc::stats_struct!(( + stats_struct( name(CaConnStats2), counters( - // inserts_val, inserts_msp, inserts_discard, + inserts_queue_len, + poll_time_all, + poll_time_handle_insert_futs, + poll_time_get_series_futs, + time_handle_conn_listen, + time_handle_peer_ready, + time_check_channels_state_init, + time_handle_event_add_res, ), ), - StatsStruct(name(SomeOtherStats), counters(c1, c2,),), + agg(name(CaConnStats2Agg), parent(CaConnStats2)), + diff(name(CaConnStats2AggDiff), input(CaConnStats2Agg)), )); - -pub struct CaConnStats { - pub poll_time_all: AtomicU64, - pub poll_time_handle_insert_futs: AtomicU64, - pub poll_time_get_series_futs: AtomicU64, - pub time_handle_conn_listen: AtomicU64, - pub time_handle_peer_ready: AtomicU64, - pub time_check_channels_state_init: AtomicU64, - pub time_handle_event_add_res: AtomicU64, - pub inserts_started: AtomicU64, - pub inserts_discarded: AtomicU64, - pub insert_queue_len: AtomicU64, -} - -impl CaConnStats { - pub fn new() -> Self { - Self { - poll_time_all: AtomicU64::new(0), - poll_time_handle_insert_futs: AtomicU64::new(0), - poll_time_get_series_futs: AtomicU64::new(0), - time_handle_conn_listen: AtomicU64::new(0), - time_handle_peer_ready: AtomicU64::new(0), - time_check_channels_state_init: AtomicU64::new(0), - time_handle_event_add_res: AtomicU64::new(0), - inserts_started: AtomicU64::new(0), - inserts_discarded: AtomicU64::new(0), - insert_queue_len: AtomicU64::new(0), - } - } -} - -pub struct CaConnVecStats { - pub ts_create: RwLock, - pub nstats: AtomicU64, - pub poll_time_all: AtomicU64, - pub poll_time_handle_insert_futs: AtomicU64, - pub poll_time_get_series_futs: AtomicU64, - pub time_handle_conn_listen: AtomicU64, - pub time_handle_peer_ready: AtomicU64, - pub time_check_channels_state_init: AtomicU64, - pub time_handle_event_add_res: AtomicU64, - pub inserts_started: AtomicU64, - pub inserts_discarded: AtomicU64, - pub insert_queue_len: AtomicU64, -} - -pub struct CaConnVecStatsDiff { - pub dt: AtomicU64, - pub nstats: AtomicU64, - pub poll_time_all: AtomicU64, - pub poll_time_handle_insert_futs: AtomicU64, - pub poll_time_get_series_futs: AtomicU64, - pub time_handle_conn_listen: AtomicU64, - pub time_handle_peer_ready: AtomicU64, - pub time_check_channels_state_init: AtomicU64, - pub time_handle_event_add_res: AtomicU64, - pub inserts_started: AtomicU64, - pub inserts_discarded: AtomicU64, - pub insert_queue_len: AtomicU64, -} - -impl CaConnVecStats { - pub fn new(ts_create: Instant) -> Self { - Self { - ts_create: RwLock::new(ts_create), - nstats: AtomicU64::new(0), - poll_time_all: AtomicU64::new(0), - poll_time_handle_insert_futs: AtomicU64::new(0), - poll_time_get_series_futs: AtomicU64::new(0), - time_handle_conn_listen: AtomicU64::new(0), - time_handle_peer_ready: AtomicU64::new(0), - time_check_channels_state_init: AtomicU64::new(0), - time_handle_event_add_res: AtomicU64::new(0), - inserts_started: AtomicU64::new(0), - inserts_discarded: AtomicU64::new(0), - insert_queue_len: AtomicU64::new(0), - } - } - - pub fn push(&mut self, k: &CaConnStats) { - self.nstats.fetch_add(1, AcqRel); - self.poll_time_all.fetch_add(k.poll_time_all.load(Acquire), AcqRel); - self.poll_time_handle_insert_futs - .fetch_add(k.poll_time_handle_insert_futs.load(Acquire), AcqRel); - self.poll_time_get_series_futs - .fetch_add(k.poll_time_get_series_futs.load(Acquire), AcqRel); - self.time_handle_conn_listen - .fetch_add(k.time_handle_conn_listen.load(Acquire), AcqRel); - self.time_handle_peer_ready - .fetch_add(k.time_handle_peer_ready.load(Acquire), AcqRel); - self.time_check_channels_state_init - .fetch_add(k.time_check_channels_state_init.load(Acquire), AcqRel); - self.time_handle_event_add_res - .fetch_add(k.time_handle_event_add_res.load(Acquire), AcqRel); - self.inserts_started.fetch_add(k.inserts_started.load(Acquire), AcqRel); - self.inserts_discarded - .fetch_add(k.inserts_discarded.load(Acquire), AcqRel); - self.insert_queue_len - .fetch_add(k.insert_queue_len.load(Acquire), SeqCst); - } - - pub fn diff_against(&self, k: &Self) -> CaConnVecStatsDiff { - let dur = self - .ts_create - .read() - .unwrap() - .duration_since(*k.ts_create.read().unwrap()); - CaConnVecStatsDiff { - dt: AtomicU64::new(dur.as_secs() * SEC + dur.subsec_nanos() as u64), - nstats: AtomicU64::new(self.nstats.load(Acquire)), - poll_time_all: AtomicU64::new(self.poll_time_all.load(Acquire) - k.poll_time_all.load(Acquire)), - poll_time_handle_insert_futs: AtomicU64::new( - self.poll_time_handle_insert_futs.load(Acquire) - k.poll_time_handle_insert_futs.load(Acquire), - ), - poll_time_get_series_futs: AtomicU64::new( - self.poll_time_get_series_futs.load(Acquire) - k.poll_time_get_series_futs.load(Acquire), - ), - time_handle_conn_listen: AtomicU64::new( - self.time_handle_conn_listen.load(Acquire) - k.time_handle_conn_listen.load(Acquire), - ), - time_handle_peer_ready: AtomicU64::new( - self.time_handle_peer_ready.load(Acquire) - k.time_handle_peer_ready.load(Acquire), - ), - time_check_channels_state_init: AtomicU64::new( - self.time_check_channels_state_init.load(Acquire) - k.time_check_channels_state_init.load(Acquire), - ), - time_handle_event_add_res: AtomicU64::new( - self.time_handle_event_add_res.load(Acquire) - k.time_handle_event_add_res.load(Acquire), - ), - inserts_started: AtomicU64::new(self.inserts_started.load(Acquire) - k.inserts_started.load(Acquire)), - inserts_discarded: AtomicU64::new(self.inserts_discarded.load(Acquire) - k.inserts_discarded.load(Acquire)), - insert_queue_len: AtomicU64::new(self.insert_queue_len.load(Acquire)), - } - } -} - -impl fmt::Display for CaConnVecStatsDiff { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let nstats = self.nstats.load(Acquire); - let insert_freq = self.inserts_started.load(Acquire) / (self.dt.load(Acquire) / SEC); - let poll_time = self.poll_time_all.load(Acquire); - let poll_time_handle_insert_futs = self.poll_time_handle_insert_futs.load(Acquire); - let poll_time_get_series_futs = self.poll_time_get_series_futs.load(Acquire); - let time_handle_conn_listen = self.time_handle_conn_listen.load(Acquire); - let time_handle_peer_ready = self.time_handle_peer_ready.load(Acquire); - let time_check_channels_state_init = self.time_check_channels_state_init.load(Acquire); - let time_handle_event_add_res = self.time_check_channels_state_init.load(Acquire); - let poll_pct_handle_insert_futs = poll_time_handle_insert_futs * 100 / poll_time; - let poll_pct_get_series_futs = poll_time_get_series_futs * 100 / poll_time; - let pct_handle_conn_listen = time_handle_conn_listen * 100 / poll_time; - let pct_handle_peer_ready = time_handle_peer_ready * 100 / poll_time; - let pct_check_channels_state_init = time_check_channels_state_init * 100 / poll_time; - let pct_handle_event_add_res = time_handle_event_add_res * 100 / poll_time; - let inserts_discarded_freq = self.inserts_discarded.load(Acquire); - let insert_queue_len = self.insert_queue_len.load(Acquire); - let insqavg = insert_queue_len as f32 / nstats as f32; - write!( - fmt, - "nstats {} insq {} insqavg {:.2}\n", - nstats, insert_queue_len, insqavg - ) - .unwrap(); - write!( - fmt, - "infr {} dis {} pt {:5} ins {:2}% sid {:2}% listen {:2}% peer {:2}% checkinit {:2}% evadd {:2}%", - insert_freq, - inserts_discarded_freq, - poll_time / 1000, - poll_pct_handle_insert_futs, - poll_pct_get_series_futs, - pct_handle_conn_listen, - pct_handle_peer_ready, - pct_check_channels_state_init, - pct_handle_event_add_res - ) - } -} diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index 5aa830e..a88f689 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -1,32 +1,33 @@ -use proc_macro::{Delimiter, Span, TokenStream, TokenTree}; +use proc_macro::TokenStream; use quote::quote; -use stats_types::*; use syn::parse::ParseStream; -use syn::punctuated::Punctuated; -use syn::{parse_macro_input, ExprTuple, Ident, Token}; +use syn::{parse_macro_input, Ident}; -fn parse_name(gr: proc_macro::Group) -> String { - for tok in gr.stream() { - match tok { - TokenTree::Ident(k) => { - return k.to_string(); - } - _ => {} - } - } - panic!(); +type PunctExpr = syn::punctuated::Punctuated; + +struct FuncCallWithArgs { + name: Ident, + args: PunctExpr, } -fn parse_counters(gr: proc_macro::Group, a: &mut Vec) { - for tok in gr.stream() { - match tok { - TokenTree::Ident(k) => { - let x = Counter { name: k.to_string() }; - a.push(x); - } - _ => {} - } - } +#[derive(Clone, Debug)] +struct StatsStructDef { + name: syn::Ident, + counters: Vec, +} + +#[derive(Debug)] +struct AggStructDef { + name: syn::Ident, + parent: syn::Ident, + // TODO this currently describes our input (especially the input's name): + stats: StatsStructDef, +} + +#[derive(Debug)] +struct DiffStructDef { + name: syn::Ident, + input: syn::Ident, } fn extend_str(mut a: String, x: impl AsRef) -> String { @@ -34,38 +35,131 @@ fn extend_str(mut a: String, x: impl AsRef) -> String { a } -fn stats_struct_agg_impl(st: &StatsStructDef) -> String { - let name = format!("{}Agg", st.name); - let name_inp = &st.name; - let counters_decl: Vec<_> = st - .counters - .iter() - .map(|x| format!("pub {}: AtomicU64", x.to_string())) - .collect(); - let counters_decl = counters_decl.join(",\n"); +fn stats_struct_impl(st: &StatsStructDef) -> String { + let name = &st.name; let inits: Vec<_> = st .counters .iter() - .map(|x| format!("{}: AtomicU64::new(0)", x.to_string())) + .map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string())) .collect(); let inits = inits.join(",\n"); - let mut code = format!( + let incers: String = st + .counters + .iter() + .map(|x| { + let nn = x.to_string(); + format!( + " + pub fn {nn}_inc(&self) {{ + self.{nn}.fetch_add(1, Ordering::AcqRel); + }} + pub fn {nn}_add(&self, v: u64) {{ + self.{nn}.fetch_add(v, Ordering::AcqRel); + }} + pub fn {nn}_dur(&self, v: Duration) {{ + self.{nn}.fetch_add((v * 1000000).as_secs(), Ordering::AcqRel); + }} +" + ) + }) + .fold(String::new(), |a, x| format!("{}{}", a, x)); + format!( + " +impl {name} {{ + pub fn new() -> Self {{ + Self {{ + ts_create: Instant::now(), +{inits} + }} + }} + + {incers} +}} + " + ) +} + +fn stats_struct_decl_impl(st: &StatsStructDef) -> String { + let name = &st.name; + let counters_decl = st + .counters + .iter() + .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .fold(String::new(), extend_str); + let structt = format!( " pub struct {name} {{ pub ts_create: Instant, - pub aggcount: AtomicU64, - {counters_decl}, +{counters_decl} }} +" + ); + let code = format!("{}\n\n{}", structt, stats_struct_impl(st),); + code +} + +fn agg_decl_impl(st: &StatsStructDef, ag: &AggStructDef) -> String { + let name = &ag.name; + let name_inp = &st.name; + let counters_decl = st + .counters + .iter() + .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .fold(String::new(), extend_str); + let mut code = String::new(); + let s = format!( + " +// Agg decl +pub struct {name} {{ + pub ts_create: Instant, + pub aggcount: AtomicU64, +{counters_decl} +}} +" + ); + code.push_str(&s); + let clone_counters = st + .counters + .iter() + .map(|x| { + let n = x.to_string(); + format!("{:12}{}: AtomicU64::new(self.{}.load(Ordering::Acquire)),\n", "", n, n) + }) + .fold(String::new(), extend_str); + let s = format!( + " +impl Clone for {name} {{ + fn clone(&self) -> Self {{ + Self {{ + ts_create: self.ts_create.clone(), + aggcount: AtomicU64::new(self.aggcount.load(Ordering::Acquire)), +{clone_counters} + }} + }} +}} +" + ); + code.push_str(&s); + let inits = st + .counters + .iter() + .map(|x| format!("{:12}{}: AtomicU64::new(0),\n", "", x.to_string())) + .fold(String::new(), extend_str); + let s = format!( + " +// Agg impl impl {name} {{ pub fn new() -> Self {{ Self {{ ts_create: Instant::now(), aggcount: AtomicU64::new(0), - {inits}, +{inits} }} - }}" + }} +" ); + code.push_str(&s); let counters_add = st .counters .iter() @@ -80,8 +174,108 @@ impl {name} {{ let s = format!( " pub fn push(&self, inp: &{name_inp}) {{ + self.aggcount.fetch_add(1, Ordering::AcqRel); {counters_add} }} +" + ); + code.push_str(&s); + { + let mut buf = String::new(); + for x in &st.counters { + let n = x.to_string(); + buf.push_str(&format!( + "ret.push_str(&format!(\"{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", + n, n + )); + } + let s = format!( + " + pub fn prometheus(&self) -> String {{ + let mut ret = String::new(); + ret.push_str(&format!(\"aggcount {{}}\\n\", self.aggcount.load(Ordering::Acquire))); +{buf} + ret + }} + " + ); + code.push_str(&s); + } + code.push_str( + " +} +", + ); + code +} + +// TODO maybe basic and agg structs need a different treatment? +// Should probably implement the methods behind a common trait. +fn diff_decl_impl(st: &DiffStructDef, inp: &StatsStructDef) -> String { + let name = &st.name; + let inp_ty = &inp.name; + let decl = inp + .counters + .iter() + .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .fold(String::new(), extend_str); + let mut code = String::new(); + let s = format!( + " +pub struct {name} {{ + pub dt: AtomicU64, +{decl} +}} +" + ); + code.push_str(&s); + code.push_str(&format!( + "impl {name} {{ +" + )); + let diffs = inp + .counters + .iter() + .map(|x| { + let n = x.to_string(); + format!( + "{:12}let {} = AtomicU64::new(b.{}.load(Ordering::Acquire) - a.{}.load(Ordering::Acquire));\n", + "", n, n, n + ) + }) + .fold(String::new(), extend_str); + let inits = inp + .counters + .iter() + .map(|x| { + let n = x.to_string(); + format!("{:16}{},\n", "", n) + }) + .fold(String::new(), extend_str); + let s = format!( + " + pub fn diff_from(a: &{inp_ty}, b: &{inp_ty}) -> Self {{ + let dur = b.ts_create.duration_since(a.ts_create); +{diffs} + Self {{ + dt: AtomicU64::new(dur.as_secs() * SEC + dur.subsec_nanos() as u64), +{inits} + }} + }} + " + ); + code.push_str(&s); + let mut a = String::new(); + let mut b = String::new(); + for h in &inp.counters { + a.push_str(&format!("{} {{}} ", h.to_string())); + b.push_str(&format!("self.{}.load(Ordering::Acquire), ", h.to_string())); + } + let s = format!( + " + pub fn display(&self) -> String {{ + format!(\"dt {{}} {a}\", self.dt.load(Ordering::Acquire) / 1000000, {b}) + }} " ); code.push_str(&s); @@ -93,123 +287,6 @@ impl {name} {{ code } -fn stats_struct_impl(st: &StatsStructDef) -> String { - let name = &st.name; - let inits: Vec<_> = st - .counters - .iter() - .map(|x| format!("{}: AtomicU64::new(0)", x.to_string())) - .collect(); - let inits = inits.join(",\n"); - let incers: String = st - .counters - .iter() - .map(|x| { - let nn = x.to_string(); - format!( - " - pub fn {nn}_inc(&mut self) {{ - self.{nn}.fetch_add(1, Ordering::AcqRel); - }} - pub fn {nn}_add(&mut self, v: u64) {{ - self.{nn}.fetch_add(v, Ordering::AcqRel); - }} -" - ) - }) - .fold(String::new(), |a, x| format!("{}{}", a, x)); - format!( - " -impl {name} {{ - pub fn new() -> Self {{ - Self {{ - ts_create: Instant::now(), - {inits} - }} - }} - - {incers} -}} - " - ) -} - -fn stats_struct_agg_diff_impl(st: &StatsStruct) -> String { - let name = format!("{}AggDiff", st.name); - let name_inp = &st.name; - let decl = st - .counters - .iter() - .map(|x| format!("{}: AtomicU64,\n", x.name)) - .fold(String::new(), extend_str); - - // TODO the diff method must belong to StructAgg. - let diffs = st - .counters - .iter() - .map(|x| { - format!( - " - pub fn diff_against(&self, k: &Self) -> {name} {{ - - }} -" - ) - }) - .fold(String::new(), extend_str); - let code = format!( - " -pub struct {name} {{ - pub dt: AtomicU64, - pub aggcount: AtomicU64, - {decl}, -}} - -impl {name} {{ - {diffs} -}} -" - ); - code -} - -fn stats_struct_decl_impl(st: &StatsStructDef) -> String { - let name = &st.name; - let counters_decl = st - .counters - .iter() - .map(|x| format!("pub {}: AtomicU64,\n", x.to_string())) - .fold(String::new(), extend_str); - let structt = format!( - " -pub struct {name} {{ - pub ts_create: Instant, - {counters_decl} -}} -" - ); - let code = format!( - "{}\n\n{}\n\n{}", - structt, - stats_struct_impl(st), - stats_struct_agg_impl(st) - ); - code -} - -fn stats_struct_def(st: &StatsStruct) -> String { - let name_def = format!("{}", st.name); - let structt = format!( - " -const {name_def}: StatsStructDef = StatsStructDef {{ - name: String::new(), - counters: vec![], -}}; -" - ); - structt -} - fn ident_from_expr(inp: syn::Expr) -> syn::Result { use syn::spanned::Spanned; match inp { @@ -250,71 +327,6 @@ fn func_name_from_expr(inp: syn::Expr) -> syn::Result { } } -fn extract_some_stuff_as_string(inp: impl IntoIterator) -> Result, syn::Error> { - use syn::spanned::Spanned; - use syn::{Error, Expr}; - let inp: Vec<_> = inp.into_iter().collect(); - let args: Vec<_> = inp - .into_iter() - .map(|k| match k { - Expr::Path(k) => { - let sp = k.span(); - if k.path.segments.len() != 1 { - return Err(Error::new(sp, "Expect function name with one segment")); - } - let res = k.path.segments[0].ident.clone(); - Ok(res) - } - _ => { - return Err(Error::new(k.span(), format!("Expect function name Path {k:?}"))); - } - }) - .collect(); - for k in &args { - if k.is_err() { - return Err(k.clone().unwrap_err()); - } - } - let args = args.into_iter().map(Result::unwrap).map(|x| x.to_string()).collect(); - Ok(args) -} - -fn func_args_from_expr(inp: Punctuated) -> Result, syn::Error> { - use syn::spanned::Spanned; - use syn::{Error, Expr}; - let inp: Vec<_> = inp.into_iter().collect(); - let args: Vec<_> = inp - .into_iter() - .map(|k| match k { - Expr::Path(k) => { - let sp = k.span(); - if k.path.segments.len() != 1 { - return Err(Error::new(sp, "Expect function name with one segment")); - } - let res = k.path.segments[0].ident.clone(); - Ok(res) - } - _ => { - return Err(Error::new(k.span(), format!("Expect function name Path {k:?}"))); - } - }) - .collect(); - for k in &args { - if k.is_err() { - return Err(k.clone().unwrap_err()); - } - } - let args = args.into_iter().map(Result::unwrap).collect(); - Ok(args) -} - -type PunctExpr = syn::punctuated::Punctuated; - -struct FuncCallWithArgs { - name: Ident, - args: PunctExpr, -} - impl FuncCallWithArgs { fn from_expr(inp: syn::Expr) -> Result { use syn::spanned::Spanned; @@ -334,13 +346,14 @@ impl FuncCallWithArgs { } } -#[derive(Debug)] -struct StatsStructDef { - name: syn::Ident, - counters: Vec, -} - impl StatsStructDef { + fn empty() -> Self { + Self { + name: syn::parse_str("__empty").unwrap(), + counters: vec![], + } + } + fn from_args(inp: PunctExpr) -> syn::Result { let mut name = None; let mut counters = None; @@ -363,87 +376,155 @@ impl StatsStructDef { } } +impl AggStructDef { + fn from_args(inp: PunctExpr) -> syn::Result { + let mut name = None; + let mut parent = None; + for k in inp { + let fa = FuncCallWithArgs::from_expr(k)?; + if fa.name == "name" { + let ident = ident_from_expr(fa.args[0].clone())?; + name = Some(ident); + } + if fa.name == "parent" { + let ident = ident_from_expr(fa.args[0].clone())?; + parent = Some(ident); + } + } + let ret = AggStructDef { + name: name.expect("Expect name for AggStructDef"), + // Will get resolved later: + stats: StatsStructDef::empty(), + parent: parent.expect("Expect parent"), + }; + Ok(ret) + } +} + +impl DiffStructDef { + fn from_args(inp: PunctExpr) -> syn::Result { + let mut name = None; + let mut input = None; + for k in inp { + let fa = FuncCallWithArgs::from_expr(k)?; + if fa.name == "name" { + let ident = ident_from_expr(fa.args[0].clone())?; + name = Some(ident); + } + if fa.name == "input" { + let ident = ident_from_expr(fa.args[0].clone())?; + input = Some(ident); + } + } + let ret = DiffStructDef { + name: name.expect("Expect name for DiffStructDef"), + input: input.expect("Expect input for DiffStructDef"), + }; + Ok(ret) + } +} + #[derive(Debug)] struct StatsTreeDef { stats_struct_defs: Vec, + agg_defs: Vec, + diff_defs: Vec, } impl syn::parse::Parse for StatsTreeDef { fn parse(inp: ParseStream) -> syn::Result { let k = inp.parse::()?; let mut a = vec![]; + let mut agg_defs = vec![]; + let mut diff_defs = vec![]; for k in k.elems { let fa = FuncCallWithArgs::from_expr(k)?; - if fa.name == "StatsStruct" { + if fa.name == "stats_struct" { let stats_struct_def = StatsStructDef::from_args(fa.args)?; a.push(stats_struct_def); + } else if fa.name == "agg" { + let agg_def = AggStructDef::from_args(fa.args)?; + agg_defs.push(agg_def); + } else if fa.name == "diff" { + let diff_def = DiffStructDef::from_args(fa.args)?; + diff_defs.push(diff_def); } else { return Err(syn::Error::new(fa.name.span(), "Unexpected")); } } - let ret = StatsTreeDef { stats_struct_defs: a }; + let ret = StatsTreeDef { + stats_struct_defs: a, + agg_defs, + diff_defs, + }; Ok(ret) } } #[proc_macro] -pub fn stats_struct2(ts: TokenStream) -> TokenStream { - let def: StatsTreeDef = parse_macro_input!(ts); - //panic!("DEF: {def:?}"); +pub fn stats_struct(ts: TokenStream) -> TokenStream { + let mut def: StatsTreeDef = parse_macro_input!(ts); + for h in &mut def.agg_defs { + for k in &def.stats_struct_defs { + if k.name == h.parent { + // TODO factor this out.. + h.stats = k.clone(); + } + } + } + if false { + for j in &def.agg_defs { + let h = StatsStructDef { + name: j.name.clone(), + counters: j.stats.counters.clone(), + }; + def.stats_struct_defs.push(h); + } + } + let mut code = String::new(); let mut ts1 = TokenStream::new(); - for k in def.stats_struct_defs { - let s = stats_struct_decl_impl(&k); + for k in &def.stats_struct_defs { + let s = stats_struct_decl_impl(k); + code.push_str(&s); let ts2: TokenStream = s.parse().unwrap(); ts1.extend(ts2); } + for k in &def.agg_defs { + for st in &def.stats_struct_defs { + if st.name == k.parent { + let s = agg_decl_impl(st, k); + code.push_str(&s); + let ts2: TokenStream = s.parse().unwrap(); + ts1.extend(ts2); + } + } + } + for k in &def.diff_defs { + for j in &def.agg_defs { + if j.name == k.input { + // TODO currently, "j.stats" describes the input to the "agg", so that contains the wrong name. + let p = StatsStructDef { + name: k.input.clone(), + counters: j.stats.counters.clone(), + }; + let s = diff_decl_impl(k, &p); + code.push_str(&s); + let ts2: TokenStream = s.parse().unwrap(); + ts1.extend(ts2); + } + } + for j in &def.stats_struct_defs { + if j.name == k.input { + let s = diff_decl_impl(k, j); + code.push_str(&s); + let ts2: TokenStream = s.parse().unwrap(); + ts1.extend(ts2); + } + } + } + //panic!("CODE: {}", code); let _ts3 = TokenStream::from(quote!( mod asd {} )); ts1 } - -#[proc_macro] -pub fn stats_struct(ts: TokenStream) -> TokenStream { - use std::fmt::Write; - let mut counters = vec![]; - let mut log = String::new(); - let mut in_name = false; - let mut in_counters = false; - let mut name = None; - for tt in ts.clone() { - match tt { - TokenTree::Ident(k) => { - if k.to_string() == "name" { - in_name = true; - } - if k.to_string() == "counters" { - in_counters = true; - } - } - TokenTree::Group(k) => { - if in_counters { - in_counters = false; - parse_counters(k, &mut counters); - } else if in_name { - in_name = false; - name = Some(parse_name(k)); - } - } - TokenTree::Punct(..) => (), - TokenTree::Literal(k) => { - if in_name { - write!(log, "NAME write literal {}\n", k); - name = Some(k.to_string()); - } - } - } - } - let name = name.unwrap(); - let stats_struct = StatsStruct { name, counters }; - write!(log, "{:?}\n", stats_struct); - //panic!("{}", make_code(&stats_struct)); - //panic!("{}", log); - //let ts2 = TokenStream::from_iter(ts.into_iter()); - //TokenTree::Group(proc_macro::Group::new(Delimiter::Brace, TokenStream::new())); - stats_struct_def(&stats_struct).parse().unwrap() -}