diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 7210c88..49d6cea 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -9,7 +9,6 @@ use log::*; use netfetch::ca::connset::CaConnSet; use netfetch::ca::connset::CaConnSetCtrl; use netfetch::ca::connset::CaConnSetItem; -use netfetch::ca::IngestCommons; use netfetch::conf::CaIngestOpts; use netfetch::daemon_common::Channel; use netfetch::daemon_common::DaemonEvent; @@ -17,6 +16,7 @@ use netfetch::metrics::ExtraInsertsConf; use netfetch::metrics::StatsSet; use netpod::Database; use netpod::ScyllaConfig; +use scywr::insertworker::InsertWorkerOpts; use scywr::insertworker::Ttls; use scywr::iteminsertqueue as scywriiq; use scywr::store::DataStore; @@ -78,13 +78,14 @@ pub struct Daemon { count_assigned: usize, last_status_print: SystemTime, insert_workers_jh: Vec>>, - ingest_commons: Arc, caconn_last_channel_check: Instant, stats: Arc, shutting_down: bool, insert_rx_weak: WeakReceiver, connset_ctrl: CaConnSetCtrl, connset_status_last: Instant, + // TODO should be a stats object? + insert_workers_running: AtomicU64, } impl Daemon { @@ -137,25 +138,17 @@ impl Daemon { } }); - let ingest_commons = IngestCommons { - pgconf: Arc::new(opts.pgconf.clone()), - backend: opts.backend().into(), - local_epics_hostname: opts.local_epics_hostname.clone(), - data_store: datastore.clone(), - insert_ivl_min: Arc::new(AtomicU64::new(0)), - extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()), - store_workers_rate: Arc::new(AtomicU64::new(20000)), - insert_frac: Arc::new(AtomicU64::new(1000)), - insert_workers_running: Arc::new(AtomicU64::new(0)), - }; - let ingest_commons = Arc::new(ingest_commons); - let use_rate_limit_queue = false; // TODO use a new stats type: let store_stats = Arc::new(stats::CaConnStats::new()); let ttls = opts.ttls.clone(); - let insert_worker_opts = Arc::new(ingest_commons.as_ref().into()); + let insert_worker_opts = InsertWorkerOpts { + store_workers_rate: Arc::new(AtomicU64::new(20000000)), + insert_workers_running: Arc::new(AtomicU64::new(0)), + insert_frac: Arc::new(AtomicU64::new(1000)), + }; + let insert_worker_opts = Arc::new(insert_worker_opts); let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers( opts.scyconf.clone(), opts.insert_scylla_sessions, @@ -214,13 +207,13 @@ impl Daemon { count_assigned: 0, last_status_print: SystemTime::now(), insert_workers_jh, - ingest_commons, caconn_last_channel_check: Instant::now(), stats: Arc::new(DaemonStats::new()), shutting_down: false, insert_rx_weak: query_item_rx.downgrade(), connset_ctrl: conn_set_ctrl, connset_status_last: Instant::now(), + insert_workers_running: AtomicU64::new(0), }; Ok(ret) } @@ -239,10 +232,7 @@ impl Daemon { async fn handle_timer_tick(&mut self) -> Result<(), Error> { if self.shutting_down { - let nworkers = self - .ingest_commons - .insert_workers_running - .load(atomic::Ordering::Acquire); + let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire); let nitems = self .insert_rx_weak .upgrade() @@ -253,7 +243,7 @@ impl Daemon { std::process::exit(0); } } - self.stats.handle_timer_tick_count_inc(); + self.stats.handle_timer_tick_count.inc(); let ts1 = Instant::now(); let tsnow = SystemTime::now(); if SIGINT.load(atomic::Ordering::Acquire) == 1 { @@ -406,7 +396,7 @@ impl Daemon { async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> { use DaemonEvent::*; - self.stats.events_inc(); + self.stats.events.inc(); let ts1 = Instant::now(); let item_summary = item.summary(); let ret = match item { @@ -416,7 +406,7 @@ impl Daemon { match tx.send(i.wrapping_add(1)).await { Ok(_) => {} Err(_) => { - self.stats.ticker_token_release_error_inc(); + self.stats.ticker_token_release_error.inc(); error!("can not send ticker token"); return Err(Error::with_msg_no_trace("can not send ticker token")); } @@ -462,7 +452,7 @@ impl Daemon { match ticker_inp_rx.recv().await { Ok(_) => {} Err(_) => { - stats.ticker_token_acquire_error_inc(); + stats.ticker_token_acquire_error.inc(); break; } } @@ -494,7 +484,6 @@ impl Daemon { } } } - warn!("TODO should not have to close the channel"); warn!("TODO wait for insert workers"); while let Some(jh) = self.insert_workers_jh.pop() { match jh.await.map_err(Error::from_string) { diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index f1a127d..fcb0f5f 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -32,28 +32,6 @@ lazy_static::lazy_static! { pub static ref METRICS: Mutex> = Mutex::new(None); } -pub struct IngestCommons { - pub pgconf: Arc, - pub backend: String, - pub local_epics_hostname: String, - pub data_store: Arc, - pub insert_ivl_min: Arc, - pub extra_inserts_conf: TokMx, - pub insert_frac: Arc, - pub store_workers_rate: Arc, - pub insert_workers_running: Arc, -} - -impl From<&IngestCommons> for InsertWorkerOpts { - fn from(val: &IngestCommons) -> Self { - Self { - store_workers_rate: val.store_workers_rate.clone(), - insert_workers_running: val.insert_workers_running.clone(), - insert_frac: val.insert_frac.clone(), - } - } -} - pub trait SlowWarnable { fn slow_warn(self, ms: u64) -> SlowWarn>> where diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 281e38b..481d057 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -415,17 +415,6 @@ impl CanSendChannelInfoResult for SendSeriesLookup { } } -struct ChannelOpsResources<'a> { - channel_set_ops: &'a StdMutex>, - channels: &'a mut BTreeMap, - cid_by_name: &'a mut BTreeMap, - name_by_cid: &'a mut BTreeMap, - cid_store: &'a mut CidStore, - init_state_count: &'a mut u64, - channel_set_ops_flag: &'a AtomicUsize, - time_binners: &'a mut BTreeMap, -} - pub struct CaConnOpts { insert_queue_max: usize, array_truncate: usize, @@ -558,7 +547,7 @@ impl CaConn { kind: ConnCommandResultKind::CheckHealth, }; self.cmd_res_queue.push_back(res); - //self.stats.caconn_command_can_not_reply_inc(); + //self.stats.caconn_command_can_not_reply.inc(); } fn cmd_find_channel(&self, pattern: &str) { @@ -608,13 +597,13 @@ impl CaConn { fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) { self.channel_add(name, cssid); // TODO return the result - //self.stats.caconn_command_can_not_reply_inc(); + //self.stats.caconn_command_can_not_reply.inc(); } fn cmd_channel_remove(&mut self, name: String) { self.channel_remove(name); // TODO return the result - //self.stats.caconn_command_can_not_reply_inc(); + //self.stats.caconn_command_can_not_reply.inc(); } fn cmd_shutdown(&mut self) { @@ -679,7 +668,7 @@ impl CaConn { fn handle_conn_command(&mut self, cx: &mut Context) -> Poll>> { // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; - self.stats.caconn_loop3_count_inc(); + self.stats.caconn_loop3_count.inc(); match self.conn_command_rx.poll_next_unpin(cx) { Ready(Some(a)) => { trace!("handle_conn_command received a command {}", self.remote_addr_dbg); @@ -891,15 +880,9 @@ impl CaConn { _ => {} } } - self.stats - .channel_all_count - .store(self.channels.len() as _, Ordering::Release); - self.stats - .channel_alive_count - .store(alive_count as _, Ordering::Release); - self.stats - .channel_not_alive_count - .store(not_alive_count as _, Ordering::Release); + self.stats.channel_all_count.__set(self.channels.len() as _); + self.stats.channel_alive_count.__set(alive_count as _); + self.stats.channel_not_alive_count.__set(not_alive_count as _); Ok(()) } @@ -954,7 +937,7 @@ impl CaConn { series: SeriesId, ) -> Result<(), Error> { let tsnow = Instant::now(); - self.stats.get_series_id_ok_inc(); + self.stats.get_series_id_ok.inc(); if series.id() == 0 { warn!("Weird series id: {series:?}"); } @@ -1058,7 +1041,7 @@ impl CaConn { ts_msp_grid, }; item_queue.push_back(QueryItem::Insert(item)); - stats.insert_item_create_inc(); + stats.insert_item_create.inc(); Ok(()) } @@ -1172,15 +1155,15 @@ impl CaConn { let ts = ev.value.ts.map_or(0, |x| x.get()); let ts_diff = ts.abs_diff(ts_local); if ts_diff > SEC * 300 { - self.stats.ca_ts_off_4_inc(); + self.stats.ca_ts_off_4.inc(); //warn!("Bad time for {name} {ts} vs {ts_local} diff {}", ts_diff / SEC); // TODO mute this channel for some time, discard the event. } else if ts_diff > SEC * 120 { - self.stats.ca_ts_off_3_inc(); + self.stats.ca_ts_off_3.inc(); } else if ts_diff > SEC * 20 { - self.stats.ca_ts_off_2_inc(); + self.stats.ca_ts_off_2.inc(); } else if ts_diff > SEC * 3 { - self.stats.ca_ts_off_1_inc(); + self.stats.ca_ts_off_1.inc(); } if tsnow >= st.insert_next_earliest { //let channel_state = self.channels.get_mut(&cid).unwrap(); @@ -1238,7 +1221,7 @@ impl CaConn { extra_inserts_conf, )?; } else { - self.stats.channel_fast_item_drop_inc(); + self.stats.channel_fast_item_drop.inc(); if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(10000) { st.insert_recv_ivl_last = tsnow; let ema = st.insert_item_ivl_ema.ema(); @@ -1369,7 +1352,7 @@ impl CaConn { let ts2 = Instant::now(); self.stats .time_check_channels_state_init - .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::Release); + .add((ts2.duration_since(ts1) * MS as u32).as_secs()); ts1 = ts2; let mut do_wake_again = false; if msgs_tmp.len() > 0 { @@ -1456,12 +1439,12 @@ impl CaConn { } CaMsgTy::EventAddRes(k) => { trace!("got EventAddRes: {k:?}"); - self.stats.caconn_recv_data_inc(); + self.stats.caconn_recv_data.inc(); let res = Self::handle_event_add_res(self, k, tsnow); let ts2 = Instant::now(); self.stats .time_handle_event_add_res - .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); + .add((ts2.duration_since(ts1) * MS as u32).as_secs()); ts1 = ts2; let _ = ts1; res? @@ -1638,7 +1621,7 @@ impl CaConn { fn loop_inner(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; loop { - self.stats.caconn_loop2_count_inc(); + self.stats.caconn_loop2_count.inc(); if self.is_shutdown() { break Ok(None); } @@ -1653,47 +1636,6 @@ impl CaConn { } } - fn apply_channel_ops_with_res(res: ChannelOpsResources) { - let mut g = res.channel_set_ops.lock().unwrap(); - let map = std::mem::replace(&mut *g, BTreeMap::new()); - for (ch, op) in map { - match op { - ChannelSetOp::Add(cssid) => Self::channel_add_expl( - ch, - cssid, - res.channels, - res.cid_by_name, - res.name_by_cid, - res.cid_store, - res.init_state_count, - ), - ChannelSetOp::Remove => Self::channel_remove_expl( - ch, - res.channels, - res.cid_by_name, - res.name_by_cid, - res.cid_store, - res.time_binners, - ), - } - } - res.channel_set_ops_flag.store(0, atomic::Ordering::Release); - } - - fn apply_channel_ops(&mut self) { - let res = ChannelOpsResources { - channel_set_ops: err::todoval(), - channels: &mut self.channels, - cid_by_name: &mut self.cid_by_name, - name_by_cid: &mut self.name_by_cid, - cid_store: &mut self.cid_store, - init_state_count: &mut self.init_state_count, - channel_set_ops_flag: err::todoval(), - time_binners: &mut self.time_binners, - }; - Self::apply_channel_ops_with_res(res) - } - fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { use Poll::*; match self.ticker.poll_unpin(cx) { @@ -1757,7 +1699,7 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - self.stats.caconn_poll_count_inc(); + self.stats.caconn_poll_count.inc(); loop { let mut have_pending = false; break if let CaConnState::EndOfStream = self.state { @@ -1798,8 +1740,8 @@ impl Stream for CaConn { } { Ready(Some(item)) } else { - // Ready(_) => self.stats.conn_stream_ready_inc(), - // Pending => self.stats.conn_stream_pending_inc(), + // Ready(_) => self.stats.conn_stream_ready.inc(), + // Pending => self.stats.conn_stream_pending.inc(), let _item = CaConnEvent { ts: Instant::now(), value: CaConnEventValue::None, diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index dfaea0d..734f6b5 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -520,20 +520,20 @@ impl CaConnSet { if let Some(e) = self.ca_conn_ress.remove(&addr) { match e.jh.await { Ok(Ok(())) => { - self.stats.ca_conn_task_join_done_ok_inc(); + self.stats.ca_conn_task_join_done_ok.inc(); debug!("CaConn {addr} finished well"); } Ok(Err(e)) => { - self.stats.ca_conn_task_join_done_err_inc(); + self.stats.ca_conn_task_join_done_err.inc(); error!("CaConn {addr} task error: {e}"); } Err(e) => { - self.stats.ca_conn_task_join_err_inc(); + self.stats.ca_conn_task_join_err.inc(); error!("CaConn {addr} join error: {e}"); } } } else { - self.stats.ca_conn_task_eos_non_exist_inc(); + self.stats.ca_conn_task_eos_non_exist.inc(); warn!("end-of-stream received for non-existent CaConn {addr}"); } Ok(()) @@ -581,7 +581,7 @@ impl CaConnSet { while let Some(item) = conn.next().await { match item { Ok(item) => { - stats.conn_item_count_inc(); + stats.conn_item_count.inc(); conn_item_tx .send(CaConnSetEvent::CaConnEvent((SocketAddr::V4(addr), item))) .await?; @@ -721,7 +721,7 @@ impl CaConnSet { if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) { error!("TODO Fresh timeout send connection-close for {addr}"); // TODO collect in metrics - // self.stats.ca_conn_status_feedback_timeout_inc(); + // self.stats.ca_conn_status_feedback_timeout.inc(); // TODO send shutdown to this CaConn, check that we've received // a 'shutdown' state from it. (see below) *v = CaConnStateValue::Shutdown { since: tsnow }; @@ -732,14 +732,14 @@ impl CaConnSet { if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) { error!("TODO HadFeedback timeout send connection-close for {addr}"); // TODO collect in metrics - // self.stats.ca_conn_status_feedback_timeout_inc(); + // self.stats.ca_conn_status_feedback_timeout.inc(); *v = CaConnStateValue::Shutdown { since: tsnow }; } } CaConnStateValue::Shutdown { since } => { if tsnow.saturating_duration_since(*since) > Duration::from_millis(10000) { // TODO collect in metrics as severe error, this would be a bug. - // self.stats.critical_error_inc(); + // self.stats.critical_error.inc(); error!("Shutdown of CaConn failed for {addr}"); } } @@ -908,11 +908,11 @@ impl CaConnSet { } } use atomic::Ordering::Release; - self.stats.channel_unknown_address.store(unknown_address, Release); - self.stats.channel_search_pending.store(search_pending, Release); - self.stats.channel_no_address.store(no_address, Release); - self.stats.channel_unassigned.store(unassigned, Release); - self.stats.channel_assigned.store(assigned, Release); + self.stats.channel_unknown_address.__set(unknown_address); + self.stats.channel_search_pending.__set(search_pending); + self.stats.channel_no_address.__set(no_address); + self.stats.channel_unassigned.__set(unassigned); + self.stats.channel_assigned.__set(assigned); (search_pending,) } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 6b64e07..aa6a126 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,11 +1,12 @@ -use crate::ca::IngestCommons; use crate::ca::METRICS; use crate::daemon_common::DaemonEvent; use async_channel::Sender; +use async_channel::WeakSender; use axum::extract::Query; use err::Error; use http::Request; use log::*; +use scywr::iteminsertqueue::QueryItem; use serde::Deserialize; use serde::Serialize; use stats::CaConnStats; @@ -251,7 +252,7 @@ pub async fn start_metrics_service(bind_to: String, dcom: Arc, stats } pub async fn metrics_agg_task( - ingest_commons: Arc, + query_item_chn: WeakSender, local_stats: Arc, store_stats: Arc, ) -> Result<(), Error> { @@ -262,6 +263,14 @@ pub async fn metrics_agg_task( agg.push(&local_stats); agg.push(&store_stats); trace!("TODO metrics_agg_task"); + // TODO when a CaConn is closed, I'll lose the so far collected counts, which creates a jump + // in the metrics. + // To make this sound: + // Let CaConn keep a stats and just count. + // At the tick, create a snapshot: all atomics are copied after each other. + // Diff this new snapshot with an older snapshot and send that. + // Note: some stats are counters, but some are current values. + // e.g. the number of active channels should go down when a CaConn stops. #[cfg(DISABLED)] { let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress().lock().await; @@ -271,9 +280,8 @@ pub async fn metrics_agg_task( } { warn!("TODO provide metrics with a weak ref to the query_item_channel"); - let nitems = 0; - // let nitems = weak.upgrade()..len(); - agg.store_worker_recv_queue_len.store(nitems, Ordering::Release); + let nitems = query_item_chn.upgrade().map_or(0, |x| x.len()); + agg.store_worker_recv_queue_len.__set(nitems as u64); } let mut m = METRICS.lock().unwrap(); *m = Some(agg.clone()); diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 2c0e918..ec8c0c7 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -24,22 +24,22 @@ fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::iteminsertqueue::E use crate::iteminsertqueue::Error; match err { Error::DbOverload => { - stats.store_worker_insert_overload_inc(); + stats.store_worker_insert_overload.inc(); } Error::DbTimeout => { - stats.store_worker_insert_timeout_inc(); + stats.store_worker_insert_timeout.inc(); } Error::DbUnavailable => { - stats.store_worker_insert_unavailable_inc(); + stats.store_worker_insert_unavailable.inc(); } Error::DbError(e) => { if false { warn!("db error {e}"); } - stats.store_worker_insert_error_inc(); + stats.store_worker_insert_error.inc(); } Error::QueryError(_) => { - stats.store_worker_insert_error_inc(); + stats.store_worker_insert_error.inc(); } } } @@ -103,7 +103,7 @@ async fn rate_limiter_worker( let ivl2 = Duration::from_nanos(ema2.ema() as u64); if allowed_to_drop && ivl2 < dt_min { //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; - stats.store_worker_ratelimit_drop_inc(); + stats.store_worker_ratelimit_drop.inc(); } else { if tx.send(item).await.is_err() { break; @@ -113,7 +113,7 @@ async fn rate_limiter_worker( let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; ivl_ema.update(dt_ns.min(MS * 100) as f32); ts_forward_last = tsnow; - stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release); + stats.inter_ivl_ema.set(ivl_ema.ema() as u64); } } } @@ -146,7 +146,7 @@ async fn worker( let mut i1 = 0; loop { let item = if let Ok(item) = item_inp.recv().await { - stats.store_worker_item_recv_inc(); + stats.store_worker_item_recv.inc(); item } else { break; @@ -155,7 +155,7 @@ async fn worker( QueryItem::ConnectionStatus(item) => { match insert_connection_status(item, ttls.index, &data_store, &stats).await { Ok(_) => { - stats.connection_status_insert_done_inc(); + stats.connection_status_insert_done.inc(); backoff = backoff_0; } Err(e) => { @@ -167,7 +167,7 @@ async fn worker( QueryItem::ChannelStatus(item) => { match insert_channel_status(item, ttls.index, &data_store, &stats).await { Ok(_) => { - stats.channel_status_insert_done_inc(); + stats.channel_status_insert_done.inc(); backoff = backoff_0; } Err(e) => { @@ -181,7 +181,7 @@ async fn worker( if i1 % 1000 < insert_frac { match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await { Ok(_) => { - stats.store_worker_insert_done_inc(); + stats.store_worker_insert_done.inc(); backoff = backoff_0; } Err(e) => { @@ -190,7 +190,7 @@ async fn worker( } } } else { - stats.store_worker_fraction_drop_inc(); + stats.store_worker_fraction_drop.inc(); } i1 += 1; } @@ -206,7 +206,7 @@ async fn worker( let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; match qres { Ok(_) => { - stats.mute_insert_done_inc(); + stats.mute_insert_done.inc(); backoff = backoff_0; } Err(e) => { @@ -230,7 +230,7 @@ async fn worker( .await; match qres { Ok(_) => { - stats.ivl_insert_done_inc(); + stats.ivl_insert_done.inc(); backoff = backoff_0; } Err(e) => { @@ -252,7 +252,7 @@ async fn worker( let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; match qres { Ok(_) => { - stats.channel_info_insert_done_inc(); + stats.channel_info_insert_done.inc(); backoff = backoff_0; } Err(e) => { @@ -281,7 +281,7 @@ async fn worker( .await; match qres { Ok(_) => { - stats.store_worker_insert_binned_done_inc(); + stats.store_worker_insert_binned_done.inc(); backoff = backoff_0; } Err(e) => { diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 5acef45..fa16c5e 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -388,7 +388,7 @@ pub async fn insert_item( if item.msp_bump { let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32); data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; - stats.inserts_msp_inc(); + stats.inserts_msp.inc(); } if let Some(ts_msp_grid) = item.ts_msp_grid { let params = ( @@ -403,7 +403,7 @@ pub async fn insert_item( .scy .execute(&data_store.qu_insert_series_by_ts_msp, params) .await?; - stats.inserts_msp_grid_inc(); + stats.inserts_msp_grid.inc(); } use DataValue::*; match item.val { @@ -446,7 +446,7 @@ pub async fn insert_item( } } } - stats.inserts_val_inc(); + stats.inserts_val.inc(); Ok(()) } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index d92b754..6ffe9df 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -207,6 +207,7 @@ impl IntervalEma { } } +// #[cfg(DISABLED)] stats_proc::stats_struct!(( stats_struct( name(CaConnSetStats), @@ -224,9 +225,10 @@ stats_proc::stats_struct!(( ), ), // agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)), - diff(name(CaConnSetStatsDiff), input(CaConnSetStats)), + // diff(name(CaConnSetStatsDiff), input(CaConnSetStats)), )); +// #[cfg(DISABLED)] stats_proc::stats_struct!(( stats_struct( name(CaConnStats), @@ -280,13 +282,14 @@ stats_proc::stats_struct!(( ca_ts_off_2, ca_ts_off_3, ca_ts_off_4, - inter_ivl_ema, ), + values(inter_ivl_ema) ), agg(name(CaConnStatsAgg), parent(CaConnStats)), diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)), )); +// #[cfg(DISABLED)] stats_proc::stats_struct!(( stats_struct( name(DaemonStats), @@ -320,3 +323,23 @@ stats_proc::stats_struct!(( agg(name(DaemonStatsAgg), parent(DaemonStats)), diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)), )); + +stats_proc::stats_struct!(( + stats_struct(name(TestStats0), counters(count0,), values(val0),), + diff(name(TestStats0Diff), input(TestStats0)), + agg(name(TestStats0Agg), parent(TestStats0)), + diff(name(TestStats0AggDiff), input(TestStats0Agg)), +)); + +#[test] +fn test0_diff() { + let stats_a = TestStats0::new(); + stats_a.count0().inc(); + stats_a.val0().set(43); + let stats_b = stats_a.snapshot(); + stats_b.count0().inc(); + stats_b.count0().inc(); + stats_b.count0().inc(); + let diff = TestStats0Diff::diff_from(&stats_a, &stats_b); + assert_eq!(diff.count0.load(), 3); +} diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index 00fccc8..cc431db 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -1,7 +1,8 @@ use proc_macro::TokenStream; use quote::quote; use syn::parse::ParseStream; -use syn::{parse_macro_input, Ident}; +use syn::parse_macro_input; +use syn::Ident; type PunctExpr = syn::punctuated::Punctuated; @@ -43,11 +44,11 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { let inits1 = st .counters .iter() - .map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string())); + .map(|x| format!("{:12}{}: stats_types::Counter::new()", "", x.to_string())); let inits2 = st .values .iter() - .map(|x| format!("{:12}{}: AtomicU64::new(0)", "", x.to_string())); + .map(|x| format!("{:12}{}: stats_types::Value::new()", "", x.to_string())); let inits: Vec<_> = inits1.into_iter().chain(inits2).collect(); let inits = inits.join(",\n"); let incers: String = st @@ -56,15 +57,12 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { .map(|nn| { format!( " - pub fn {nn}_inc(&self) {{ - self.{nn}.fetch_add(1, Ordering::AcqRel); - }} - pub fn {nn}_add(&self, v: u64) {{ - self.{nn}.fetch_add(v, Ordering::AcqRel); - }} - pub fn {nn}_dur(&self, v: Duration) {{ - self.{nn}.fetch_add((v * 1000000).as_secs(), Ordering::AcqRel); + pub fn {nn}(&self) -> &stats_types::Counter {{ + &self.{nn} }} + //pub fn {nn}_dur(&self, v: Duration) {{ + // self.{nn}.fetch_add((v * 1000000).as_secs(), Ordering::AcqRel); + //}} " ) }) @@ -78,8 +76,8 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { write!( buf, " - pub fn {nn}_set(&self, v: u64) {{ - self.{nn}.store(v, Ordering::Release); + pub fn {nn}(&self) -> &stats_types::Value {{ + &self.{nn} }} " ) @@ -97,7 +95,7 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { String::new() }; buf.push_str(&format!( - "ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", + "ret.push_str(&format!(\"daqingest{}_{} {{}}\\n\", self.{}.load()));\n", pre, n, n )); } @@ -109,7 +107,7 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { n.to_string() }; buf.push_str(&format!( - "ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", + "ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load()));\n", nn, n )); } @@ -123,12 +121,35 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { " ) }; + let fn_snapshot = { + let mut init_counters = String::new(); + for x in &st.counters { + let n = x.to_string(); + init_counters.push_str(&format!("ret.{}.__set(self.{}.load());\n", n, n)); + } + let mut init_values = String::new(); + for x in &st.values { + let n = x.to_string(); + init_values.push_str(&format!("ret.{}.set(self.{}.load());\n", n, n)); + } + format!( + " + pub fn snapshot(&self) -> Self {{ + let ret = Self::new(); + {init_counters} + {init_values} + ret + }} + " + ) + }; format!( " impl {name} {{ pub fn new() -> Self {{ Self {{ ts_create: Instant::now(), + dropped: stats_types::Value::new(), {inits} }} }} @@ -138,8 +159,17 @@ impl {name} {{ {values} {fn_prometheus} + + {fn_snapshot} }} - " + +impl stats_types::DropMark for {name} {{ + fn field(&self) -> &stats_types::Value {{ + &self.dropped + }} +}} + +" ) } @@ -148,17 +178,18 @@ fn stats_struct_decl_impl(st: &StatsStructDef) -> String { let counters_decl = st .counters .iter() - .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .map(|x| format!("{:4}pub {}: stats_types::Counter,\n", "", x.to_string())) .fold(String::new(), extend_str); let values_decl = st .values .iter() - .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .map(|x| format!("{:4}pub {}: stats_types::Value,\n", "", x.to_string())) .fold(String::new(), extend_str); let structt = format!( " pub struct {name} {{ pub ts_create: Instant, + dropped: stats_types::Value, {counters_decl} {values_decl} }} @@ -175,7 +206,7 @@ fn agg_decl_impl(st: &StatsStructDef, ag: &AggStructDef) -> String { let counters_decl = st .counters .iter() - .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .map(|x| format!("{:4}pub {}: stats_types::Counter,\n", "", x.to_string())) .fold(String::new(), extend_str); let mut code = String::new(); let s = format!( @@ -183,7 +214,7 @@ fn agg_decl_impl(st: &StatsStructDef, ag: &AggStructDef) -> String { // Agg decl pub struct {name} {{ pub ts_create: Instant, - pub aggcount: AtomicU64, + pub aggcount: stats_types::Counter, {counters_decl} }} " @@ -194,7 +225,7 @@ pub struct {name} {{ .iter() .map(|x| { let n = x.to_string(); - format!("{:12}{}: AtomicU64::new(self.{}.load(Ordering::Acquire)),\n", "", n, n) + format!("{:12}{}: stats_types::Counter::init(self.{}.load()),\n", "", n, n) }) .fold(String::new(), extend_str); let s = format!( @@ -203,7 +234,7 @@ impl Clone for {name} {{ fn clone(&self) -> Self {{ Self {{ ts_create: self.ts_create.clone(), - aggcount: AtomicU64::new(self.aggcount.load(Ordering::Acquire)), + aggcount: stats_types::Counter::init(self.aggcount.load()), {clone_counters} }} }} @@ -214,7 +245,7 @@ impl Clone for {name} {{ let inits = st .counters .iter() - .map(|x| format!("{:12}{}: AtomicU64::new(0),\n", "", x.to_string())) + .map(|x| format!("{:12}{}: stats_types::Counter::new(),\n", "", x.to_string())) .fold(String::new(), extend_str); let s = format!( " @@ -223,7 +254,7 @@ impl {name} {{ pub fn new() -> Self {{ Self {{ ts_create: Instant::now(), - aggcount: AtomicU64::new(0), + aggcount: stats_types::Counter::new(), {inits} }} }} @@ -233,18 +264,12 @@ impl {name} {{ let counters_add = st .counters .iter() - .map(|x| { - format!( - "self.{}.fetch_add(inp.{}.load(Ordering::Acquire), Ordering::AcqRel);\n", - x.to_string(), - x.to_string() - ) - }) + .map(|x| format!("self.{}.add(inp.{}.load());\n", x.to_string(), x.to_string())) .fold(String::new(), extend_str); let s = format!( " pub fn push(&self, inp: &{name_inp}) {{ - self.aggcount.fetch_add(1, Ordering::AcqRel); + self.aggcount.inc(); {counters_add} }} " @@ -255,7 +280,7 @@ impl {name} {{ for x in &st.counters { let n = x.to_string(); buf.push_str(&format!( - "ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", + "ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load()));\n", n, n )); } @@ -263,7 +288,7 @@ impl {name} {{ " pub fn prometheus(&self) -> String {{ let mut ret = String::new(); - ret.push_str(&format!(\"daqingest_aggcount {{}}\\n\", self.aggcount.load(Ordering::Acquire))); + ret.push_str(&format!(\"daqingest_aggcount {{}}\\n\", self.aggcount.load())); {buf} ret }} @@ -287,7 +312,7 @@ fn diff_decl_impl(st: &DiffStructDef, inp: &StatsStructDef) -> String { let decl = inp .counters .iter() - .map(|x| format!("{:4}pub {}: AtomicU64,\n", "", x.to_string())) + .map(|x| format!("{:4}pub {}: stats_types::Counter,\n", "", x.to_string())) .fold(String::new(), extend_str); let mut code = String::new(); let s = format!( @@ -309,7 +334,7 @@ pub struct {name} {{ .map(|x| { let n = x.to_string(); format!( - "{:12}let {} = AtomicU64::new(b.{}.load(Ordering::Acquire) - a.{}.load(Ordering::Acquire));\n", + "{:12}let {} = stats_types::Counter::init(b.{}.load() - a.{}.load());\n", "", n, n, n ) }) @@ -339,7 +364,7 @@ pub struct {name} {{ let mut b = String::new(); for h in &inp.counters { a.push_str(&format!("{} {{}} ", h.to_string())); - b.push_str(&format!("self.{}.load(Ordering::Acquire), ", h.to_string())); + b.push_str(&format!("self.{}.load(), ", h.to_string())); } let s = format!( " diff --git a/stats_types/src/stats_types.rs b/stats_types/src/stats_types.rs index 6f59644..f6d738b 100644 --- a/stats_types/src/stats_types.rs +++ b/stats_types/src/stats_types.rs @@ -1,15 +1,110 @@ +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::AcqRel; +use std::sync::atomic::Ordering::Acquire; +use std::sync::atomic::Ordering::Release; + #[derive(Debug)] -pub struct Counter { +pub struct CounterDesc { pub name: String, } #[derive(Debug)] pub struct StatsStruct { pub name: String, - pub counters: Vec, + pub counters: Vec, } +#[derive(Debug)] pub struct StatsStructDef { pub name: String, - pub counters: Vec, + pub counters: Vec, } + +#[derive(Debug)] +pub struct Counter(AtomicU64); + +impl Counter { + pub fn new() -> Self { + Counter(AtomicU64::new(0)) + } + + pub fn init(x: u64) -> Self { + Counter(AtomicU64::new(x)) + } + + pub fn inc(&self) { + self.0.fetch_add(1, AcqRel); + } + + pub fn add(&self, x: u64) { + self.0.fetch_add(x, AcqRel); + } + + pub fn load(&self) -> u64 { + self.0.load(Acquire) + } + + pub fn __set(&self, x: u64) { + self.0.store(x, Release); + } +} + +#[derive(Debug)] +pub struct Value(AtomicU64); + +impl Value { + pub fn new() -> Self { + Value(AtomicU64::new(0)) + } + + pub fn init(x: u64) -> Self { + Value(AtomicU64::new(x)) + } + + pub fn set(&self, x: u64) { + self.0.store(x, Release); + } + + pub fn load(&self) -> u64 { + self.0.load(Acquire) + } +} + +pub trait DropMark { + fn field(&self) -> &Value; +} + +pub struct DropGuard<'a> { + mark: &'a Value, +} + +impl<'a> Drop for DropGuard<'a> { + fn drop(&mut self) { + self.mark.set(1); + } +} + +#[allow(unused)] +struct StatsAInner { + count0: Counter, + val0: Value, + done: Value, +} + +#[allow(unused)] +struct StatsA { + inner: std::sync::Arc, +} + +impl Drop for StatsA { + fn drop(&mut self) { + self.inner.done.set(1); + } +} + +#[allow(unused)] +struct StatsAReader { + inner: std::sync::Arc, +} + +impl StatsAReader {}