Refactor stats collection [WIP]

This commit is contained in:
Dominik Werder
2022-05-16 15:18:32 +02:00
parent ae197e2ef2
commit c159b83b8c
7 changed files with 367 additions and 46 deletions

View File

@@ -11,7 +11,7 @@ use log::*;
use netpod::Database;
use scylla::batch::Consistency;
use serde::{Deserialize, Serialize};
use stats::{CaConnStats2, CaConnVecStats};
use stats::{CaConnStats2, CaConnStats2Agg, CaConnVecStats};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
@@ -322,9 +322,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
for st in &conn_stats_all {
agg.push(&st);
}
let mut agg2 = CaConnStats2Agg::new();
for st in &conn_stats2 {
agg2.push(&st);
}
let diff = agg.diff_against(&agg_last);
info!("{diff}");
agg_last = agg;
agg2_last = agg2;
}
for jh in conn_jhs {
match jh.await {

View File

@@ -8,6 +8,7 @@ edition = "2021"
path = "src/stats.rs"
[dependencies]
stats_types = { path = "../stats_types" }
stats_proc = { path = "../stats_proc" }
log = { path = "../log" }
err = { path = "../../daqbuffer/err" }

View File

@@ -1,6 +1,7 @@
use stats_types::*;
use std::fmt;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, SeqCst};
use std::sync::RwLock;
use std::time::{Duration, Instant};
@@ -113,15 +114,18 @@ impl IntervalEma {
}
}
stats_proc::stats_struct!(
name(CaConnStats2),
counters(
//
inserts_val,
inserts_msp,
inserts_discard,
)
);
stats_proc::stats_struct2!((
StatsStruct(
name(CaConnStats2),
counters(
//
inserts_val,
inserts_msp,
inserts_discard,
),
),
StatsStruct(name(SomeOtherStats), counters(c1, c2,),),
));
pub struct CaConnStats {
pub poll_time_all: AtomicU64,

View File

@@ -9,4 +9,6 @@ path = "src/stats_proc.rs"
proc-macro = true
[dependencies]
stats_types = { path = "../stats_types" }
syn = "1"
quote = "1"

View File

@@ -1,15 +1,9 @@
use proc_macro::{Delimiter, TokenStream, TokenTree};
#[derive(Debug)]
struct Counter {
name: String,
}
#[derive(Debug)]
struct StatsStruct {
name: String,
counters: Vec<Counter>,
}
use proc_macro::{Delimiter, Span, TokenStream, TokenTree};
use quote::quote;
use stats_types::*;
use syn::parse::ParseStream;
use syn::punctuated::Punctuated;
use syn::{parse_macro_input, ExprTuple, Ident, Token};
fn parse_name(gr: proc_macro::Group) -> String {
for tok in gr.stream() {
@@ -35,21 +29,27 @@ fn parse_counters(gr: proc_macro::Group, a: &mut Vec<Counter>) {
}
}
fn stats_struct_agg_impl(st: &StatsStruct) -> String {
fn extend_str(mut a: String, x: impl AsRef<str>) -> String {
a.push_str(x.as_ref());
a
}
fn stats_struct_agg_impl(st: &StatsStructDef) -> String {
let name = format!("{}Agg", st.name);
let name_inp = &st.name;
let counters_decl: Vec<_> = st
.counters
.iter()
.map(|x| format!("pub {}: AtomicU64", x.name))
.map(|x| format!("pub {}: AtomicU64", x.to_string()))
.collect();
let counters_decl = counters_decl.join(",\n");
let inits: Vec<_> = st
.counters
.iter()
.map(|x| format!("{}: AtomicU64::new(0)", x.name))
.map(|x| format!("{}: AtomicU64::new(0)", x.to_string()))
.collect();
let inits = inits.join(",\n");
format!(
let mut code = format!(
"
pub struct {name} {{
pub ts_create: Instant,
@@ -64,20 +64,60 @@ impl {name} {{
aggcount: AtomicU64::new(0),
{inits},
}}
}}"
);
let counters_add = st
.counters
.iter()
.map(|x| {
format!(
"self.{}.fetch_add(inp.{}.load(Ordering::Acquire), Ordering::AcqRel);\n",
x.to_string(),
x.to_string()
)
})
.fold(String::new(), extend_str);
let s = format!(
"
pub fn push(&self, inp: &{name_inp}) {{
{counters_add}
}}
}}
"
)
"
);
code.push_str(&s);
code.push_str(
"
}
",
);
code
}
fn stats_struct_impl(st: &StatsStruct) -> String {
fn stats_struct_impl(st: &StatsStructDef) -> String {
let name = &st.name;
let inits: Vec<_> = st
.counters
.iter()
.map(|x| format!("{}: AtomicU64::new(0)", x.name))
.map(|x| format!("{}: AtomicU64::new(0)", x.to_string()))
.collect();
let inits = inits.join(",\n");
let incers: String = st
.counters
.iter()
.map(|x| {
let nn = x.to_string();
format!(
"
pub fn {nn}_inc(&mut self) {{
self.{nn}.fetch_add(1, Ordering::AcqRel);
}}
pub fn {nn}_add(&mut self, v: u64) {{
self.{nn}.fetch_add(v, Ordering::AcqRel);
}}
"
)
})
.fold(String::new(), |a, x| format!("{}{}", a, x));
format!(
"
impl {name} {{
@@ -87,27 +127,66 @@ impl {name} {{
{inits}
}}
}}
{incers}
}}
"
)
}
fn make_code(st: &StatsStruct) -> String {
let counters_decl: Vec<_> = st
fn stats_struct_agg_diff_impl(st: &StatsStruct) -> String {
let name = format!("{}AggDiff", st.name);
let name_inp = &st.name;
let decl = st
.counters
.iter()
.map(|x| format!("pub {}: AtomicU64", x.name))
.collect();
let counters_decl = counters_decl.join(",\n");
let structt = format!(
.map(|x| format!("{}: AtomicU64,\n", x.name))
.fold(String::new(), extend_str);
// TODO the diff method must belong to StructAgg.
let diffs = st
.counters
.iter()
.map(|x| {
format!(
"
pub fn diff_against(&self, k: &Self) -> {name} {{
}}
"
)
})
.fold(String::new(), extend_str);
let code = format!(
"
pub struct {} {{
pub ts_create: Instant,
{},
pub struct {name} {{
pub dt: AtomicU64,
pub aggcount: AtomicU64,
{decl},
}}
",
st.name, counters_decl
impl {name} {{
{diffs}
}}
"
);
code
}
fn stats_struct_decl_impl(st: &StatsStructDef) -> String {
let name = &st.name;
let counters_decl = st
.counters
.iter()
.map(|x| format!("pub {}: AtomicU64,\n", x.to_string()))
.fold(String::new(), extend_str);
let structt = format!(
"
pub struct {name} {{
pub ts_create: Instant,
{counters_decl}
}}
"
);
let code = format!(
"{}\n\n{}\n\n{}",
@@ -118,6 +197,211 @@ pub struct {} {{
code
}
fn stats_struct_def(st: &StatsStruct) -> String {
let name_def = format!("{}", st.name);
let structt = format!(
"
const {name_def}: StatsStructDef = StatsStructDef {{
name: String::new(),
counters: vec![],
}};
"
);
structt
}
fn ident_from_expr(inp: syn::Expr) -> syn::Result<syn::Ident> {
use syn::spanned::Spanned;
match inp {
syn::Expr::Path(k) => {
if k.path.segments.len() == 1 {
Ok(k.path.segments[0].ident.clone())
} else {
Err(syn::Error::new(k.span(), "Expect identifier"))
}
}
_ => Err(syn::Error::new(inp.span(), "Expect identifier")),
}
}
fn idents_from_exprs(inp: PunctExpr) -> syn::Result<Vec<syn::Ident>> {
let mut ret = vec![];
for k in inp {
let g = ident_from_expr(k)?;
ret.push(g);
}
Ok(ret)
}
fn func_name_from_expr(inp: syn::Expr) -> syn::Result<syn::Ident> {
use syn::spanned::Spanned;
use syn::{Error, Expr};
match inp {
Expr::Path(k) => {
if k.path.segments.len() != 1 {
return Err(Error::new(k.span(), "Expect function name"));
}
let res = k.path.segments[0].ident.clone();
Ok(res)
}
_ => {
return Err(Error::new(inp.span(), "Expect function name"));
}
}
}
fn extract_some_stuff_as_string(inp: impl IntoIterator<Item = syn::Expr>) -> Result<Vec<String>, syn::Error> {
use syn::spanned::Spanned;
use syn::{Error, Expr};
let inp: Vec<_> = inp.into_iter().collect();
let args: Vec<_> = inp
.into_iter()
.map(|k| match k {
Expr::Path(k) => {
let sp = k.span();
if k.path.segments.len() != 1 {
return Err(Error::new(sp, "Expect function name with one segment"));
}
let res = k.path.segments[0].ident.clone();
Ok(res)
}
_ => {
return Err(Error::new(k.span(), format!("Expect function name Path {k:?}")));
}
})
.collect();
for k in &args {
if k.is_err() {
return Err(k.clone().unwrap_err());
}
}
let args = args.into_iter().map(Result::unwrap).map(|x| x.to_string()).collect();
Ok(args)
}
fn func_args_from_expr(inp: Punctuated<syn::Expr, syn::token::Comma>) -> Result<Vec<syn::Ident>, syn::Error> {
use syn::spanned::Spanned;
use syn::{Error, Expr};
let inp: Vec<_> = inp.into_iter().collect();
let args: Vec<_> = inp
.into_iter()
.map(|k| match k {
Expr::Path(k) => {
let sp = k.span();
if k.path.segments.len() != 1 {
return Err(Error::new(sp, "Expect function name with one segment"));
}
let res = k.path.segments[0].ident.clone();
Ok(res)
}
_ => {
return Err(Error::new(k.span(), format!("Expect function name Path {k:?}")));
}
})
.collect();
for k in &args {
if k.is_err() {
return Err(k.clone().unwrap_err());
}
}
let args = args.into_iter().map(Result::unwrap).collect();
Ok(args)
}
type PunctExpr = syn::punctuated::Punctuated<syn::Expr, syn::token::Comma>;
struct FuncCallWithArgs {
name: Ident,
args: PunctExpr,
}
impl FuncCallWithArgs {
fn from_expr(inp: syn::Expr) -> Result<Self, syn::Error> {
use syn::spanned::Spanned;
use syn::{Error, Expr};
let span_all = inp.span();
match inp {
Expr::Call(k) => {
let name = func_name_from_expr(*k.func)?;
let args = k.args;
let ret = FuncCallWithArgs { name, args };
Ok(ret)
}
_ => {
return Err(Error::new(span_all, format!("BAD {:?}", inp)));
}
}
}
}
#[derive(Debug)]
struct StatsStructDef {
name: syn::Ident,
counters: Vec<syn::Ident>,
}
impl StatsStructDef {
fn from_args(inp: PunctExpr) -> syn::Result<Self> {
let mut name = None;
let mut counters = None;
for k in inp {
let fa = FuncCallWithArgs::from_expr(k)?;
if fa.name == "name" {
let ident = ident_from_expr(fa.args[0].clone())?;
name = Some(ident);
}
if fa.name == "counters" {
let idents = idents_from_exprs(fa.args)?;
counters = Some(idents);
}
}
let ret = StatsStructDef {
name: name.expect("Expect name for StatsStructDef"),
counters: counters.unwrap_or(vec![]),
};
Ok(ret)
}
}
#[derive(Debug)]
struct StatsTreeDef {
stats_struct_defs: Vec<StatsStructDef>,
}
impl syn::parse::Parse for StatsTreeDef {
fn parse(inp: ParseStream) -> syn::Result<Self> {
let k = inp.parse::<syn::ExprTuple>()?;
let mut a = vec![];
for k in k.elems {
let fa = FuncCallWithArgs::from_expr(k)?;
if fa.name == "StatsStruct" {
let stats_struct_def = StatsStructDef::from_args(fa.args)?;
a.push(stats_struct_def);
} else {
return Err(syn::Error::new(fa.name.span(), "Unexpected"));
}
}
let ret = StatsTreeDef { stats_struct_defs: a };
Ok(ret)
}
}
#[proc_macro]
pub fn stats_struct2(ts: TokenStream) -> TokenStream {
let def: StatsTreeDef = parse_macro_input!(ts);
//panic!("DEF: {def:?}");
let mut ts1 = TokenStream::new();
for k in def.stats_struct_defs {
let s = stats_struct_decl_impl(&k);
let ts2: TokenStream = s.parse().unwrap();
ts1.extend(ts2);
}
let _ts3 = TokenStream::from(quote!(
mod asd {}
));
ts1
}
#[proc_macro]
pub fn stats_struct(ts: TokenStream) -> TokenStream {
use std::fmt::Write;
@@ -159,7 +443,7 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream {
write!(log, "{:?}\n", stats_struct);
//panic!("{}", make_code(&stats_struct));
//panic!("{}", log);
let ts2 = TokenStream::from_iter(ts.into_iter());
TokenTree::Group(proc_macro::Group::new(Delimiter::Brace, TokenStream::new()));
make_code(&stats_struct).parse().unwrap()
//let ts2 = TokenStream::from_iter(ts.into_iter());
//TokenTree::Group(proc_macro::Group::new(Delimiter::Brace, TokenStream::new()));
stats_struct_def(&stats_struct).parse().unwrap()
}

10
stats_types/Cargo.toml Normal file
View File

@@ -0,0 +1,10 @@
[package]
name = "stats_types"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/stats_types.rs"
#[dependencies]

View File

@@ -0,0 +1,15 @@
#[derive(Debug)]
pub struct Counter {
pub name: String,
}
#[derive(Debug)]
pub struct StatsStruct {
pub name: String,
pub counters: Vec<Counter>,
}
pub struct StatsStructDef {
pub name: String,
pub counters: Vec<Counter>,
}