WIP binners typechecks

This commit is contained in:
Dominik Werder
2024-11-08 12:09:30 +01:00
parent 0ecd594419
commit b163369b1c
5 changed files with 197 additions and 76 deletions

View File

@@ -1424,10 +1424,9 @@ impl CaConn {
if let ChannelState::MakingSeriesWriter(st2) = &mut conf.state {
let dt = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap();
let beg = TsNano::from_ns(SEC * dt.as_secs() + dt.subsec_nanos() as u64);
let a = &conf.conf.min_quiets();
let binwriter = BinWriter::new(
beg,
RetentionTime::Short,
conf.conf.min_quiets(),
st2.channel.cssid,
writer.series(),
st2.channel.scalar_type.clone(),

View File

@@ -1,23 +1,19 @@
use crate::binwritergrid::BinWriterGrid;
use crate::rtwriter::MinQuiets;
use err::thiserror;
use err::ThisError;
use items_2::binning::container_bins::ContainerBins;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V01;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::mem;
use std::time::Duration;
macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
@@ -30,45 +26,60 @@ pub enum Error {
SeriesWriter(#[from] crate::writer::Error),
Binning(#[from] items_2::binning::timeweight::timeweight_events::Error),
UnsupportedBinGrid(DtMs),
BinWriterGrid(#[from] crate::binwritergrid::Error),
}
#[derive(Debug)]
pub struct BinWriter {
rt: RetentionTime,
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
evbuf: ContainerEvents<f32>,
binner: BinnedEventsTimeweight<f32>,
writers: Vec<BinWriterGrid>,
}
impl BinWriter {
pub fn new(
beg: TsNano,
rt: RetentionTime,
// channel_info_tx: Sender<ChannelInfoQuery>,
min_quiets: MinQuiets,
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
) -> Result<Self, Error> {
// TODO select the desired bin width based on channel configuration:
// that's user knowledge, it really depends on what users want.
// For the moment, assume a fixed value.
let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40;
let end = u64::MAX - margin;
let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), DtMs::from_ms_u64(1000 * 10));
let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero();
let mut writers = Vec::new();
for (rt, dur) in [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]
.into_iter()
.zip([min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()].into_iter())
{
if dur > Duration::ZERO && dur < Duration::from_millis(1000 * 60 * 60 * 24) {
let bin_len = if dur < Duration::from_millis(1000 * 2) {
DtMs::from_ms_u64(1000 * 1)
} else if dur < Duration::from_millis(1000 * 20) {
DtMs::from_ms_u64(1000 * 10)
} else if dur < Duration::from_millis(1000 * 60 * 2) {
DtMs::from_ms_u64(1000 * 60 * 1)
} else if dur < Duration::from_millis(1000 * 60 * 20) {
DtMs::from_ms_u64(1000 * 60 * 10)
} else if dur < Duration::from_millis(1000 * 60 * 60 * 2) {
DtMs::from_ms_u64(1000 * 60 * 60 * 1)
} else {
DtMs::from_ms_u64(1000 * 60 * 60 * 1)
};
let writer = BinWriterGrid::new(beg, rt, bin_len, cssid, sid, scalar_type.clone(), shape.clone())?;
writers.push(writer);
}
}
let ret = Self {
rt,
cssid,
sid,
scalar_type,
shape,
evbuf: ContainerEvents::new(),
binner,
writers,
};
let _ = ret.cssid;
Ok(ret)
}
@@ -91,55 +102,19 @@ impl BinWriter {
Ok(())
}
fn handle_output_ready(&mut self, out: ContainerBins<f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let selfname = "handle_output_ready";
trace_tick!("{selfname} bins ready len {}", out.len());
for e in out.iter_debug() {
trace_tick_verbose!("{e:?}");
}
for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() {
if fnl == false {
debug!("non final bin");
} else if cnt == 0 {
} else {
let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
let div = if bin_len == DtMs::from_ms_u64(1000 * 10) {
DtMs::from_ms_u64(1000 * 60 * 60 * 2)
} else {
// TODO
return Err(Error::UnsupportedBinGrid(bin_len));
};
let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms());
let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms();
let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 {
series: self.sid.clone(),
bin_len_ms: bin_len.ms() as i32,
ts_msp,
off: off as i32,
count: cnt as i64,
min,
max,
avg,
});
iqdqs.lt_rf3_qu.push_back(item);
}
}
Ok(())
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
if self.evbuf.len() != 0 {
trace_tick!("tick evbuf len {}", self.evbuf.len());
let buf = mem::replace(&mut self.evbuf, ContainerEvents::new());
self.binner.ingest(buf)?;
// TODO bin the more fine grid from the coarse grid, do not clone events
for writer in self.writers.iter_mut() {
writer.ingest(buf.clone(), iqdqs)?;
}
} else {
trace_tick_verbose!("tick NOTHING TO INGEST");
}
let out = self.binner.output();
if out.len() != 0 {
self.handle_output_ready(out, iqdqs)?;
} else {
trace_tick_verbose!("tick NO BINS YET");
for writer in self.writers.iter_mut() {
writer.tick(iqdqs)?;
}
Ok(())
}

View File

@@ -0,0 +1,154 @@
use err::thiserror;
use err::ThisError;
use items_2::binning::container_bins::ContainerBins;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V01;
use series::ChannelStatusSeriesId;
use series::SeriesId;
macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterBinwriterGrid")]
pub enum Error {
SeriesLookupError,
SeriesWriter(#[from] crate::writer::Error),
Binning(#[from] items_2::binning::timeweight::timeweight_events::Error),
UnsupportedBinGrid(DtMs),
}
#[derive(Debug)]
pub struct BinWriterGrid {
rt: RetentionTime,
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
binner: BinnedEventsTimeweight<f32>,
}
impl BinWriterGrid {
pub fn new(
beg: TsNano,
rt: RetentionTime,
bin_len: DtMs,
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
) -> Result<Self, Error> {
let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40;
let end = (u64::MAX - margin) / bin_len.ns() * bin_len.ns();
let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), bin_len);
let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero();
let ret = Self {
rt,
cssid,
sid,
scalar_type,
shape,
binner,
};
let _ = &ret.rt;
let _ = &ret.cssid;
Ok(ret)
}
pub fn sid(&self) -> SeriesId {
self.sid.clone()
}
pub fn scalar_type(&self) -> ScalarType {
self.scalar_type.clone()
}
pub fn shape(&self) -> Shape {
self.shape.clone()
}
pub fn ingest(&mut self, evs: ContainerEvents<f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let _ = iqdqs;
trace_ingest!("{:?} {:?}", self, evs);
self.binner.ingest(evs)?;
Ok(())
}
fn handle_output_ready(&mut self, out: ContainerBins<f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let selfname = "handle_output_ready";
trace_tick!("{selfname} bins ready len {}", out.len());
for e in out.iter_debug() {
trace_tick_verbose!("{e:?}");
}
for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() {
if fnl == false {
info!("non final bin");
} else if cnt == 0 {
info!("zero count bin");
} else {
let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
let div = if bin_len == DtMs::from_ms_u64(1000 * 1) {
DtMs::from_ms_u64(1000 * 60 * 10)
} else if bin_len == DtMs::from_ms_u64(1000 * 10) {
DtMs::from_ms_u64(1000 * 60 * 60 * 2)
} else if bin_len == DtMs::from_ms_u64(1000 * 60 * 1) {
DtMs::from_ms_u64(1000 * 60 * 60 * 8)
} else if bin_len == DtMs::from_ms_u64(1000 * 60 * 10) {
DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4)
} else if bin_len == DtMs::from_ms_u64(1000 * 60 * 60 * 1) {
DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28)
} else {
// TODO
return Err(Error::UnsupportedBinGrid(bin_len));
};
let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms());
let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms();
let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 {
series: self.sid.clone(),
bin_len_ms: bin_len.ms() as i32,
ts_msp,
off: off as i32,
count: cnt as i64,
min,
max,
avg,
});
match &self.rt {
RetentionTime::Short => {
iqdqs.st_rf3_qu.push_back(item);
}
RetentionTime::Medium => {
iqdqs.mt_rf3_qu.push_back(item);
}
RetentionTime::Long => {
iqdqs.lt_rf3_qu.push_back(item);
}
}
}
}
Ok(())
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let out = self.binner.output();
if out.len() != 0 {
self.handle_output_ready(out, iqdqs)?;
} else {
trace_tick_verbose!("tick NO BINS YET");
}
Ok(())
}
}

View File

@@ -1,4 +1,5 @@
pub mod binwriter;
pub mod binwritergrid;
pub mod changewriter;
pub mod fixgridwriter;
pub mod msptool;

View File

@@ -1,24 +1,16 @@
use core::fmt;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::TsNano;
use scywr::iteminsertqueue::QueryItem;
use series::SeriesId;
pub use smallvec::SmallVec;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::time::Instant;
use core::fmt;
pub use smallvec::SmallVec;
#[allow(unused)]
macro_rules! trace_emit {
($det:expr, $($arg:tt)*) => {
if $det {
trace!($($arg)*);
}
};
}
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) }
#[derive(Debug)]
pub struct EmitRes {
@@ -113,7 +105,7 @@ where
Ok(res)
}
pub fn tick(&mut self, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
pub fn tick(&mut self, _deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
Ok(())
}
}