From ae197e2ef206ab2e81641c71f182d42727a8fe3f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 13 May 2022 16:31:23 +0200 Subject: [PATCH] Count inserts --- netfetch/src/ca.rs | 11 ++- netfetch/src/ca/conn.rs | 104 +++++++++++++--------- netfetch/src/ca/proto.rs | 19 ++-- stats/Cargo.toml | 1 + stats/src/stats.rs | 68 ++++++++++++++- stats_proc/Cargo.toml | 12 +++ stats_proc/src/stats_proc.rs | 165 +++++++++++++++++++++++++++++++++++ 7 files changed, 327 insertions(+), 53 deletions(-) create mode 100644 stats_proc/Cargo.toml create mode 100644 stats_proc/src/stats_proc.rs diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index b43b1a0..96b57d2 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -11,7 +11,7 @@ use log::*; use netpod::Database; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; -use stats::CaConnVecStats; +use stats::{CaConnStats2, CaConnVecStats}; use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; @@ -232,7 +232,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { tokio::spawn(pg_conn); let pg_client = Arc::new(pg_client); let scy = scylla::SessionBuilder::new() - .known_node("sf-nube-11:19042") + .known_node("sf-nube-14:19042") .default_consistency(Consistency::One) .use_keyspace("ks1", true) .build() @@ -241,7 +241,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let scy = Arc::new(scy); info!("FIND IOCS"); let qu_find_addr = pg_client - .prepare("select addr from ioc_by_channel where facility = $1 and channel = $2") + .prepare("select t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel = $2") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let mut channels_by_host = BTreeMap::new(); @@ -264,7 +264,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { continue; } }; - if ix % 100 == 0 { + if ix % 200 == 0 { info!("{} {} {:?}", ix, ch, addr); } if !channels_by_host.contains_key(&addr) { @@ -281,6 +281,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?); let mut conn_jhs = vec![]; let mut conn_stats_all = vec![]; + let mut conn_stats2 = vec![]; for (host, channels) in channels_by_host { if false && host.ip() != &"172.26.24.76".parse::().unwrap() { continue; @@ -291,6 +292,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let tcp = TcpStream::connect(addr).await?; let mut conn = CaConn::new(tcp, addr, data_store.clone()); conn_stats_all.push(conn.stats()); + conn_stats2.push(conn.stats2()); for c in channels { conn.channel_add(c); } @@ -313,6 +315,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { conn_jhs.push(jh); } let mut agg_last = CaConnVecStats::new(Instant::now()); + let mut agg2_last = CaConnStats2Agg::new(); loop { tokio::time::sleep(Duration::from_millis(2000)).await; let mut agg = CaConnVecStats::new(Instant::now()); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 273c5c5..3a5d120 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -5,13 +5,13 @@ use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify}; use crate::series::{Existence, SeriesId}; use crate::store::ScyInsertFut; use err::Error; -use futures_util::stream::FuturesUnordered; +use futures_util::stream::{FuturesOrdered, FuturesUnordered}; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; use libc::c_int; use log::*; use netpod::timeunits::SEC; use netpod::{ScalarType, Shape}; -use stats::CaConnStats; +use stats::{CaConnStats, CaConnStats2, IntervalEma}; use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; @@ -23,7 +23,8 @@ use tokio::io::unix::AsyncFd; use tokio::net::TcpStream; const INSERT_FUTS_MAX: usize = 200; -const TABLE_SERIES_MOD: u32 = 2; +const INSERT_FUTS_LIM: usize = 80000; +const TABLE_SERIES_MOD: u32 = 128; #[derive(Debug)] enum ChannelError { @@ -56,6 +57,7 @@ struct CreatedState { state: MonitoringState, ts_msp_last: u64, inserted_in_ts_msp: u64, + ivl_ema: IntervalEma, } #[derive(Debug)] @@ -95,7 +97,7 @@ macro_rules! insert_scalar_impl { fn $fname( data_store: Arc, // TODO maybe use a newtype? - futs_queue: &mut FuturesUnordered> + Send>>>, + futs_queue: &mut FuturesOrdered> + Send>>>, series: SeriesId, ts_msp: u64, ts_lsp: u64, @@ -103,8 +105,12 @@ macro_rules! insert_scalar_impl { ts_msp_changed: bool, st: ScalarType, sh: Shape, - inserts_discarded: &AtomicU64, + stats2: Arc, ) { + if futs_queue.len() >= INSERT_FUTS_LIM { + stats2.inserts_discard.fetch_add(1, Ordering::Release); + return; + } let pulse = 0 as u64; let params = ( series.id() as i64, @@ -114,6 +120,7 @@ macro_rules! insert_scalar_impl { val, ); let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); + stats2.inserts_val.fetch_add(1, Ordering::Release); let fut = if ts_msp_changed { let fut1 = ScyInsertFut::new( data_store.scy.clone(), @@ -131,15 +138,12 @@ macro_rules! insert_scalar_impl { data_store.qu_insert_ts_msp.clone(), (series.id() as i64, ts_msp as i64), ); + stats2.inserts_msp.fetch_add(1, Ordering::Release); Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ } else { Box::pin(fut3) as _ }; - if futs_queue.len() >= INSERT_FUTS_MAX { - inserts_discarded.fetch_add(1, Ordering::Release); - } else { - futs_queue.push(fut); - } + futs_queue.push(fut); } }; } @@ -150,7 +154,7 @@ macro_rules! insert_array_impl { fn $fname( data_store: Arc, // TODO maybe use a newtype? - futs_queue: &mut FuturesUnordered> + Send>>>, + futs_queue: &mut FuturesOrdered> + Send>>>, series: SeriesId, ts_msp: u64, ts_lsp: u64, @@ -158,8 +162,12 @@ macro_rules! insert_array_impl { ts_msp_changed: bool, st: ScalarType, sh: Shape, - inserts_discarded: &AtomicU64, + stats2: Arc, ) { + if futs_queue.len() >= INSERT_FUTS_LIM { + stats2.inserts_discard.fetch_add(1, Ordering::Release); + return; + } let pulse = 0 as u64; let params = ( series.id() as i64, @@ -169,6 +177,7 @@ macro_rules! insert_array_impl { val, ); let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); + stats2.inserts_val.fetch_add(1, Ordering::Release); let fut = if ts_msp_changed { let fut1 = ScyInsertFut::new( data_store.scy.clone(), @@ -186,15 +195,12 @@ macro_rules! insert_array_impl { data_store.qu_insert_ts_msp.clone(), (series.id() as i64, ts_msp as i64), ); + stats2.inserts_msp.fetch_add(1, Ordering::Release); Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ } else { Box::pin(fut3) as _ }; - if futs_queue.len() >= INSERT_FUTS_MAX { - inserts_discarded.fetch_add(1, Ordering::Release); - } else { - futs_queue.push(fut); - } + futs_queue.push(fut); } }; } @@ -212,8 +218,7 @@ insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64); macro_rules! match_scalar_value_insert { ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ - let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, inserts_discarded) = - $comm; + let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm; match shape { Shape::Scalar => match scalar_type { ScalarType::$stv => $insf( @@ -226,7 +231,7 @@ macro_rules! match_scalar_value_insert { ts_msp_changed, scalar_type, shape, - inserts_discarded, + stats2, ), _ => { error!("unexpected value type insf {:?}", stringify!($insf)); @@ -240,13 +245,12 @@ macro_rules! match_scalar_value_insert { ); } } - };}; + }}; } macro_rules! match_array_value_insert { ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ - let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, inserts_discarded) = - $comm; + let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm; match shape { Shape::Wave(_) => match scalar_type { ScalarType::$stv => $insf( @@ -259,7 +263,7 @@ macro_rules! match_array_value_insert { ts_msp_changed, scalar_type, shape, - inserts_discarded, + stats2, ), _ => { error!("unexpected value type insf {:?}", stringify!($insf)); @@ -273,7 +277,7 @@ macro_rules! match_array_value_insert { ); } } - };}; + }}; } pub struct CaConn { @@ -290,12 +294,12 @@ pub struct CaConn { name_by_cid: BTreeMap, poll_count: usize, data_store: Arc, - fut_get_series: FuturesUnordered< - Pin), Error>> + Send>>, - >, - value_insert_futs: FuturesUnordered> + Send>>>, + fut_get_series: + FuturesOrdered), Error>> + Send>>>, + value_insert_futs: FuturesOrdered> + Send>>>, remote_addr_dbg: SocketAddrV4, stats: Arc, + stats2: Arc, } impl CaConn { @@ -314,10 +318,11 @@ impl CaConn { name_by_cid: BTreeMap::new(), poll_count: 0, data_store, - fut_get_series: FuturesUnordered::new(), - value_insert_futs: FuturesUnordered::new(), + fut_get_series: FuturesOrdered::new(), + value_insert_futs: FuturesOrdered::new(), remote_addr_dbg, stats: Arc::new(CaConnStats::new()), + stats2: Arc::new(CaConnStats2::new()), } } @@ -325,6 +330,10 @@ impl CaConn { self.stats.clone() } + pub fn stats2(&self) -> Arc { + self.stats2.clone() + } + pub fn channel_add(&mut self, channel: String) { let cid = self.cid_by_name(&channel); if self.channels.contains_key(&cid) { @@ -404,6 +413,7 @@ impl CaConn { state: MonitoringState::AddingEvent(series), ts_msp_last: 0, inserted_in_ts_msp: u64::MAX, + ivl_ema: IntervalEma::new(), }); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; @@ -442,7 +452,7 @@ impl CaConn { let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. - let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64; + let ts = epoch.as_secs() * SEC + epoch.subsec_nanos() as u64; let ts_msp = if inserted_in_ts_msp > 2000 { let ts_msp = ts / (60 * SEC) * (60 * SEC); if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() { @@ -486,7 +496,7 @@ impl CaConn { ts_msp_changed, scalar_type, shape, - &self.stats.inserts_discarded, + self.stats2.clone(), ); match ev.value { Scalar(v) => match v { @@ -702,6 +712,7 @@ impl CaConn { state: MonitoringState::FetchSeriesId, ts_msp_last: 0, inserted_in_ts_msp: u64::MAX, + ivl_ema: IntervalEma::new(), }); // TODO handle error in different way. Should most likely not abort. let cd = ChannelDescDecoded { @@ -764,22 +775,27 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let ts1 = Instant::now(); + let ts_outer_1 = Instant::now(); + let mut ts1 = ts_outer_1; self.poll_count += 1; let ret = loop { - let ts1 = Instant::now(); self.handle_insert_futs(cx)?; let ts2 = Instant::now(); self.stats .poll_time_handle_insert_futs .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); - let ts1 = ts2; + ts1 = ts2; self.handle_get_series_futs(cx)?; let ts2 = Instant::now(); self.stats .poll_time_get_series_futs .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); - let mut ts1 = ts2; + ts1 = ts2; + if self.value_insert_futs.len() >= INSERT_FUTS_MAX { + // TODO do not do more. + // But: can I assume that in this case we triggered a Pending? + break Pending; + } break match &self.state { CaConnState::Init => { let msg = CaMsg { ty: CaMsgTy::Version }; @@ -817,10 +833,16 @@ impl Stream for CaConn { CaConnState::Done => Ready(None), }; }; - let ts2 = Instant::now(); - self.stats - .poll_time_all - .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + let nn = self.value_insert_futs.len() as u64; + if nn > 1000 { + warn!("insert_queue_len {nn}"); + } + self.stats.insert_queue_len.store(nn, Ordering::Release); + let ts_outer_2 = Instant::now(); + self.stats.poll_time_all.fetch_add( + (ts_outer_2.duration_since(ts_outer_1) * 1000000).as_secs(), + Ordering::AcqRel, + ); ret } } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 3013b75..aa4164c 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -294,7 +294,7 @@ impl CaMsgTy { VersionRes(_) => {} ClientName => { // TODO allow variable client name. - let s = "werder_d".as_bytes(); + let s = "daqingest".as_bytes(); let n = s.len(); buf.fill(0); buf[..n].copy_from_slice(s); @@ -718,7 +718,6 @@ impl CaProto { if let Ok(buf) = self.outbuf.write_buf(item.len()) { Some((item, buf)) } else { - error!("output buffer too small for message"); None } } else { @@ -755,8 +754,13 @@ impl CaProto { break None; } while let Some((msg, buf)) = self.out_msg_buf() { - msg.place_into(buf); - self.out.pop_front(); + if msg.len() > buf.len() { + error!("got output buffer but too small"); + break; + } else { + msg.place_into(buf); + self.out.pop_front(); + } } while self.outbuf.len() > 0 { match Self::attempt_output(self.as_mut(), cx)? { @@ -851,7 +855,12 @@ impl CaProto { break match &self.state { CaState::StdHead => { let hi = HeadInfo::from_netbuf(&mut self.buf)?; - if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 2800 { + if hi.cmdid == 6 + || hi.cmdid > 26 + || hi.data_type > 10 + || hi.data_count > 4096 + || hi.payload_size > 1024 * 32 + { warn!("StdHead sees {hi:?}"); } if hi.payload_size == 0xffff && hi.data_count == 0 { diff --git a/stats/Cargo.toml b/stats/Cargo.toml index fcf30a2..916055e 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" path = "src/stats.rs" [dependencies] +stats_proc = { path = "../stats_proc" } log = { path = "../log" } err = { path = "../../daqbuffer/err" } libc = "0.2" diff --git a/stats/src/stats.rs b/stats/src/stats.rs index fc23133..0092d02 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,6 +1,6 @@ use std::fmt; use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering::{AcqRel, Acquire}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst}; use std::sync::RwLock; use std::time::{Duration, Instant}; @@ -8,6 +8,7 @@ const US: u64 = 1000; const MS: u64 = US * 1000; const SEC: u64 = MS * 1000; +#[derive(Debug)] pub struct EMA { ema: f32, emv: f32, @@ -84,6 +85,44 @@ impl CheckEvery { } } +#[derive(Debug)] +pub struct IntervalEma { + tslast: Option, + ema: EMA, +} + +impl IntervalEma { + pub fn new() -> Self { + Self { + tslast: None, + ema: EMA::default(), + } + } + + pub fn tick(&mut self, tsnow: Instant) { + match self.tslast { + Some(tslast) => { + let dt = tsnow.duration_since(tslast); + self.tslast = Some(tsnow); + self.ema.update(dt.as_secs_f32()); + } + None => { + self.tslast = Some(tsnow); + } + } + } +} + +stats_proc::stats_struct!( + name(CaConnStats2), + counters( + // + inserts_val, + inserts_msp, + inserts_discard, + ) +); + pub struct CaConnStats { pub poll_time_all: AtomicU64, pub poll_time_handle_insert_futs: AtomicU64, @@ -94,6 +133,7 @@ pub struct CaConnStats { pub time_handle_event_add_res: AtomicU64, pub inserts_started: AtomicU64, pub inserts_discarded: AtomicU64, + pub insert_queue_len: AtomicU64, } impl CaConnStats { @@ -108,12 +148,14 @@ impl CaConnStats { time_handle_event_add_res: AtomicU64::new(0), inserts_started: AtomicU64::new(0), inserts_discarded: AtomicU64::new(0), + insert_queue_len: AtomicU64::new(0), } } } pub struct CaConnVecStats { pub ts_create: RwLock, + pub nstats: AtomicU64, pub poll_time_all: AtomicU64, pub poll_time_handle_insert_futs: AtomicU64, pub poll_time_get_series_futs: AtomicU64, @@ -123,10 +165,12 @@ pub struct CaConnVecStats { pub time_handle_event_add_res: AtomicU64, pub inserts_started: AtomicU64, pub inserts_discarded: AtomicU64, + pub insert_queue_len: AtomicU64, } pub struct CaConnVecStatsDiff { pub dt: AtomicU64, + pub nstats: AtomicU64, pub poll_time_all: AtomicU64, pub poll_time_handle_insert_futs: AtomicU64, pub poll_time_get_series_futs: AtomicU64, @@ -136,12 +180,14 @@ pub struct CaConnVecStatsDiff { pub time_handle_event_add_res: AtomicU64, pub inserts_started: AtomicU64, pub inserts_discarded: AtomicU64, + pub insert_queue_len: AtomicU64, } impl CaConnVecStats { pub fn new(ts_create: Instant) -> Self { Self { ts_create: RwLock::new(ts_create), + nstats: AtomicU64::new(0), poll_time_all: AtomicU64::new(0), poll_time_handle_insert_futs: AtomicU64::new(0), poll_time_get_series_futs: AtomicU64::new(0), @@ -151,10 +197,12 @@ impl CaConnVecStats { time_handle_event_add_res: AtomicU64::new(0), inserts_started: AtomicU64::new(0), inserts_discarded: AtomicU64::new(0), + insert_queue_len: AtomicU64::new(0), } } pub fn push(&mut self, k: &CaConnStats) { + self.nstats.fetch_add(1, AcqRel); self.poll_time_all.fetch_add(k.poll_time_all.load(Acquire), AcqRel); self.poll_time_handle_insert_futs .fetch_add(k.poll_time_handle_insert_futs.load(Acquire), AcqRel); @@ -171,6 +219,8 @@ impl CaConnVecStats { self.inserts_started.fetch_add(k.inserts_started.load(Acquire), AcqRel); self.inserts_discarded .fetch_add(k.inserts_discarded.load(Acquire), AcqRel); + self.insert_queue_len + .fetch_add(k.insert_queue_len.load(Acquire), SeqCst); } pub fn diff_against(&self, k: &Self) -> CaConnVecStatsDiff { @@ -181,6 +231,7 @@ impl CaConnVecStats { .duration_since(*k.ts_create.read().unwrap()); CaConnVecStatsDiff { dt: AtomicU64::new(dur.as_secs() * SEC + dur.subsec_nanos() as u64), + nstats: AtomicU64::new(self.nstats.load(Acquire)), poll_time_all: AtomicU64::new(self.poll_time_all.load(Acquire) - k.poll_time_all.load(Acquire)), poll_time_handle_insert_futs: AtomicU64::new( self.poll_time_handle_insert_futs.load(Acquire) - k.poll_time_handle_insert_futs.load(Acquire), @@ -202,12 +253,14 @@ impl CaConnVecStats { ), inserts_started: AtomicU64::new(self.inserts_started.load(Acquire) - k.inserts_started.load(Acquire)), inserts_discarded: AtomicU64::new(self.inserts_discarded.load(Acquire) - k.inserts_discarded.load(Acquire)), + insert_queue_len: AtomicU64::new(self.insert_queue_len.load(Acquire)), } } } impl fmt::Display for CaConnVecStatsDiff { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let nstats = self.nstats.load(Acquire); let insert_freq = self.inserts_started.load(Acquire) / (self.dt.load(Acquire) / SEC); let poll_time = self.poll_time_all.load(Acquire); let poll_time_handle_insert_futs = self.poll_time_handle_insert_futs.load(Acquire); @@ -223,10 +276,19 @@ impl fmt::Display for CaConnVecStatsDiff { let pct_check_channels_state_init = time_check_channels_state_init * 100 / poll_time; let pct_handle_event_add_res = time_handle_event_add_res * 100 / poll_time; let inserts_discarded_freq = self.inserts_discarded.load(Acquire); + let insert_queue_len = self.insert_queue_len.load(Acquire); + let insqavg = insert_queue_len as f32 / nstats as f32; write!( fmt, - "insfreq {} disc {} poll_time {:5} ms inserts {:2}% seriesid {:2}% listen {:2}% peer {:2}% checkinit {:2}% evadd {:2}%", - insert_freq,inserts_discarded_freq, + "nstats {} insq {} insqavg {:.2}\n", + nstats, insert_queue_len, insqavg + ) + .unwrap(); + write!( + fmt, + "infr {} dis {} pt {:5} ins {:2}% sid {:2}% listen {:2}% peer {:2}% checkinit {:2}% evadd {:2}%", + insert_freq, + inserts_discarded_freq, poll_time / 1000, poll_pct_handle_insert_futs, poll_pct_get_series_futs, diff --git a/stats_proc/Cargo.toml b/stats_proc/Cargo.toml new file mode 100644 index 0000000..d401fcb --- /dev/null +++ b/stats_proc/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "stats_proc" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/stats_proc.rs" +proc-macro = true + +[dependencies] +syn = "1" diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs new file mode 100644 index 0000000..cef6d01 --- /dev/null +++ b/stats_proc/src/stats_proc.rs @@ -0,0 +1,165 @@ +use proc_macro::{Delimiter, TokenStream, TokenTree}; + +#[derive(Debug)] +struct Counter { + name: String, +} + +#[derive(Debug)] +struct StatsStruct { + name: String, + counters: Vec, +} + +fn parse_name(gr: proc_macro::Group) -> String { + for tok in gr.stream() { + match tok { + TokenTree::Ident(k) => { + return k.to_string(); + } + _ => {} + } + } + panic!(); +} + +fn parse_counters(gr: proc_macro::Group, a: &mut Vec) { + for tok in gr.stream() { + match tok { + TokenTree::Ident(k) => { + let x = Counter { name: k.to_string() }; + a.push(x); + } + _ => {} + } + } +} + +fn stats_struct_agg_impl(st: &StatsStruct) -> String { + let name = format!("{}Agg", st.name); + let counters_decl: Vec<_> = st + .counters + .iter() + .map(|x| format!("pub {}: AtomicU64", x.name)) + .collect(); + let counters_decl = counters_decl.join(",\n"); + let inits: Vec<_> = st + .counters + .iter() + .map(|x| format!("{}: AtomicU64::new(0)", x.name)) + .collect(); + let inits = inits.join(",\n"); + format!( + " +pub struct {name} {{ + pub ts_create: Instant, + pub aggcount: AtomicU64, + {counters_decl}, +}} + +impl {name} {{ + pub fn new() -> Self {{ + Self {{ + ts_create: Instant::now(), + aggcount: AtomicU64::new(0), + {inits}, + }} + }} +}} + " + ) +} + +fn stats_struct_impl(st: &StatsStruct) -> String { + let name = &st.name; + let inits: Vec<_> = st + .counters + .iter() + .map(|x| format!("{}: AtomicU64::new(0)", x.name)) + .collect(); + let inits = inits.join(",\n"); + format!( + " +impl {name} {{ + pub fn new() -> Self {{ + Self {{ + ts_create: Instant::now(), + {inits} + }} + }} +}} + " + ) +} + +fn make_code(st: &StatsStruct) -> String { + let counters_decl: Vec<_> = st + .counters + .iter() + .map(|x| format!("pub {}: AtomicU64", x.name)) + .collect(); + let counters_decl = counters_decl.join(",\n"); + let structt = format!( + " +pub struct {} {{ + pub ts_create: Instant, + {}, +}} + +", + st.name, counters_decl + ); + let code = format!( + "{}\n\n{}\n\n{}", + structt, + stats_struct_impl(st), + stats_struct_agg_impl(st) + ); + code +} + +#[proc_macro] +pub fn stats_struct(ts: TokenStream) -> TokenStream { + use std::fmt::Write; + let mut counters = vec![]; + let mut log = String::new(); + let mut in_name = false; + let mut in_counters = false; + let mut name = None; + for tt in ts.clone() { + match tt { + TokenTree::Ident(k) => { + if k.to_string() == "name" { + in_name = true; + } + if k.to_string() == "counters" { + in_counters = true; + } + } + TokenTree::Group(k) => { + if in_counters { + in_counters = false; + parse_counters(k, &mut counters); + } else if in_name { + in_name = false; + name = Some(parse_name(k)); + } + } + TokenTree::Punct(..) => (), + TokenTree::Literal(k) => { + if in_name { + write!(log, "NAME write literal {}\n", k); + name = Some(k.to_string()); + } + } + } + } + let name = name.unwrap(); + let stats_struct = StatsStruct { name, counters }; + write!(log, "{:?}\n", stats_struct); + //panic!("{}", make_code(&stats_struct)); + //panic!("{}", log); + let ts2 = TokenStream::from_iter(ts.into_iter()); + TokenTree::Group(proc_macro::Group::new(Delimiter::Brace, TokenStream::new())); + make_code(&stats_struct).parse().unwrap() +}