From 94325b1400486bf8a9d68cb5933e7125f95d5d45 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 2 Jul 2025 16:24:56 +0200 Subject: [PATCH] Enable on writers periodic and on close housekeeping --- daqingest/src/daemon.rs | 11 ++-- netfetch/src/ca/conn.rs | 38 +++++++++++++- serieswriter/src/msptool.rs | 34 ++++++------ serieswriter/src/ratelimitwriter.rs | 17 +++++- serieswriter/src/rtwriter.rs | 27 +++++++++- serieswriter/src/writer.rs | 81 +++++++++++++++++++++++++---- stats/mettdecl.rs | 3 ++ 7 files changed, 177 insertions(+), 34 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index c683ecd..50f7506 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -643,10 +643,11 @@ impl Daemon { } async fn handle_shutdown(&mut self) -> Result<(), Error> { + let selfname = "handle_shutdown"; if self.shutting_down { - warn!("already shutting down"); + warn!("{selfname} already shutting down"); } else { - info!("handle_shutdown"); + info!("{selfname} handle_shutdown"); self.shutting_down = true; // TODO make sure we: // set a flag so that we don't attempt to use resources any longer (why could that happen?) @@ -656,10 +657,10 @@ impl Daemon { // await the connection sets. // await other workers that we've spawned. if let Some(iqtx) = &self.iqtx { - info!("scylla output channels, closing all"); - iqtx.close_all(); + info!("{selfname} scylla output channels {iqtx}", iqtx = iqtx.summary()); + // iqtx.close_all(); } else { - info!("scylla output channels, not set"); + info!("{selfname} scylla output channels, not set"); } drop(self.iqtx.take()); self.connset_ctrl.shutdown().await?; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index d323e85..3789981 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -70,7 +70,6 @@ use stats::rand_xoshiro::rand_core::SeedableRng; use std::collections::BTreeMap; use std::collections::VecDeque; use std::fmt; -use std::marker::PhantomData; use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::atomic; @@ -107,6 +106,10 @@ macro_rules! trace_event_incoming { ($($arg:tt)*) => ( if false { log::trace!($( macro_rules! trace_monitor_stale { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); } +macro_rules! trace_channel_remove { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); } + +macro_rules! debug_shutdown { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); } + fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool { if let Some(name) = conn.name_by_cid(cid) { series::dbg::dbg_chn(name) @@ -1084,6 +1087,7 @@ impl<'a> EventAddIngestRefobj<'a> { if let Some(binwriter) = self.binwriter.as_mut() { binwriter.ingest(tsev, val_for_agg, self.iqdqs)?; } + self.mett.ts_msp_reput_onevent().add(wres.msp_rewrite() as u32); } if false { // TODO record stats on drop with the new filter @@ -1498,6 +1502,8 @@ impl CaConn { } fn trigger_shutdown(&mut self, reason: ShutdownReason) { + let selfname = "trigger_shutdown"; + debug_shutdown!("{selfname} {addr}", addr = self.remote_addr_dbg); let channel_reason = match &reason { ShutdownReason::ConnectRefused => { self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectRefused); @@ -1870,7 +1876,6 @@ impl CaConn { let selfname = "channel_close_by_name"; debug!("{selfname} {}", name); if let Some(x) = self.cid_by_name.get(&name).map(Clone::clone) { - self.cid_by_name.remove(&name); self.channel_close_by_cid(x.clone()) } else { warn!("{selfname} {} can not find channel", name); @@ -1885,6 +1890,8 @@ impl CaConn { let stnow = SystemTime::now(); if let Some(conf) = self.channels.get_mut(&cid) { let name = conf.conf.name(); + trace_channel_remove!("{selfname} channel_close_by_cid {cid} {name}"); + self.cid_by_name.remove(name); { // TODO emit CaConn item to let CaConnSet know that we have closed the channel. // TODO may be too full @@ -1904,8 +1911,14 @@ impl CaConn { // TODO shutdown the internal writer structures. match &mut conf.state { ChannelState::Writable(st2) => { + if st2.writer.tick(&mut self.iqdqs).is_err() { + self.mett.logic_error().inc(); + } if st2.writer.on_close(&mut self.iqdqs).is_err() { self.mett.logic_error().inc(); + } else { + trace_channel_remove!("writer on_close done Ok {}", conf.conf.name()); + self.mett.series_writer_on_close().inc(); } } _ => {} @@ -3367,6 +3380,27 @@ impl CaConn { CaConnState::EndOfStream => {} CaConnState::MetricsEmitted => {} } + { + let self2 = self.as_mut().get_mut(); + let mut ts_msp_reput = 0; + let mut logic_error = 0; + for (_, conf) in &mut self2.channels { + let st1 = &mut conf.state; + match st1 { + ChannelState::Writable(st2) => match st2.writer.housekeeping(&mut self2.iqdqs) { + Ok(res) => { + ts_msp_reput += res.ts_msp_reput as u32; + } + Err(_) => { + logic_error += 1; + } + }, + _ => {} + } + } + self.mett.ts_msp_reput_periodic().add(ts_msp_reput); + self.mett.logic_error().add(logic_error); + } self.iqdqs.housekeeping(); if self.metrics_emit_last + METRICS_EMIT_IVL <= tsnow { self.metrics_emit_last = tsnow; diff --git a/serieswriter/src/msptool.rs b/serieswriter/src/msptool.rs index cdaf87f..397c4bf 100644 --- a/serieswriter/src/msptool.rs +++ b/serieswriter/src/msptool.rs @@ -2,11 +2,10 @@ pub mod fixgrid; use netpod::DtNano; use netpod::TsNano; +use netpod::timeunits::DAY; +use netpod::timeunits::SEC; use serde::Serialize; -const SEC: u64 = 1000_000_000; -const HOUR: u64 = SEC * 60 * 60 * 24; - #[derive(Debug, Serialize)] pub struct MspSplit { last: Option, @@ -27,37 +26,42 @@ impl MspSplit { } } - pub fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool) { + pub fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool, Option) { // Maximum resolution of the ts msp: let msp_res_max = SEC * 2; - let ts_main = ts; - let (ts_msp, changed) = match self.last { + let ts_inp = ts; + let (ts_msp, changed, ts_msp_retired) = match self.last { Some(ts_msp_last) => { - if self.count >= self.count_max || self.bytes >= self.bytes_max || ts_msp_last.add_ns(HOUR) <= ts_main { - let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max); + if self.count >= self.count_max || self.bytes >= self.bytes_max || ts_msp_last.add_ns(DAY) <= ts_inp { + let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max); if ts_msp == ts_msp_last { - (ts_msp, false) + // TODO should count these for metrics + (ts_msp, false, None) } else { self.last = Some(ts_msp); self.count = 1; self.bytes = item_bytes; - (ts_msp, true) + (ts_msp, true, Some(ts_msp_last)) } } else { self.count += 1; self.bytes += item_bytes; - (ts_msp_last, false) + (ts_msp_last, false, None) } } None => { - let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max); + let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max); self.last = Some(ts_msp); self.count = 1; self.bytes = item_bytes; - (ts_msp, true) + (ts_msp, true, None) } }; - let ts_lsp = ts_main.delta(ts_msp); - (ts_msp, ts_lsp, changed) + let ts_lsp = ts_inp.delta(ts_msp); + (ts_msp, ts_lsp, changed, ts_msp_retired) + } + + pub fn ts_msp_current(&self) -> Option { + self.last } } diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index fdc26e0..ef24789 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -27,6 +27,12 @@ autoerr::create_error_v1!( pub struct WriteRes { pub accept: bool, pub bytes: u32, + pub msp_rewrite: u8, +} + +#[derive(Debug)] +pub struct HousekeepingRes { + pub ts_msp_reput: u8, } #[derive(Serialize)] @@ -144,12 +150,14 @@ where let ret = WriteRes { accept: true, bytes: res.bytes, + msp_rewrite: res.msp_rewrite, }; Ok(ret) } else { let ret = WriteRes { accept: false, bytes: 0, + msp_rewrite: 0, }; Ok(ret) } @@ -161,10 +169,17 @@ where } pub fn on_close(&mut self, iqdqs: &mut VecDeque) -> Result<(), Error> { - self.tick(iqdqs)?; self.writer.on_close(iqdqs)?; Ok(()) } + + pub fn housekeeping(&mut self, deque: &mut VecDeque) -> Result { + let res = self.writer.housekeeping(deque)?; + let ret = HousekeepingRes { + ts_msp_reput: res.ts_msp_reput, + }; + Ok(ret) + } } impl fmt::Debug for RateLimitWriter diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 58e0d16..c43bd7e 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -77,12 +77,17 @@ impl WriteRes { pub fn accept_any(&self) -> bool { self.lt.accept || self.mt.accept || self.st.accept } + + pub fn msp_rewrite(&self) -> u8 { + self.st.msp_rewrite + self.mt.msp_rewrite + self.lt.msp_rewrite + } } #[derive(Debug)] pub struct WriteRtRes { pub accept: bool, pub bytes: u32, + pub msp_rewrite: u8, } impl Default for WriteRtRes { @@ -90,10 +95,16 @@ impl Default for WriteRtRes { Self { accept: false, bytes: 0, + msp_rewrite: 0, } } } +#[derive(Debug)] +pub struct HousekeepingRes { + pub ts_msp_reput: u8, +} + #[derive(Debug, Serialize)] pub struct RtWriter where @@ -246,6 +257,7 @@ where let ret = WriteRtRes { accept: x.accept, bytes: x.bytes, + msp_rewrite: x.msp_rewrite, }; Ok(ret) } @@ -262,7 +274,6 @@ where } pub fn on_close(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - self.tick(iqdqs)?; if self.do_st_rf1 { self.state_st.writer.on_close(&mut iqdqs.st_rf1_qu)?; } else { @@ -272,4 +283,18 @@ where self.state_lt.writer.on_close(&mut iqdqs.lt_rf3_qu)?; Ok(()) } + + pub fn housekeeping(&mut self, iqdqs: &mut InsertDeques) -> Result { + let res_st = if self.do_st_rf1 { + self.state_st.writer.housekeeping(&mut iqdqs.st_rf1_qu)? + } else { + self.state_st.writer.housekeeping(&mut iqdqs.st_rf3_qu)? + }; + let res_mt = self.state_mt.writer.housekeeping(&mut iqdqs.mt_rf3_qu)?; + let res_lt = self.state_lt.writer.housekeeping(&mut iqdqs.lt_rf3_qu)?; + let ret = HousekeepingRes { + ts_msp_reput: res_st.ts_msp_reput + res_mt.ts_msp_reput + res_lt.ts_msp_reput, + }; + Ok(ret) + } } diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 70aa499..4d18904 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -11,6 +11,7 @@ use std::time::Instant; use netpod::ByteSize; use scywr::iteminsertqueue::MspItem; pub use smallvec::SmallVec; +use std::time::Duration; macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($($arg)*); } ) } @@ -57,13 +58,30 @@ impl From for Error { #[derive(Debug)] pub struct WriteRes { pub bytes: u32, + pub msp_rewrite: u8, +} + +#[derive(Debug)] +pub struct HousekeepingRes { + pub ts_msp_reput: u8, +} + +#[derive(Debug)] +pub struct OnCloseRes { + pub ts_msp_reput: u8, } #[derive(Debug, Serialize)] pub struct SeriesWriter { series: SeriesId, msp_split: crate::msptool::MspSplit, + evts_on_msp_write: TsNano, + evts_latest: TsNano, do_trace_detail: bool, + #[serde(skip)] + ts_check_ts_msp_reput_last: Instant, + #[serde(skip)] + ts_ts_msp_put_last: Option, _t1: PhantomData, } @@ -75,7 +93,11 @@ where let res = Self { series, msp_split: crate::msptool::MspSplit::new(1024 * 64, 1024 * 1024 * 10), + evts_on_msp_write: TsNano::from_ns(0), + evts_latest: TsNano::from_ns(0), do_trace_detail: series::dbg::dbg_series(series), + ts_check_ts_msp_reput_last: Instant::now(), + ts_ts_msp_put_last: None, _t1: PhantomData, }; Ok(res) @@ -94,17 +116,20 @@ where deque: &mut VecDeque, ) -> Result { let det = self.do_trace_detail; - // let ts_main = item.ts(); let res = item.into_query_item(ts_net, tsev, state); trace_emit!(det, "emit value for ts {tsev}"); - // TODO adapt, taken from trait impl - let (ts_msp, ts_lsp, ts_msp_chg) = self.msp_split.split(tsev, res.bytes.bytes()); + let mut msp_rewrite = 0; + let (ts_msp, ts_lsp, ts_msp_chg, ts_msp_retired) = self.msp_split.split(tsev, res.bytes.bytes()); + if let Some(msp) = ts_msp_retired { + let item = MspItem::new(self.series.clone(), msp.to_ts_ms(), ts_net); + deque.push_back(QueryItem::Msp(item)); + msp_rewrite += 1; + } if ts_msp_chg { - deque.push_back(QueryItem::Msp(MspItem::new( - self.series.clone(), - ts_msp.to_ts_ms(), - ts_net, - ))); + let item = MspItem::new(self.series.clone(), ts_msp.to_ts_ms(), ts_net); + deque.push_back(QueryItem::Msp(item)); + self.evts_on_msp_write = tsev; + self.ts_ts_msp_put_last = Some(Instant::now()); } let item = scywr::iteminsertqueue::InsertItem { series: self.series.clone(), @@ -114,8 +139,10 @@ where val: res.data_item, }; deque.push_back(QueryItem::Insert(item)); + self.evts_latest = tsev; let res = WriteRes { bytes: res.bytes.bytes(), + msp_rewrite, }; Ok(res) } @@ -124,7 +151,41 @@ where Ok(()) } - pub fn on_close(&mut self, _deque: &mut VecDeque) -> Result<(), Error> { - Ok(()) + pub fn on_close(&mut self, deque: &mut VecDeque) -> Result { + if let Some(msp) = self.msp_split.ts_msp_current() { + let item = MspItem::new(self.series.clone(), msp.to_ts_ms(), Instant::now()); + deque.push_back(QueryItem::Msp(item)); + } + let ret = OnCloseRes { ts_msp_reput: 1 }; + Ok(ret) + } + + pub fn housekeeping(&mut self, deque: &mut VecDeque) -> Result { + let tsnow = Instant::now(); + let mut ts_msp_reput = 0; + if self.ts_check_ts_msp_reput_last + Duration::from_millis(1000 * 60) <= tsnow { + self.ts_check_ts_msp_reput_last = tsnow; + if let Some(ts_ts_msp_put_last) = self.ts_ts_msp_put_last { + if ts_ts_msp_put_last + Duration::from_millis(1000 * 60 * 60) <= tsnow { + if self.evts_latest != self.evts_on_msp_write { + if let Some(msp) = self.msp_split.ts_msp_current() { + self.ts_ts_msp_put_last = Some(tsnow); + self.evts_on_msp_write = self.evts_latest; + let item = MspItem::new(self.series.clone(), msp.to_ts_ms(), tsnow); + deque.push_back(QueryItem::Msp(item)); + ts_msp_reput += 1; + } else { + // TODO return this, it should not happen + } + } else { + } + } else { + } + } else { + } + } else { + } + let ret = HousekeepingRes { ts_msp_reput }; + Ok(ret) } } diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index 3922d85..5cb55d8 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -64,6 +64,9 @@ mod Metrics { unknown_subid, get_series_id_ok, channel_add_exists, + ts_msp_reput_onevent, + ts_msp_reput_periodic, + series_writer_on_close, } enum histolog2s { clock_ioc_diff_abs,