From cca3d77af988d5e7a93245a984fc1c528c0fc526 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 3 Feb 2025 16:33:02 +0100 Subject: [PATCH] Improve bin writer --- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 16 +- netfetch/src/ca/connset.rs | 2 +- netfetch/src/ca/statemap.rs | 12 +- netfetch/src/metrics/status.rs | 11 +- serieswriter/Cargo.toml | 1 + serieswriter/src/binwriter.rs | 219 ++++++++++++++++++++++++---- serieswriter/src/binwritergrid.rs | 156 -------------------- serieswriter/src/lib.rs | 1 - serieswriter/src/ratelimitwriter.rs | 8 +- 10 files changed, 222 insertions(+), 206 deletions(-) delete mode 100644 serieswriter/src/binwritergrid.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index cec4a88..a370f72 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.5" +version = "0.2.6-aa.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index a2d8772..ced0bc4 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1412,6 +1412,7 @@ impl CaConn { writer.series(), st2.channel.scalar_type.clone(), st2.channel.shape.clone(), + conf.conf.name().into(), )?; self.stats.get_series_id_ok.inc(); { @@ -1773,13 +1774,13 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; - if dbg_chn { - info!("handle_event_add_res {:?} {:?}", cid, ev); + if false && dbg_chn { + trace!("handle_event_add_res {:?} {:?}", cid, ev); } match ch_s { ChannelState::Writable(st) => { - if dbg_chn { - info!("handle_event_add_res Writable {:?} {:?}", cid, ev); + if false && dbg_chn { + trace!("handle_event_add_res Writable {:?} {:?}", cid, ev); } // debug!( // "CaConn sees data_count {} payload_len {}", @@ -3061,7 +3062,12 @@ impl CaConn { } if self.tick_last_writer + Duration::from_millis(2000) <= tsnow { self.tick_last_writer = tsnow; - self.tick_writers()?; + match self.tick_writers() { + Ok(()) => 
{} + Err(e) => { + error!("error in writers: {e}"); + } + } } match &self.state { CaConnState::Unconnected(_) => {} diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 65c92c2..6cf0a08 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -362,7 +362,7 @@ impl IocAddrQuery { } fn bump_backoff(x: &mut u32) { - *x = (1 + *x).min(10); + *x = (1 + *x).min(20); } struct SeriesLookupSender { diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 289fb66..95bdfef 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -112,7 +112,17 @@ pub struct MaybeWrongAddressState { impl MaybeWrongAddressState { pub fn new(since: SystemTime, backoff_cnt: u32) -> Self { - let f = 2. + 60. * (backoff_cnt as f32 / 5.).tanh(); + // print(", ".join(["{:.5}".format(tanh(i/10)) for i in range(24)])) + const TANH: [f32; 24] = [ + 0.0, 0.099668, 0.19738, 0.29131, 0.37995, 0.46212, 0.53705, 0.60437, 0.66404, 0.7163, 0.76159, 0.8005, + 0.83365, 0.86172, 0.88535, 0.90515, 0.92167, 0.93541, 0.94681, 0.95624, 0.96403, 0.97045, 0.97574, 0.9801, + ]; + const Y1: f32 = 30.; + const Y20: f32 = 300.; + const B: f32 = (Y20 - Y1) / (TANH[20] - TANH[1]); + const A: f32 = Y1 - B * TANH[1]; + let backoff_cnt = backoff_cnt.max(1).min(20); + let f = A + B * TANH[backoff_cnt as usize]; let dtms = 1e3 * f; Self { since, diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 5fffd30..7960959 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -13,11 +13,12 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::time::SystemTime; -#[derive(Debug, ThisError)] -#[cstm(name = "StatusError")] -pub enum Error { - Internal, -} +autoerr::create_error_v1!( + name(Error, "StatusError"), + enum variants { + Internal, + }, +); #[derive(Debug, Serialize)] pub struct ChannelStates { diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml index 7840a7d..6d9e504 
100644 --- a/serieswriter/Cargo.toml +++ b/serieswriter/Cargo.toml @@ -10,6 +10,7 @@ async-channel = "2.1.1" futures-util = "0.3.30" smallvec = "1.13.2" autoerr = "0.0.3" +itertools = "0.14" log = { path = "../log" } netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } items_0 = { path = "../../daqbuf-items-0", package = "daqbuf-items-0" } diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 480957e..36f9a32 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -1,22 +1,33 @@ -use crate::binwritergrid::BinWriterGrid; use crate::log::*; use crate::rtwriter::MinQuiets; +use items_0::timebin::BinnedBinsTimeweightTrait; +use items_0::timebin::BinnedEventsTimeweightTrait; +use items_0::timebin::BinsBoxed; +use items_2::binning::container_bins::ContainerBins; use items_2::binning::container_events::ContainerEvents; +use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight; +use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy; +use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; +use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightLazy; use netpod::ttl::RetentionTime; +use netpod::BinnedRange; use netpod::DtMs; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::TimeBinSimpleF32V02; use series::ChannelStatusSeriesId; use series::SeriesId; -use std::mem; use std::time::Duration; macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! 
debug_bin2 { ($t:expr, $($arg:tt)*) => ( if $t { debug!($($arg)*); } ) } + autoerr::create_error_v1!( name(Error, "SerieswriterBinwriter"), enum variants { @@ -24,18 +35,46 @@ autoerr::create_error_v1!( SeriesWriter(#[from] crate::writer::Error), Binning(#[from] items_2::binning::timeweight::timeweight_events::Error), UnsupportedBinGrid(DtMs), - BinWriterGrid(#[from] crate::binwritergrid::Error), + BinBinning(#[from] items_0::timebin::BinningggError), + UnexpectedContainerType, + PartitionMsp(#[from] series::msp::Error), }, ); +fn bin_len_clamp(dur: Duration) -> DtMs { + let dur = DtMs::from_ms_u64(1000 * dur.as_secs()); + if dur < DtMs::from_ms_u64(1000 * 2) { + DtMs::from_ms_u64(1000 * 1) + } else if dur <= DtMs::from_ms_u64(1000 * 20) { + DtMs::from_ms_u64(1000 * 10) + } else if dur <= DtMs::from_ms_u64(1000 * 60 * 2) { + DtMs::from_ms_u64(1000 * 60 * 1) + } else if dur <= DtMs::from_ms_u64(1000 * 60 * 20) { + DtMs::from_ms_u64(1000 * 60 * 10) + } else if dur <= DtMs::from_ms_u64(1000 * 60 * 60 * 2) { + DtMs::from_ms_u64(1000 * 60 * 60 * 1) + } else { + DtMs::from_ms_u64(1000 * 60 * 60 * 24) + } +} + +fn get_div(bin_len: DtMs) -> Result<DtMs, Error> { + let pbp = series::msp::PrebinnedPartitioning::try_from(bin_len)?; + let ret = pbp.msp_div(); + Ok(ret) +} + #[derive(Debug)] pub struct BinWriter { + chname: String, cssid: ChannelStatusSeriesId, sid: SeriesId, scalar_type: ScalarType, shape: Shape, evbuf: ContainerEvents, - writers: Vec<BinWriterGrid>, + binner_1st: Option<(RetentionTime, BinnedEventsTimeweight)>, + binner_others: Vec<(RetentionTime, BinnedBinsTimeweight)>, + trd: bool, } impl BinWriter { @@ -46,35 +85,59 @@ impl BinWriter { sid: SeriesId, scalar_type: ScalarType, shape: Shape, + chname: String, ) -> Result<Self, Error> { - let mut writers = Vec::new(); - for (rt, dur) in [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long] - .into_iter() - .zip([min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()].into_iter()) - { - if dur > Duration::ZERO && dur <
Duration::from_millis(1000 * 60 * 60 * 24) { - let bin_len = if dur < Duration::from_millis(1000 * 2) { - DtMs::from_ms_u64(1000 * 1) - } else if dur < Duration::from_millis(1000 * 20) { - DtMs::from_ms_u64(1000 * 10) - } else if dur < Duration::from_millis(1000 * 60 * 2) { - DtMs::from_ms_u64(1000 * 60 * 1) - } else if dur < Duration::from_millis(1000 * 60 * 20) { - DtMs::from_ms_u64(1000 * 60 * 10) + let trd = series::dbg::dbg_chn(&chname); + if trd { + debug_bin2!(trd, "enabled debug for {}", chname); + } + const DUR_ZERO: DtMs = DtMs::from_ms_u64(0); + const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 40); + let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; + let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()]; + let mut binner_1st = None; + let mut binner_others = Vec::new(); + let mut combs: Vec<_> = rts.into_iter().zip(quiets.into_iter().map(bin_len_clamp)).collect(); + if let Some(last) = combs.last_mut() { + if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) { + last.0 = RetentionTime::Long; + } else { + combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24))); + } + } + // check + for e in combs.iter() { + if get_div(e.1).is_err() { + info!("unsupported bin length {:?} {:?} {:?}", e.0, e.1, chname); + combs.clear(); + break; + } + } + let combs = combs; + debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs); + for (rt, bin_len) in combs { + if bin_len > DUR_ZERO && bin_len <= DUR_MAX { + if binner_1st.is_none() { + let range = BinnedRange::from_beg_to_inf(beg, bin_len); + let binner = BinnedEventsTimeweight::new(range); + binner_1st = Some((rt, binner)); } else { - DtMs::from_ms_u64(1000 * 60 * 60 * 1) - }; - let writer = BinWriterGrid::new(beg, rt, bin_len, cssid, sid, scalar_type.clone(), shape.clone())?; - writers.push(writer); + let range = BinnedRange::from_beg_to_inf(beg, bin_len); + let binner = BinnedBinsTimeweight::new(range); + binner_others.push((rt, 
binner)); + } } } let ret = Self { + chname, cssid, sid, scalar_type, shape, evbuf: ContainerEvents::new(), - writers, + binner_1st, + binner_others, + trd, }; let _ = ret.cssid; Ok(ret) @@ -102,16 +165,114 @@ impl BinWriter { pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { if self.evbuf.len() != 0 { trace_tick!("tick evbuf len {}", self.evbuf.len()); - let buf = mem::replace(&mut self.evbuf, ContainerEvents::new()); - // TODO bin the more fine grid from the coarse grid, do not clone events - for writer in self.writers.iter_mut() { - writer.ingest(&buf, iqdqs)?; + let buf = &self.evbuf; + if true { + if let Some(binner) = self.binner_1st.as_mut() { + let rt = binner.0.clone(); + // TODO avoid boxing + binner.1.ingest(&Box::new(buf))?; + let bins = binner.1.output(); + if bins.len() > 0 { + debug_bin2!(self.trd, "binner_1st out len {}", bins.len()); + Self::handle_output_ready(self.trd, self.sid, rt, &bins, iqdqs)?; + // + // TODO write these bins to scylla + // + // TODO avoid boxing + let mut bins2: BinsBoxed = Box::new(bins); + for i in 0..self.binner_others.len() { + let (rt, binner) = &mut self.binner_others[i]; + binner.ingest(&bins2)?; + let bb: Option<BinsBoxed> = binner.output()?; + match bb { + Some(bb) => { + if bb.len() > 0 { + debug_bin2!(self.trd, "binner_others {} out len {}", i, bb.len()); + if let Some(bb2) = bb.as_any_ref().downcast_ref::<ContainerBins<f32>>() { + Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, iqdqs)?; + } else { + return Err(Error::UnexpectedContainerType); + } + // + // TODO write these bins to scylla + // + bins2 = bb; + } else { + break; + } + } + None => { + break; + } + } + } + } else { + // nothing to do + } + } else { + // nothing to do + } } + self.evbuf.clear(); } else { trace_tick_verbose!("tick NOTHING TO INGEST"); } - for writer in self.writers.iter_mut() { - writer.tick(iqdqs)?; + Ok(()) + } + + fn handle_output_ready( + trd: bool, + series: SeriesId, + rt: RetentionTime, + bins: &ContainerBins<f32>, + iqdqs:
&mut InsertDeques, + ) -> Result<(), Error> { + let selfname = "handle_output_ready"; + trace_tick!("{selfname} bins ready len {}", bins.len()); + for e in bins.iter_debug() { + trace_tick_verbose!("{e:?}"); + } + let bins_len = bins.len(); + for (ts1, ts2, cnt, min, max, avg, lst, fnl) in bins.zip_iter_2() { + if fnl == false { + info!("non final bin"); + } else if cnt == 0 { + info!("zero count bin"); + } else { + let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); + let div = get_div(bin_len)?; + if div.ns() % bin_len.ns() != 0 { + panic!("divisor not a multiple {:?} {:?}", bin_len, div); + } + let msp = ts1.ms() / div.ms(); + let off = (ts1.ms() - div.ms() * msp) / bin_len.ms(); + let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 { + series, + binlen: bin_len.ms() as i32, + msp: msp as i64, + off: off as i32, + cnt: cnt as i64, + min, + max, + avg, + dev: f32::NAN, + lst, + }); + if bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) { + debug_bin2!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item); + } + match rt { + RetentionTime::Short => { + iqdqs.st_rf3_qu.push_back(item); + } + RetentionTime::Medium => { + iqdqs.mt_rf3_qu.push_back(item); + } + RetentionTime::Long => { + iqdqs.lt_rf3_qu.push_back(item); + } + } + } } Ok(()) } diff --git a/serieswriter/src/binwritergrid.rs b/serieswriter/src/binwritergrid.rs deleted file mode 100644 index 6ae219b..0000000 --- a/serieswriter/src/binwritergrid.rs +++ /dev/null @@ -1,156 +0,0 @@ -use items_2::binning::container_bins::ContainerBins; -use items_2::binning::container_events::ContainerEvents; -use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; -use netpod::log::*; -use netpod::range::evrange::NanoRange; -use netpod::ttl::RetentionTime; -use netpod::BinnedRange; -use netpod::DtMs; -use netpod::ScalarType; -use netpod::Shape; -use netpod::TsNano; -use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::QueryItem; -use 
scywr::iteminsertqueue::TimeBinSimpleF32V02; -use series::ChannelStatusSeriesId; -use series::SeriesId; - -macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } - -autoerr::create_error_v1!( - name(Error, "SerieswriterBinwriterGrid"), - enum variants { - SeriesLookupError, - SeriesWriter(#[from] crate::writer::Error), - Binning(#[from] items_2::binning::timeweight::timeweight_events::Error), - UnsupportedBinGrid(DtMs), - }, -); - -#[derive(Debug)] -pub struct BinWriterGrid { - rt: RetentionTime, - cssid: ChannelStatusSeriesId, - sid: SeriesId, - scalar_type: ScalarType, - shape: Shape, - binner: BinnedEventsTimeweight, -} - -impl BinWriterGrid { - pub fn new( - beg: TsNano, - rt: RetentionTime, - bin_len: DtMs, - cssid: ChannelStatusSeriesId, - sid: SeriesId, - scalar_type: ScalarType, - shape: Shape, - ) -> Result { - let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40; - let end = (u64::MAX - margin) / bin_len.ns() * bin_len.ns(); - let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), bin_len); - let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero(); - let ret = Self { - rt, - cssid, - sid, - scalar_type, - shape, - binner, - }; - let _ = &ret.rt; - let _ = &ret.cssid; - Ok(ret) - } - - pub fn sid(&self) -> SeriesId { - self.sid.clone() - } - - pub fn scalar_type(&self) -> ScalarType { - self.scalar_type.clone() - } - - pub fn shape(&self) -> Shape { - self.shape.clone() - } - - pub fn ingest(&mut self, evs: &ContainerEvents, iqdqs: &mut InsertDeques) -> Result<(), Error> { - let _ = iqdqs; - trace_ingest!("{:?} {:?}", self, evs); - self.binner.ingest(evs)?; - Ok(()) - } - - fn handle_output_ready(&mut self, out: ContainerBins, iqdqs: &mut InsertDeques) -> Result<(), Error> { - let selfname = "handle_output_ready"; - 
trace_tick!("{selfname} bins ready len {}", out.len()); - for e in out.iter_debug() { - trace_tick_verbose!("{e:?}"); - } - for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), &fnl) in out.zip_iter() { - if fnl == false { - info!("non final bin"); - } else if cnt == 0 { - info!("zero count bin"); - } else { - let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); - let div = if bin_len == DtMs::from_ms_u64(1000 * 1) { - DtMs::from_ms_u64(1000 * 60 * 10) - } else if bin_len == DtMs::from_ms_u64(1000 * 10) { - DtMs::from_ms_u64(1000 * 60 * 60 * 2) - } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 1) { - DtMs::from_ms_u64(1000 * 60 * 60 * 8) - } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 10) { - DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4) - } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 60 * 1) { - DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28) - } else { - return Err(Error::UnsupportedBinGrid(bin_len)); - }; - if div.ns() % bin_len.ns() != 0 { - panic!("divisor not a multiple {:?} {:?}", bin_len, div); - } - let msp = ts1.ms() / div.ms(); - let off = (ts1.ms() - div.ms() * msp) / bin_len.ms(); - let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 { - series: self.sid.clone(), - binlen: bin_len.ms() as i32, - msp: msp as i64, - off: off as i32, - cnt: cnt as i64, - min, - max, - avg, - dev: f32::NAN, - lst, - }); - match &self.rt { - RetentionTime::Short => { - iqdqs.st_rf3_qu.push_back(item); - } - RetentionTime::Medium => { - iqdqs.mt_rf3_qu.push_back(item); - } - RetentionTime::Long => { - iqdqs.lt_rf3_qu.push_back(item); - } - } - } - } - Ok(()) - } - - pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - let out = self.binner.output(); - if out.len() != 0 { - self.handle_output_ready(out, iqdqs)?; - } else { - trace_tick_verbose!("tick NO BINS YET"); - } - Ok(()) - } -} diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index 41862e2..b9bccba 100644 --- a/serieswriter/src/lib.rs +++ 
b/serieswriter/src/lib.rs @@ -1,5 +1,4 @@ pub mod binwriter; -pub mod binwritergrid; pub mod changewriter; pub mod fixgridwriter; pub mod msptool; diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index c7669a9..ecd3fd2 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -11,13 +11,7 @@ use std::marker::PhantomData; use std::time::Duration; use std::time::Instant; -macro_rules! trace_rt_decision { - ($det:expr, $($arg:tt)*) => { - if $det { - trace!($($arg)*); - } - }; -} +macro_rules! trace_rt_decision { ($det:expr, $($arg:tt)*) => { if $det { trace!($($arg)*); } }; } autoerr::create_error_v1!( name(Error, "RateLimitWriter"),