From c159b83b8c963455fe591a02acc61c9a5b2ac71f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 16 May 2022 15:18:32 +0200 Subject: [PATCH] Refactor stats collection [WIP] --- netfetch/src/ca.rs | 7 +- stats/Cargo.toml | 1 + stats/src/stats.rs | 24 ++- stats_proc/Cargo.toml | 2 + stats_proc/src/stats_proc.rs | 354 +++++++++++++++++++++++++++++---- stats_types/Cargo.toml | 10 + stats_types/src/stats_types.rs | 15 ++ 7 files changed, 367 insertions(+), 46 deletions(-) create mode 100644 stats_types/Cargo.toml create mode 100644 stats_types/src/stats_types.rs diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 96b57d2..888cae2 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -11,7 +11,7 @@ use log::*; use netpod::Database; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; -use stats::{CaConnStats2, CaConnVecStats}; +use stats::{CaConnStats2, CaConnStats2Agg, CaConnVecStats}; use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; @@ -322,9 +322,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { for st in &conn_stats_all { agg.push(&st); } + let mut agg2 = CaConnStats2Agg::new(); + for st in &conn_stats2 { + agg2.push(&st); + } let diff = agg.diff_against(&agg_last); info!("{diff}"); agg_last = agg; + agg2_last = agg2; } for jh in conn_jhs { match jh.await { diff --git a/stats/Cargo.toml b/stats/Cargo.toml index 916055e..439f1a6 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" path = "src/stats.rs" [dependencies] +stats_types = { path = "../stats_types" } stats_proc = { path = "../stats_proc" } log = { path = "../log" } err = { path = "../../daqbuffer/err" } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 0092d02..360368c 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,6 +1,7 @@ +use stats_types::*; use std::fmt; use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst}; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, SeqCst}; use std::sync::RwLock; use std::time::{Duration, Instant}; @@ -113,15 +114,18 @@ impl IntervalEma { } } -stats_proc::stats_struct!( - name(CaConnStats2), - counters( - // - inserts_val, - inserts_msp, - inserts_discard, - ) -); +stats_proc::stats_struct2!(( + StatsStruct( + name(CaConnStats2), + counters( + // + inserts_val, + inserts_msp, + inserts_discard, + ), + ), + StatsStruct(name(SomeOtherStats), counters(c1, c2,),), +)); pub struct CaConnStats { pub poll_time_all: AtomicU64, diff --git a/stats_proc/Cargo.toml b/stats_proc/Cargo.toml index d401fcb..4af546f 100644 --- a/stats_proc/Cargo.toml +++ b/stats_proc/Cargo.toml @@ -9,4 +9,6 @@ path = "src/stats_proc.rs" proc-macro = true [dependencies] +stats_types = { path = "../stats_types" } syn = "1" +quote = "1" diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index cef6d01..5aa830e 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -1,15 +1,9 @@ -use proc_macro::{Delimiter, TokenStream, TokenTree}; - -#[derive(Debug)] -struct Counter { - name: String, -} - -#[derive(Debug)] -struct StatsStruct { - name: String, - counters: Vec, -} +use proc_macro::{Delimiter, Span, TokenStream, TokenTree}; +use quote::quote; +use stats_types::*; +use syn::parse::ParseStream; +use syn::punctuated::Punctuated; +use syn::{parse_macro_input, ExprTuple, Ident, Token}; fn parse_name(gr: proc_macro::Group) -> String { for tok in gr.stream() { @@ -35,21 +29,27 @@ fn parse_counters(gr: proc_macro::Group, a: &mut Vec) { } } -fn stats_struct_agg_impl(st: &StatsStruct) -> String { +fn extend_str(mut a: String, x: impl AsRef) -> String { + a.push_str(x.as_ref()); + a +} + +fn stats_struct_agg_impl(st: &StatsStructDef) -> String { let name = format!("{}Agg", st.name); + let name_inp = &st.name; let counters_decl: Vec<_> = st .counters .iter() - .map(|x| format!("pub {}: AtomicU64", x.name)) + .map(|x| format!("pub {}: AtomicU64", x.to_string())) .collect(); let counters_decl = counters_decl.join(",\n"); let inits: Vec<_> = st .counters .iter() - .map(|x| format!("{}: AtomicU64::new(0)", x.name)) + .map(|x| format!("{}: AtomicU64::new(0)", x.to_string())) .collect(); let inits = inits.join(",\n"); - format!( + let mut code = format!( " pub struct {name} {{ pub ts_create: Instant, @@ -64,20 +64,60 @@ impl {name} {{ aggcount: AtomicU64::new(0), {inits}, }} + }}" + ); + let counters_add = st + .counters + .iter() + .map(|x| { + format!( + "self.{}.fetch_add(inp.{}.load(Ordering::Acquire), Ordering::AcqRel);\n", + x.to_string(), + x.to_string() + ) + }) + .fold(String::new(), extend_str); + let s = format!( + " + pub fn push(&self, inp: &{name_inp}) {{ + {counters_add} }} -}} - " - ) +" + ); + code.push_str(&s); + code.push_str( + " +} +", + ); + code } -fn stats_struct_impl(st: &StatsStruct) -> String { +fn stats_struct_impl(st: &StatsStructDef) -> String { let name = &st.name; let inits: Vec<_> = st .counters .iter() - .map(|x| format!("{}: AtomicU64::new(0)", x.name)) + .map(|x| format!("{}: AtomicU64::new(0)", x.to_string())) .collect(); let inits = inits.join(",\n"); + let incers: String = st + .counters + .iter() + .map(|x| { + let nn = x.to_string(); + format!( + " + pub fn {nn}_inc(&mut self) {{ + self.{nn}.fetch_add(1, Ordering::AcqRel); + }} + pub fn {nn}_add(&mut self, v: u64) {{ + self.{nn}.fetch_add(v, Ordering::AcqRel); + }} +" + ) + }) + .fold(String::new(), |a, x| format!("{}{}", a, x)); format!( " impl {name} {{ @@ -87,27 +127,66 @@ impl {name} {{ {inits} }} }} + + {incers} }} " ) } -fn make_code(st: &StatsStruct) -> String { - let counters_decl: Vec<_> = st +fn stats_struct_agg_diff_impl(st: &StatsStruct) -> String { + let name = format!("{}AggDiff", st.name); + let name_inp = &st.name; + let decl = st .counters .iter() - .map(|x| format!("pub {}: AtomicU64", x.name)) - .collect(); - let counters_decl = counters_decl.join(",\n"); - let structt = format!( + .map(|x| format!("{}: AtomicU64,\n", x.name)) + .fold(String::new(), extend_str); + + // TODO the diff method must belong to StructAgg. + let diffs = st + .counters + .iter() + .map(|x| { + format!( + " + pub fn diff_against(&self, k: &Self) -> {name} {{ + + }} +" + ) + }) + .fold(String::new(), extend_str); + let code = format!( " -pub struct {} {{ - pub ts_create: Instant, - {}, +pub struct {name} {{ + pub dt: AtomicU64, + pub aggcount: AtomicU64, + {decl}, }} -", - st.name, counters_decl +impl {name} {{ + {diffs} +}} +" + ); + code +} + +fn stats_struct_decl_impl(st: &StatsStructDef) -> String { + let name = &st.name; + let counters_decl = st + .counters + .iter() + .map(|x| format!("pub {}: AtomicU64,\n", x.to_string())) + .fold(String::new(), extend_str); + let structt = format!( + " +pub struct {name} {{ + pub ts_create: Instant, + {counters_decl} +}} +" ); let code = format!( "{}\n\n{}\n\n{}", @@ -118,6 +197,211 @@ pub struct {} {{ code } +fn stats_struct_def(st: &StatsStruct) -> String { + let name_def = format!("{}", st.name); + let structt = format!( + " +const {name_def}: StatsStructDef = StatsStructDef {{ + name: String::new(), + counters: vec![], +}}; +" + ); + structt +} + +fn ident_from_expr(inp: syn::Expr) -> syn::Result { + use syn::spanned::Spanned; + match inp { + syn::Expr::Path(k) => { + if k.path.segments.len() == 1 { + Ok(k.path.segments[0].ident.clone()) + } else { + Err(syn::Error::new(k.span(), "Expect identifier")) + } + } + _ => Err(syn::Error::new(inp.span(), "Expect identifier")), + } +} + +fn idents_from_exprs(inp: PunctExpr) -> syn::Result> { + let mut ret = vec![]; + for k in inp { + let g = ident_from_expr(k)?; + ret.push(g); + } + Ok(ret) +} + +fn func_name_from_expr(inp: syn::Expr) -> syn::Result { + use syn::spanned::Spanned; + use syn::{Error, Expr}; + match inp { + Expr::Path(k) => { + if k.path.segments.len() != 1 { + return Err(Error::new(k.span(), "Expect function name")); + } + let res = k.path.segments[0].ident.clone(); + Ok(res) + } + _ => { + return Err(Error::new(inp.span(), "Expect function name")); + } + } +} + +fn extract_some_stuff_as_string(inp: impl IntoIterator) -> Result, syn::Error> { + use syn::spanned::Spanned; + use syn::{Error, Expr}; + let inp: Vec<_> = inp.into_iter().collect(); + let args: Vec<_> = inp + .into_iter() + .map(|k| match k { + Expr::Path(k) => { + let sp = k.span(); + if k.path.segments.len() != 1 { + return Err(Error::new(sp, "Expect function name with one segment")); + } + let res = k.path.segments[0].ident.clone(); + Ok(res) + } + _ => { + return Err(Error::new(k.span(), format!("Expect function name Path {k:?}"))); + } + }) + .collect(); + for k in &args { + if k.is_err() { + return Err(k.clone().unwrap_err()); + } + } + let args = args.into_iter().map(Result::unwrap).map(|x| x.to_string()).collect(); + Ok(args) +} + +fn func_args_from_expr(inp: Punctuated) -> Result, syn::Error> { + use syn::spanned::Spanned; + use syn::{Error, Expr}; + let inp: Vec<_> = inp.into_iter().collect(); + let args: Vec<_> = inp + .into_iter() + .map(|k| match k { + Expr::Path(k) => { + let sp = k.span(); + if k.path.segments.len() != 1 { + return Err(Error::new(sp, "Expect function name with one segment")); + } + let res = k.path.segments[0].ident.clone(); + Ok(res) + } + _ => { + return Err(Error::new(k.span(), format!("Expect function name Path {k:?}"))); + } + }) + .collect(); + for k in &args { + if k.is_err() { + return Err(k.clone().unwrap_err()); + } + } + let args = args.into_iter().map(Result::unwrap).collect(); + Ok(args) +} + +type PunctExpr = syn::punctuated::Punctuated; + +struct FuncCallWithArgs { + name: Ident, + args: PunctExpr, +} + +impl FuncCallWithArgs { + fn from_expr(inp: syn::Expr) -> Result { + use syn::spanned::Spanned; + use syn::{Error, Expr}; + let span_all = inp.span(); + match inp { + Expr::Call(k) => { + let name = func_name_from_expr(*k.func)?; + let args = k.args; + let ret = FuncCallWithArgs { name, args }; + Ok(ret) + } + _ => { + return Err(Error::new(span_all, format!("BAD {:?}", inp))); + } + } + } +} + +#[derive(Debug)] +struct StatsStructDef { + name: syn::Ident, + counters: Vec, +} + +impl StatsStructDef { + fn from_args(inp: PunctExpr) -> syn::Result { + let mut name = None; + let mut counters = None; + for k in inp { + let fa = FuncCallWithArgs::from_expr(k)?; + if fa.name == "name" { + let ident = ident_from_expr(fa.args[0].clone())?; + name = Some(ident); + } + if fa.name == "counters" { + let idents = idents_from_exprs(fa.args)?; + counters = Some(idents); + } + } + let ret = StatsStructDef { + name: name.expect("Expect name for StatsStructDef"), + counters: counters.unwrap_or(vec![]), + }; + Ok(ret) + } +} + +#[derive(Debug)] +struct StatsTreeDef { + stats_struct_defs: Vec, +} + +impl syn::parse::Parse for StatsTreeDef { + fn parse(inp: ParseStream) -> syn::Result { + let k = inp.parse::()?; + let mut a = vec![]; + for k in k.elems { + let fa = FuncCallWithArgs::from_expr(k)?; + if fa.name == "StatsStruct" { + let stats_struct_def = StatsStructDef::from_args(fa.args)?; + a.push(stats_struct_def); + } else { + return Err(syn::Error::new(fa.name.span(), "Unexpected")); + } + } + let ret = StatsTreeDef { stats_struct_defs: a }; + Ok(ret) + } +} + +#[proc_macro] +pub fn stats_struct2(ts: TokenStream) -> TokenStream { + let def: StatsTreeDef = parse_macro_input!(ts); + //panic!("DEF: {def:?}"); + let mut ts1 = TokenStream::new(); + for k in def.stats_struct_defs { + let s = stats_struct_decl_impl(&k); + let ts2: TokenStream = s.parse().unwrap(); + ts1.extend(ts2); + } + let _ts3 = TokenStream::from(quote!( + mod asd {} + )); + ts1 +} + #[proc_macro] pub fn stats_struct(ts: TokenStream) -> TokenStream { use std::fmt::Write; @@ -159,7 +443,7 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream { write!(log, "{:?}\n", stats_struct); //panic!("{}", make_code(&stats_struct)); //panic!("{}", log); - let ts2 = TokenStream::from_iter(ts.into_iter()); - TokenTree::Group(proc_macro::Group::new(Delimiter::Brace, TokenStream::new())); - make_code(&stats_struct).parse().unwrap() + //let ts2 = TokenStream::from_iter(ts.into_iter()); + //TokenTree::Group(proc_macro::Group::new(Delimiter::Brace, TokenStream::new())); + stats_struct_def(&stats_struct).parse().unwrap() } diff --git a/stats_types/Cargo.toml b/stats_types/Cargo.toml new file mode 100644 index 0000000..c7e011b --- /dev/null +++ b/stats_types/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "stats_types" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/stats_types.rs" + +#[dependencies] diff --git a/stats_types/src/stats_types.rs b/stats_types/src/stats_types.rs new file mode 100644 index 0000000..6f59644 --- /dev/null +++ b/stats_types/src/stats_types.rs @@ -0,0 +1,15 @@ +#[derive(Debug)] +pub struct Counter { + pub name: String, +} + +#[derive(Debug)] +pub struct StatsStruct { + pub name: String, + pub counters: Vec, +} + +pub struct StatsStructDef { + pub name: String, + pub counters: Vec, +}