Enable periodic and on-close housekeeping on writers

This commit is contained in:
Dominik Werder
2025-07-02 16:24:56 +02:00
parent 0d3caea38a
commit 94325b1400
7 changed files with 177 additions and 34 deletions

View File

@@ -643,10 +643,11 @@ impl Daemon {
}
async fn handle_shutdown(&mut self) -> Result<(), Error> {
let selfname = "handle_shutdown";
if self.shutting_down {
warn!("already shutting down");
warn!("{selfname} already shutting down");
} else {
info!("handle_shutdown");
info!("{selfname} handle_shutdown");
self.shutting_down = true;
// TODO make sure we:
// set a flag so that we don't attempt to use resources any longer (why could that happen?)
@@ -656,10 +657,10 @@ impl Daemon {
// await the connection sets.
// await other workers that we've spawned.
if let Some(iqtx) = &self.iqtx {
info!("scylla output channels, closing all");
iqtx.close_all();
info!("{selfname} scylla output channels {iqtx}", iqtx = iqtx.summary());
// iqtx.close_all();
} else {
info!("scylla output channels, not set");
info!("{selfname} scylla output channels, not set");
}
drop(self.iqtx.take());
self.connset_ctrl.shutdown().await?;

View File

@@ -70,7 +70,6 @@ use stats::rand_xoshiro::rand_core::SeedableRng;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic;
@@ -107,6 +106,10 @@ macro_rules! trace_event_incoming { ($($arg:tt)*) => ( if false { log::trace!($(
macro_rules! trace_monitor_stale { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); }
macro_rules! trace_channel_remove { ($($arg:tt)*) => ( if true { log::trace!($($arg)*); } ); }
macro_rules! debug_shutdown { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ); }
fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
if let Some(name) = conn.name_by_cid(cid) {
series::dbg::dbg_chn(name)
@@ -1084,6 +1087,7 @@ impl<'a> EventAddIngestRefobj<'a> {
if let Some(binwriter) = self.binwriter.as_mut() {
binwriter.ingest(tsev, val_for_agg, self.iqdqs)?;
}
self.mett.ts_msp_reput_onevent().add(wres.msp_rewrite() as u32);
}
if false {
// TODO record stats on drop with the new filter
@@ -1498,6 +1502,8 @@ impl CaConn {
}
fn trigger_shutdown(&mut self, reason: ShutdownReason) {
let selfname = "trigger_shutdown";
debug_shutdown!("{selfname} {addr}", addr = self.remote_addr_dbg);
let channel_reason = match &reason {
ShutdownReason::ConnectRefused => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectRefused);
@@ -1870,7 +1876,6 @@ impl CaConn {
let selfname = "channel_close_by_name";
debug!("{selfname} {}", name);
if let Some(x) = self.cid_by_name.get(&name).map(Clone::clone) {
self.cid_by_name.remove(&name);
self.channel_close_by_cid(x.clone())
} else {
warn!("{selfname} {} can not find channel", name);
@@ -1885,6 +1890,8 @@ impl CaConn {
let stnow = SystemTime::now();
if let Some(conf) = self.channels.get_mut(&cid) {
let name = conf.conf.name();
trace_channel_remove!("{selfname} channel_close_by_cid {cid} {name}");
self.cid_by_name.remove(name);
{
// TODO emit CaConn item to let CaConnSet know that we have closed the channel.
// TODO may be too full
@@ -1904,8 +1911,14 @@ impl CaConn {
// TODO shutdown the internal writer structures.
match &mut conf.state {
ChannelState::Writable(st2) => {
if st2.writer.tick(&mut self.iqdqs).is_err() {
self.mett.logic_error().inc();
}
if st2.writer.on_close(&mut self.iqdqs).is_err() {
self.mett.logic_error().inc();
} else {
trace_channel_remove!("writer on_close done Ok {}", conf.conf.name());
self.mett.series_writer_on_close().inc();
}
}
_ => {}
@@ -3367,6 +3380,27 @@ impl CaConn {
CaConnState::EndOfStream => {}
CaConnState::MetricsEmitted => {}
}
{
let self2 = self.as_mut().get_mut();
let mut ts_msp_reput = 0;
let mut logic_error = 0;
for (_, conf) in &mut self2.channels {
let st1 = &mut conf.state;
match st1 {
ChannelState::Writable(st2) => match st2.writer.housekeeping(&mut self2.iqdqs) {
Ok(res) => {
ts_msp_reput += res.ts_msp_reput as u32;
}
Err(_) => {
logic_error += 1;
}
},
_ => {}
}
}
self.mett.ts_msp_reput_periodic().add(ts_msp_reput);
self.mett.logic_error().add(logic_error);
}
self.iqdqs.housekeeping();
if self.metrics_emit_last + METRICS_EMIT_IVL <= tsnow {
self.metrics_emit_last = tsnow;

View File

@@ -2,11 +2,10 @@ pub mod fixgrid;
use netpod::DtNano;
use netpod::TsNano;
use netpod::timeunits::DAY;
use netpod::timeunits::SEC;
use serde::Serialize;
const SEC: u64 = 1000_000_000;
const HOUR: u64 = SEC * 60 * 60 * 24;
#[derive(Debug, Serialize)]
pub struct MspSplit {
last: Option<TsNano>,
@@ -27,37 +26,42 @@ impl MspSplit {
}
}
pub fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool) {
pub fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool, Option<TsNano>) {
// Maximum resolution of the ts msp:
let msp_res_max = SEC * 2;
let ts_main = ts;
let (ts_msp, changed) = match self.last {
let ts_inp = ts;
let (ts_msp, changed, ts_msp_retired) = match self.last {
Some(ts_msp_last) => {
if self.count >= self.count_max || self.bytes >= self.bytes_max || ts_msp_last.add_ns(HOUR) <= ts_main {
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
if self.count >= self.count_max || self.bytes >= self.bytes_max || ts_msp_last.add_ns(DAY) <= ts_inp {
let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max);
if ts_msp == ts_msp_last {
(ts_msp, false)
// TODO should count these for metrics
(ts_msp, false, None)
} else {
self.last = Some(ts_msp);
self.count = 1;
self.bytes = item_bytes;
(ts_msp, true)
(ts_msp, true, Some(ts_msp_last))
}
} else {
self.count += 1;
self.bytes += item_bytes;
(ts_msp_last, false)
(ts_msp_last, false, None)
}
}
None => {
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max);
self.last = Some(ts_msp);
self.count = 1;
self.bytes = item_bytes;
(ts_msp, true)
(ts_msp, true, None)
}
};
let ts_lsp = ts_main.delta(ts_msp);
(ts_msp, ts_lsp, changed)
let ts_lsp = ts_inp.delta(ts_msp);
(ts_msp, ts_lsp, changed, ts_msp_retired)
}
pub fn ts_msp_current(&self) -> Option<TsNano> {
self.last
}
}

View File

@@ -27,6 +27,12 @@ autoerr::create_error_v1!(
pub struct WriteRes {
pub accept: bool,
pub bytes: u32,
pub msp_rewrite: u8,
}
#[derive(Debug)]
pub struct HousekeepingRes {
pub ts_msp_reput: u8,
}
#[derive(Serialize)]
@@ -144,12 +150,14 @@ where
let ret = WriteRes {
accept: true,
bytes: res.bytes,
msp_rewrite: res.msp_rewrite,
};
Ok(ret)
} else {
let ret = WriteRes {
accept: false,
bytes: 0,
msp_rewrite: 0,
};
Ok(ret)
}
@@ -161,10 +169,17 @@ where
}
pub fn on_close(&mut self, iqdqs: &mut VecDeque<QueryItem>) -> Result<(), Error> {
self.tick(iqdqs)?;
self.writer.on_close(iqdqs)?;
Ok(())
}
pub fn housekeeping(&mut self, deque: &mut VecDeque<QueryItem>) -> Result<HousekeepingRes, Error> {
let res = self.writer.housekeeping(deque)?;
let ret = HousekeepingRes {
ts_msp_reput: res.ts_msp_reput,
};
Ok(ret)
}
}
impl<ET> fmt::Debug for RateLimitWriter<ET>

View File

@@ -77,12 +77,17 @@ impl WriteRes {
pub fn accept_any(&self) -> bool {
self.lt.accept || self.mt.accept || self.st.accept
}
pub fn msp_rewrite(&self) -> u8 {
self.st.msp_rewrite + self.mt.msp_rewrite + self.lt.msp_rewrite
}
}
#[derive(Debug)]
pub struct WriteRtRes {
pub accept: bool,
pub bytes: u32,
pub msp_rewrite: u8,
}
impl Default for WriteRtRes {
@@ -90,10 +95,16 @@ impl Default for WriteRtRes {
Self {
accept: false,
bytes: 0,
msp_rewrite: 0,
}
}
}
#[derive(Debug)]
pub struct HousekeepingRes {
pub ts_msp_reput: u8,
}
#[derive(Debug, Serialize)]
pub struct RtWriter<ET>
where
@@ -246,6 +257,7 @@ where
let ret = WriteRtRes {
accept: x.accept,
bytes: x.bytes,
msp_rewrite: x.msp_rewrite,
};
Ok(ret)
}
@@ -262,7 +274,6 @@ where
}
pub fn on_close(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
self.tick(iqdqs)?;
if self.do_st_rf1 {
self.state_st.writer.on_close(&mut iqdqs.st_rf1_qu)?;
} else {
@@ -272,4 +283,18 @@ where
self.state_lt.writer.on_close(&mut iqdqs.lt_rf3_qu)?;
Ok(())
}
pub fn housekeeping(&mut self, iqdqs: &mut InsertDeques) -> Result<HousekeepingRes, Error> {
let res_st = if self.do_st_rf1 {
self.state_st.writer.housekeeping(&mut iqdqs.st_rf1_qu)?
} else {
self.state_st.writer.housekeeping(&mut iqdqs.st_rf3_qu)?
};
let res_mt = self.state_mt.writer.housekeeping(&mut iqdqs.mt_rf3_qu)?;
let res_lt = self.state_lt.writer.housekeeping(&mut iqdqs.lt_rf3_qu)?;
let ret = HousekeepingRes {
ts_msp_reput: res_st.ts_msp_reput + res_mt.ts_msp_reput + res_lt.ts_msp_reput,
};
Ok(ret)
}
}

View File

@@ -11,6 +11,7 @@ use std::time::Instant;
use netpod::ByteSize;
use scywr::iteminsertqueue::MspItem;
pub use smallvec::SmallVec;
use std::time::Duration;
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { log::trace!($($arg)*); } ) }
@@ -57,13 +58,30 @@ impl From<async_channel::RecvError> for Error {
#[derive(Debug)]
pub struct WriteRes {
pub bytes: u32,
pub msp_rewrite: u8,
}
#[derive(Debug)]
pub struct HousekeepingRes {
pub ts_msp_reput: u8,
}
#[derive(Debug)]
pub struct OnCloseRes {
pub ts_msp_reput: u8,
}
#[derive(Debug, Serialize)]
pub struct SeriesWriter<ET> {
series: SeriesId,
msp_split: crate::msptool::MspSplit,
evts_on_msp_write: TsNano,
evts_latest: TsNano,
do_trace_detail: bool,
#[serde(skip)]
ts_check_ts_msp_reput_last: Instant,
#[serde(skip)]
ts_ts_msp_put_last: Option<Instant>,
_t1: PhantomData<ET>,
}
@@ -75,7 +93,11 @@ where
let res = Self {
series,
msp_split: crate::msptool::MspSplit::new(1024 * 64, 1024 * 1024 * 10),
evts_on_msp_write: TsNano::from_ns(0),
evts_latest: TsNano::from_ns(0),
do_trace_detail: series::dbg::dbg_series(series),
ts_check_ts_msp_reput_last: Instant::now(),
ts_ts_msp_put_last: None,
_t1: PhantomData,
};
Ok(res)
@@ -94,17 +116,20 @@ where
deque: &mut VecDeque<QueryItem>,
) -> Result<WriteRes, Error> {
let det = self.do_trace_detail;
// let ts_main = item.ts();
let res = item.into_query_item(ts_net, tsev, state);
trace_emit!(det, "emit value for ts {tsev}");
// TODO adapt, taken from trait impl
let (ts_msp, ts_lsp, ts_msp_chg) = self.msp_split.split(tsev, res.bytes.bytes());
let mut msp_rewrite = 0;
let (ts_msp, ts_lsp, ts_msp_chg, ts_msp_retired) = self.msp_split.split(tsev, res.bytes.bytes());
if let Some(msp) = ts_msp_retired {
let item = MspItem::new(self.series.clone(), msp.to_ts_ms(), ts_net);
deque.push_back(QueryItem::Msp(item));
msp_rewrite += 1;
}
if ts_msp_chg {
deque.push_back(QueryItem::Msp(MspItem::new(
self.series.clone(),
ts_msp.to_ts_ms(),
ts_net,
)));
let item = MspItem::new(self.series.clone(), ts_msp.to_ts_ms(), ts_net);
deque.push_back(QueryItem::Msp(item));
self.evts_on_msp_write = tsev;
self.ts_ts_msp_put_last = Some(Instant::now());
}
let item = scywr::iteminsertqueue::InsertItem {
series: self.series.clone(),
@@ -114,8 +139,10 @@ where
val: res.data_item,
};
deque.push_back(QueryItem::Insert(item));
self.evts_latest = tsev;
let res = WriteRes {
bytes: res.bytes.bytes(),
msp_rewrite,
};
Ok(res)
}
@@ -124,7 +151,41 @@ where
Ok(())
}
pub fn on_close(&mut self, _deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
Ok(())
pub fn on_close(&mut self, deque: &mut VecDeque<QueryItem>) -> Result<OnCloseRes, Error> {
if let Some(msp) = self.msp_split.ts_msp_current() {
let item = MspItem::new(self.series.clone(), msp.to_ts_ms(), Instant::now());
deque.push_back(QueryItem::Msp(item));
}
let ret = OnCloseRes { ts_msp_reput: 1 };
Ok(ret)
}
pub fn housekeeping(&mut self, deque: &mut VecDeque<QueryItem>) -> Result<HousekeepingRes, Error> {
let tsnow = Instant::now();
let mut ts_msp_reput = 0;
if self.ts_check_ts_msp_reput_last + Duration::from_millis(1000 * 60) <= tsnow {
self.ts_check_ts_msp_reput_last = tsnow;
if let Some(ts_ts_msp_put_last) = self.ts_ts_msp_put_last {
if ts_ts_msp_put_last + Duration::from_millis(1000 * 60 * 60) <= tsnow {
if self.evts_latest != self.evts_on_msp_write {
if let Some(msp) = self.msp_split.ts_msp_current() {
self.ts_ts_msp_put_last = Some(tsnow);
self.evts_on_msp_write = self.evts_latest;
let item = MspItem::new(self.series.clone(), msp.to_ts_ms(), tsnow);
deque.push_back(QueryItem::Msp(item));
ts_msp_reput += 1;
} else {
// TODO return this, it should not happen
}
} else {
}
} else {
}
} else {
}
} else {
}
let ret = HousekeepingRes { ts_msp_reput };
Ok(ret)
}
}

View File

@@ -64,6 +64,9 @@ mod Metrics {
unknown_subid,
get_series_id_ok,
channel_add_exists,
ts_msp_reput_onevent,
ts_msp_reput_periodic,
series_writer_on_close,
}
enum histolog2s {
clock_ioc_diff_abs,