diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 5266365..81b1871 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -1250,6 +1250,7 @@ impl CaConn {
                         do_wake_again = true;
                     }
                     CaMsgTy::EventAddRes(k) => {
+                        self.stats.caconn_recv_data_inc();
                         let res = Self::handle_event_add_res(self, k, tsnow);
                         let ts2 = Instant::now();
                         self.stats
diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs
index 1025720..7b078cc 100644
--- a/netfetch/src/insertworker.rs
+++ b/netfetch/src/insertworker.rs
@@ -4,6 +4,7 @@ use crate::rt::JoinHandle;
 use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
 use err::Error;
 use log::*;
+use netpod::timeunits::{MS, SEC};
 use netpod::ScyllaConfig;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
@@ -56,33 +57,45 @@ pub async fn spawn_scylla_insert_workers(
     let ingest_commons = ingest_commons.clone();
     let stats = store_stats.clone();
     let recv = insert_item_queue.receiver();
-    let mut ts_recv_last = Instant::now();
+    let store_stats = store_stats.clone();
     let fut = async move {
+        let mut ts_forward_last = Instant::now();
+        let mut ivl_ema = stats::Ema64::with_k(0.00001);
         loop {
             let item = if let Ok(x) = recv.recv().await {
                 x
             } else {
                 break;
             };
+            let ts_received = Instant::now();
             let allowed_to_drop = match &item {
                 QueryItem::Insert(_) => true,
                 _ => false,
             };
             let dt_min = {
                 let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
-                Duration::from_nanos(1000000000 / rate)
+                Duration::from_nanos(SEC / rate)
             };
-            let tsnow = Instant::now();
-            let dt = tsnow.duration_since(ts_recv_last);
-            ts_recv_last = tsnow;
-            if allowed_to_drop && dt < dt_min {
+            let mut ema2 = ivl_ema.clone();
+            {
+                let dt = ts_received.duration_since(ts_forward_last);
+                let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
+                ema2.update(dt_ns.min(MS * 100) as f32);
+            }
+            let ivl2 = Duration::from_nanos(ema2.ema() as u64);
+            if allowed_to_drop && ivl2 < dt_min {
                 //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
                 stats.store_worker_ratelimit_drop_inc();
             } else {
                 if q2_tx.send(item).await.is_err() {
                     break;
                 } else {
-                    stats.store_worker_item_recv_inc();
+                    let tsnow = Instant::now();
+                    let dt = tsnow.duration_since(ts_forward_last);
+                    let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
+                    ivl_ema.update(dt_ns.min(MS * 100) as f32);
+                    ts_forward_last = tsnow;
+                    store_stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release);
                 }
             }
         }
@@ -100,8 +113,11 @@
     for i1 in 0..insert_worker_count {
         let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
         let stats = store_stats.clone();
-        //let recv = insert_item_queue.receiver();
-        let recv = q2_rx.clone();
+        let recv = if true {
+            q2_rx.clone()
+        } else {
+            insert_item_queue.receiver()
+        };
         let ingest_commons = ingest_commons.clone();
         let fut = async move {
             let backoff_0 = Duration::from_millis(10);
@@ -109,6 +125,7 @@
             let mut i1 = 0;
             loop {
                 let item = if let Ok(item) = recv.recv().await {
+                    stats.store_worker_item_recv_inc();
                     item
                 } else {
                     break;
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index b91d0ae..839bcc8 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -6,15 +6,17 @@ const US: u64 = 1000;
 const MS: u64 = US * 1000;
 const SEC: u64 = MS * 1000;

+pub type EMA = Ema32;
+
 #[derive(Clone, Debug)]
-pub struct EMA {
+pub struct Ema32 {
     ema: f32,
     emv: f32,
     k: f32,
     update_count: u64,
 }

-impl EMA {
+impl Ema32 {
     pub fn with_k(k: f32) -> Self {
         Self {
             ema: 0.0,
@@ -71,6 +73,71 @@ impl EMA {
     }
 }

+#[derive(Clone, Debug)]
+pub struct Ema64 {
+    ema: f64,
+    emv: f64,
+    k: f64,
+    update_count: u64,
+}
+
+impl Ema64 {
+    pub fn with_k(k: f64) -> Self {
+        Self {
+            ema: 0.0,
+            emv: 0.0,
+            k,
+            update_count: 0,
+        }
+    }
+
+    pub fn with_ema(ema: f64, k: f64) -> Self {
+        Self {
+            ema,
+            emv: 0.0,
+            k,
+            update_count: 0,
+        }
+    }
+
+    pub fn default() -> Self {
+        Self {
+            ema: 0.0,
+            emv: 0.0,
+            k: 0.05,
+            update_count: 0,
+        }
+    }
+
+    #[inline(always)]
+    pub fn update<V>(&mut self, v: V)
+    where
+        V: Into<f64>,
+    {
+        self.update_count += 1;
+        let k = self.k;
+        let dv = v.into() - self.ema;
+        self.ema += k * dv;
+        self.emv = (1f64 - k) * (self.emv + k * dv * dv);
+    }
+
+    pub fn update_count(&self) -> u64 {
+        self.update_count
+    }
+
+    pub fn ema(&self) -> f64 {
+        self.ema
+    }
+
+    pub fn emv(&self) -> f64 {
+        self.emv
+    }
+
+    pub fn k(&self) -> f64 {
+        self.k
+    }
+}
+
 pub struct CheckEvery {
     ts_last: Instant,
     dt: Duration,
@@ -153,6 +220,7 @@ stats_proc::stats_struct!((
         inserts_queue_drop,
         channel_fast_item_drop,
         store_worker_recv_queue_len,
+        // TODO maybe rename: this is now only the recv of the intermediate queue:
         store_worker_item_recv,
         // TODO rename to make clear that this drop is voluntary because of user config choice:
         store_worker_fraction_drop,
@@ -173,6 +241,7 @@
         caconn_loop3_count,
         caconn_loop4_count,
         caconn_command_can_not_reply,
+        caconn_recv_data,
         time_handle_conn_listen,
         time_handle_peer_ready,
         time_check_channels_state_init,
@@ -189,6 +258,7 @@
         ca_ts_off_2,
         ca_ts_off_3,
         ca_ts_off_4,
+        inter_ivl_ema,
         ),
     ),
     agg(name(CaConnStatsAgg), parent(CaConnStats)),
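
Sketch, for illustration only (not part of the patch): the insertworker.rs hunk drops an insert when the exponentially smoothed interval between forwarded items would still be shorter than dt_min = SEC / rate. Below is a minimal standalone Rust sketch of that decision. `Ema` here is a stripped-down stand-in for stats::Ema64 (same update recurrence, no variance or update-count bookkeeping); `would_drop` and the numbers in `main` are hypothetical, while SEC and MS match the nanosecond constants in the diff.

use std::time::Duration;

const SEC: u64 = 1_000_000_000;
const MS: u64 = 1_000_000;

// Stand-in for stats::Ema64: same recurrence ema += k * (v - ema).
#[derive(Clone)]
struct Ema {
    ema: f64,
    k: f64,
}

impl Ema {
    fn with_k(k: f64) -> Self {
        Self { ema: 0.0, k }
    }
    fn update(&mut self, v: f64) {
        self.ema += self.k * (v - self.ema);
    }
    fn ema(&self) -> f64 {
        self.ema
    }
}

// Hypothetical helper mirroring the drop decision in the patched forwarding loop:
// compute the EMA as if this item were forwarded now (on a clone, so a dropped item
// does not advance the persistent average) and drop when that smoothed interval is
// still shorter than dt_min = SEC / rate. A single observation is capped at 100 ms,
// mirroring dt_ns.min(MS * 100) in the patch.
fn would_drop(ivl_ema: &Ema, since_last_forward: Duration, rate: u64) -> bool {
    let dt_min = Duration::from_nanos(SEC / rate);
    let dt_ns = SEC * since_last_forward.as_secs() + since_last_forward.subsec_nanos() as u64;
    let mut ema2 = ivl_ema.clone();
    ema2.update(dt_ns.min(MS * 100) as f64);
    Duration::from_nanos(ema2.ema() as u64) < dt_min
}

fn main() {
    // Same smoothing constant as the patch; items arrive every 100 us against an
    // allowed rate of 1000/s (dt_min = 1 ms), so the smoothed interval stays below
    // the minimum and these inserts are dropped.
    let mut ivl_ema = Ema::with_k(0.00001);
    for i in 0..5 {
        let since_last_forward = Duration::from_micros(100);
        let dropped = would_drop(&ivl_ema, since_last_forward, 1000);
        if !dropped {
            // Only forwarded items update the persistent EMA, as in the patch.
            let dt_ns = since_last_forward.as_nanos() as u64;
            ivl_ema.update(dt_ns.min(MS * 100) as f64);
        }
        println!("item {} dropped={}", i, dropped);
    }
}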