From b163369b1c1455d7c6f2e7ce5417ff7d23ec7494 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 8 Nov 2024 12:09:30 +0100 Subject: [PATCH] WIP binners typechecks --- netfetch/src/ca/conn.rs | 3 +- serieswriter/src/binwriter.rs | 99 +++++++------------ serieswriter/src/binwritergrid.rs | 154 ++++++++++++++++++++++++++++++ serieswriter/src/lib.rs | 1 + serieswriter/src/writer.rs | 16 +--- 5 files changed, 197 insertions(+), 76 deletions(-) create mode 100644 serieswriter/src/binwritergrid.rs diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 7b0189c..47eb476 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1424,10 +1424,9 @@ impl CaConn { if let ChannelState::MakingSeriesWriter(st2) = &mut conf.state { let dt = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap(); let beg = TsNano::from_ns(SEC * dt.as_secs() + dt.subsec_nanos() as u64); - let a = &conf.conf.min_quiets(); let binwriter = BinWriter::new( beg, - RetentionTime::Short, + conf.conf.min_quiets(), st2.channel.cssid, writer.series(), st2.channel.scalar_type.clone(), diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index a34bb04..344b82d 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -1,23 +1,19 @@ +use crate::binwritergrid::BinWriterGrid; +use crate::rtwriter::MinQuiets; use err::thiserror; use err::ThisError; -use items_2::binning::container_bins::ContainerBins; use items_2::binning::container_events::ContainerEvents; -use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; use netpod::log::*; -use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; -use netpod::BinnedRange; use netpod::DtMs; use netpod::ScalarType; use netpod::Shape; -use netpod::TsMs; use netpod::TsNano; use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::QueryItem; -use scywr::iteminsertqueue::TimeBinSimpleF32V01; use series::ChannelStatusSeriesId; use series::SeriesId; use std::mem; +use std::time::Duration; macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } @@ -30,45 +26,60 @@ pub enum Error { SeriesWriter(#[from] crate::writer::Error), Binning(#[from] items_2::binning::timeweight::timeweight_events::Error), UnsupportedBinGrid(DtMs), + BinWriterGrid(#[from] crate::binwritergrid::Error), } #[derive(Debug)] pub struct BinWriter { - rt: RetentionTime, cssid: ChannelStatusSeriesId, sid: SeriesId, scalar_type: ScalarType, shape: Shape, evbuf: ContainerEvents, - binner: BinnedEventsTimeweight, + writers: Vec, } impl BinWriter { pub fn new( beg: TsNano, - rt: RetentionTime, - // channel_info_tx: Sender, + min_quiets: MinQuiets, cssid: ChannelStatusSeriesId, sid: SeriesId, scalar_type: ScalarType, shape: Shape, ) -> Result { - // TODO select the desired bin width based on channel configuration: - // that's user knowledge, it really depends on what users want. - // For the moment, assume a fixed value. - let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40; - let end = u64::MAX - margin; - let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), DtMs::from_ms_u64(1000 * 10)); - let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero(); + let mut writers = Vec::new(); + for (rt, dur) in [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long] + .into_iter() + .zip([min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()].into_iter()) + { + if dur > Duration::ZERO && dur < Duration::from_millis(1000 * 60 * 60 * 24) { + let bin_len = if dur < Duration::from_millis(1000 * 2) { + DtMs::from_ms_u64(1000 * 1) + } else if dur < Duration::from_millis(1000 * 20) { + DtMs::from_ms_u64(1000 * 10) + } else if dur < Duration::from_millis(1000 * 60 * 2) { + DtMs::from_ms_u64(1000 * 60 * 1) + } else if dur < Duration::from_millis(1000 * 60 * 20) { + DtMs::from_ms_u64(1000 * 60 * 10) + } else if dur < Duration::from_millis(1000 * 60 * 60 * 2) { + DtMs::from_ms_u64(1000 * 60 * 60 * 1) + } else { + DtMs::from_ms_u64(1000 * 60 * 60 * 1) + }; + let writer = BinWriterGrid::new(beg, rt, bin_len, cssid, sid, scalar_type.clone(), shape.clone())?; + writers.push(writer); + } + } let ret = Self { - rt, cssid, sid, scalar_type, shape, evbuf: ContainerEvents::new(), - binner, + writers, }; + let _ = ret.cssid; Ok(ret) } @@ -91,55 +102,19 @@ impl BinWriter { Ok(()) } - fn handle_output_ready(&mut self, out: ContainerBins, iqdqs: &mut InsertDeques) -> Result<(), Error> { - let selfname = "handle_output_ready"; - trace_tick!("{selfname} bins ready len {}", out.len()); - for e in out.iter_debug() { - trace_tick_verbose!("{e:?}"); - } - for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() { - if fnl == false { - debug!("non final bin"); - } else if cnt == 0 { - } else { - let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); - let div = if bin_len == DtMs::from_ms_u64(1000 * 10) { - DtMs::from_ms_u64(1000 * 60 * 60 * 2) - } else { - // TODO - return Err(Error::UnsupportedBinGrid(bin_len)); - }; - let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms()); - let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); - let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 { - series: self.sid.clone(), - bin_len_ms: bin_len.ms() as i32, - ts_msp, - off: off as i32, - count: cnt as i64, - min, - max, - avg, - }); - iqdqs.lt_rf3_qu.push_back(item); - } - } - Ok(()) - } - pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { if self.evbuf.len() != 0 { trace_tick!("tick evbuf len {}", self.evbuf.len()); let buf = mem::replace(&mut self.evbuf, ContainerEvents::new()); - self.binner.ingest(buf)?; + // TODO bin the more fine grid from the coarse grid, do not clone events + for writer in self.writers.iter_mut() { + writer.ingest(buf.clone(), iqdqs)?; + } } else { trace_tick_verbose!("tick NOTHING TO INGEST"); } - let out = self.binner.output(); - if out.len() != 0 { - self.handle_output_ready(out, iqdqs)?; - } else { - trace_tick_verbose!("tick NO BINS YET"); + for writer in self.writers.iter_mut() { + writer.tick(iqdqs)?; } Ok(()) } diff --git a/serieswriter/src/binwritergrid.rs b/serieswriter/src/binwritergrid.rs new file mode 100644 index 0000000..6f7d9bf --- /dev/null +++ b/serieswriter/src/binwritergrid.rs @@ -0,0 +1,154 @@ +use err::thiserror; +use err::ThisError; +use items_2::binning::container_bins::ContainerBins; +use items_2::binning::container_events::ContainerEvents; +use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::ttl::RetentionTime; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsMs; +use netpod::TsNano; +use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::TimeBinSimpleF32V01; +use series::ChannelStatusSeriesId; +use series::SeriesId; + +macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[derive(Debug, ThisError)] +#[cstm(name = "SerieswriterBinwriterGrid")] +pub enum Error { + SeriesLookupError, + SeriesWriter(#[from] crate::writer::Error), + Binning(#[from] items_2::binning::timeweight::timeweight_events::Error), + UnsupportedBinGrid(DtMs), +} + +#[derive(Debug)] +pub struct BinWriterGrid { + rt: RetentionTime, + cssid: ChannelStatusSeriesId, + sid: SeriesId, + scalar_type: ScalarType, + shape: Shape, + binner: BinnedEventsTimeweight, +} + +impl BinWriterGrid { + pub fn new( + beg: TsNano, + rt: RetentionTime, + bin_len: DtMs, + cssid: ChannelStatusSeriesId, + sid: SeriesId, + scalar_type: ScalarType, + shape: Shape, + ) -> Result { + let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40; + let end = (u64::MAX - margin) / bin_len.ns() * bin_len.ns(); + let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), bin_len); + let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero(); + let ret = Self { + rt, + cssid, + sid, + scalar_type, + shape, + binner, + }; + let _ = &ret.rt; + let _ = &ret.cssid; + Ok(ret) + } + + pub fn sid(&self) -> SeriesId { + self.sid.clone() + } + + pub fn scalar_type(&self) -> ScalarType { + self.scalar_type.clone() + } + + pub fn shape(&self) -> Shape { + self.shape.clone() + } + + pub fn ingest(&mut self, evs: ContainerEvents, iqdqs: &mut InsertDeques) -> Result<(), Error> { + let _ = iqdqs; + trace_ingest!("{:?} {:?}", self, evs); + self.binner.ingest(evs)?; + Ok(()) + } + + fn handle_output_ready(&mut self, out: ContainerBins, iqdqs: &mut InsertDeques) -> Result<(), Error> { + let selfname = "handle_output_ready"; + trace_tick!("{selfname} bins ready len {}", out.len()); + for e in out.iter_debug() { + trace_tick_verbose!("{e:?}"); + } + for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() { + if fnl == false { + info!("non final bin"); + } else if cnt == 0 { + info!("zero count bin"); + } else { + let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); + let div = if bin_len == DtMs::from_ms_u64(1000 * 1) { + DtMs::from_ms_u64(1000 * 60 * 10) + } else if bin_len == DtMs::from_ms_u64(1000 * 10) { + DtMs::from_ms_u64(1000 * 60 * 60 * 2) + } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 1) { + DtMs::from_ms_u64(1000 * 60 * 60 * 8) + } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 10) { + DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4) + } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 60 * 1) { + DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28) + } else { + // TODO + return Err(Error::UnsupportedBinGrid(bin_len)); + }; + let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms()); + let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); + let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 { + series: self.sid.clone(), + bin_len_ms: bin_len.ms() as i32, + ts_msp, + off: off as i32, + count: cnt as i64, + min, + max, + avg, + }); + match &self.rt { + RetentionTime::Short => { + iqdqs.st_rf3_qu.push_back(item); + } + RetentionTime::Medium => { + iqdqs.mt_rf3_qu.push_back(item); + } + RetentionTime::Long => { + iqdqs.lt_rf3_qu.push_back(item); + } + } + } + } + Ok(()) + } + + pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + let out = self.binner.output(); + if out.len() != 0 { + self.handle_output_ready(out, iqdqs)?; + } else { + trace_tick_verbose!("tick NO BINS YET"); + } + Ok(()) + } +} diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index bc5becd..9367713 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -1,4 +1,5 @@ pub mod binwriter; +pub mod binwritergrid; pub mod changewriter; pub mod fixgridwriter; pub mod msptool; diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index ab426cf..41f312a 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -1,24 +1,16 @@ +use core::fmt; use err::thiserror; use err::ThisError; use log::*; use netpod::TsNano; use scywr::iteminsertqueue::QueryItem; use series::SeriesId; +pub use smallvec::SmallVec; use std::collections::VecDeque; use std::marker::PhantomData; use std::time::Instant; -use core::fmt; -pub use smallvec::SmallVec; - -#[allow(unused)] -macro_rules! trace_emit { - ($det:expr, $($arg:tt)*) => { - if $det { - trace!($($arg)*); - } - }; -} +macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) } #[derive(Debug)] pub struct EmitRes { @@ -113,7 +105,7 @@ where Ok(res) } - pub fn tick(&mut self, deque: &mut VecDeque) -> Result<(), Error> { + pub fn tick(&mut self, _deque: &mut VecDeque) -> Result<(), Error> { Ok(()) } }