From baf2c7f2d165864f07536d3760e1389fd7564bd3 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 14 Nov 2024 16:01:57 +0100 Subject: [PATCH] Fixes and refactoring --- Cargo.toml | 6 +- src/binning.rs | 2 - src/binning/binnedvaluetype.rs | 2 +- src/binning/container_bins.rs | 42 +-- src/binning/container_events.rs | 204 +++++++++++--- src/binning/test.rs | 3 +- src/binning/test/events00.rs | 85 ++++-- src/binning/timeweight.rs | 1 - src/binning/timeweight/timeweight_bins.rs | 1 - src/binning/timeweight/timeweight_events.rs | 260 +++++++++++------- .../timeweight/timeweight_events_dyn.rs | 69 +++-- src/binning/valuetype.rs | 32 +++ src/channelevents.rs | 2 - src/eventsdim0.rs | 54 ++-- src/eventsdim0enum.rs | 43 +-- src/eventsdim1.rs | 4 +- src/framable.rs | 5 +- src/items_2.rs | 182 ------------ src/lib.rs | 91 ++++++ src/merger.rs | 59 ++-- src/offsets.rs | 35 +++ src/streams.rs | 89 +----- src/transform.rs | 84 ------ 23 files changed, 696 insertions(+), 659 deletions(-) delete mode 100644 src/items_2.rs create mode 100644 src/lib.rs create mode 100644 src/offsets.rs delete mode 100644 src/transform.rs diff --git a/Cargo.toml b/Cargo.toml index 592053a..ab9df3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,9 @@ [package] name = "daqbuf-items-2" -version = "0.0.3" +version = "0.0.4" authors = ["Dominik Werder "] edition = "2021" -[lib] -path = "src/items_2.rs" -doctest = false - [dependencies] serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/src/binning.rs b/src/binning.rs index f480ad1..f3622f1 100644 --- a/src/binning.rs +++ b/src/binning.rs @@ -7,5 +7,3 @@ pub mod valuetype; #[cfg(test)] mod test; - -use super::binning as ___; diff --git a/src/binning/binnedvaluetype.rs b/src/binning/binnedvaluetype.rs index 306fed4..13d893e 100644 --- a/src/binning/binnedvaluetype.rs +++ b/src/binning/binnedvaluetype.rs @@ -2,7 +2,7 @@ pub trait BinnedValueType {} pub struct BinnedNumericValue { avg: f32, - _t: Option, + t: Option, } impl BinnedValueType for BinnedNumericValue {} diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index 3e41cda..06c3654 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -1,9 +1,6 @@ -use super::aggregator::AggregatorNumeric; -use super::aggregator::AggregatorTimeWeight; use super::container_events::EventValueType; -use super::___; -use crate::ts_offs_from_abs; -use crate::ts_offs_from_abs_with_anchor; +use crate::offsets::ts_offs_from_abs; +use crate::offsets::ts_offs_from_abs_with_anchor; use core::fmt; use daqbuf_err as err; use err::thiserror; @@ -19,7 +16,6 @@ use items_0::AsAnyRef; use items_0::TypeName; use items_0::WithLen; use netpod::log::*; -use netpod::EnumVariant; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -285,21 +281,6 @@ where pp } - pub fn pop_front(&mut self) -> Option> { - todo!("pop_front"); - let ts1 = if let Some(x) = self.ts1s.pop_front() { - x - } else { - return None; - }; - let ts2 = if let Some(x) = self.ts2s.pop_front() { - x - } else { - return None; - }; - todo!() - } - pub fn push_back( &mut self, ts1: TsNano, @@ -562,8 +543,8 @@ where fn result( &mut self, - range: Option, - binrange: Option, + _range: Option, + _binrange: Option, ) -> Result, err::Error> { // TODO do we need to set timeout, continueAt or anything? 
let bins = mem::replace(&mut self.bins, ContainerBins::new()); @@ -633,7 +614,7 @@ where } fn fix_numerics(&mut self) { - for ((min, max), avg) in self + for ((_min, _max), _avg) in self .mins .iter_mut() .zip(self.maxs.iter_mut()) @@ -675,17 +656,4 @@ where pub fn len(&self) -> usize { self.len } - - pub fn pop_front(&mut self) -> Option> { - if self.len != 0 { - if let Some(ev) = self.evs.pop_front() { - self.len -= 1; - Some(ev) - } else { - None - } - } else { - None - } - } } diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 902a3e5..a37f3cf 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -24,25 +24,31 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[cstm(name = "ValueContainerError")] pub enum ValueContainerError {} -pub trait Container: fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> { +pub trait Container: + fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> +where + EVT: EventValueType, +{ fn new() -> Self; - // fn verify(&self) -> Result<(), ValueContainerError>; fn push_back(&mut self, val: EVT); fn pop_front(&mut self) -> Option; + fn get_iter_ty_1(&self, pos: usize) -> Option>; +} + +pub trait PartialOrdEvtA { + fn cmp_a(&self, other: &EVT) -> Option; } pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize { type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; - - // fn identity_sum() -> Self; - // fn add_weighted(&self, add: &Self, f: f32) -> Self; + type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; } impl Container for VecDeque where - EVT: EventValueType + Serialize + for<'a> Deserialize<'a>, + EVT: for<'a> EventValueType = EVT> + Serialize + for<'a> Deserialize<'a>, { fn new() -> Self { VecDeque::new() @@ -55,6 +61,28 @@ where fn pop_front(&mut self) -> Option { self.pop_front() } + + fn get_iter_ty_1(&self, pos: usize) -> Option> { + self.get(pos).map(|x| x.clone()) + } +} + +impl Container for VecDeque { + fn new() -> Self { + VecDeque::new() + } + + fn push_back(&mut self, val: String) { + self.push_back(val); + } + + fn pop_front(&mut self) -> Option { + self.pop_front() + } + + fn get_iter_ty_1(&self, pos: usize) -> Option<&str> { + todo!() + } } macro_rules! impl_event_value_type { @@ -63,6 +91,13 @@ macro_rules! 
impl_event_value_type { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; + type IterTy1<'a> = $evt; + } + + impl PartialOrdEvtA<$evt> for $evt { + fn cmp_a(&self, other: &$evt) -> Option { + self.partial_cmp(other) + } } }; } @@ -78,28 +113,74 @@ impl_event_value_type!(i64); // impl_event_value_type!(f32); // impl_event_value_type!(f64); +impl PartialOrdEvtA for f32 { + fn cmp_a(&self, other: &f32) -> Option { + self.partial_cmp(other) + } +} + +impl PartialOrdEvtA for f64 { + fn cmp_a(&self, other: &f64) -> Option { + self.partial_cmp(other) + } +} + +impl PartialOrdEvtA for bool { + fn cmp_a(&self, other: &bool) -> Option { + self.partial_cmp(other) + } +} + +impl PartialOrdEvtA for &str { + fn cmp_a(&self, other: &String) -> Option { + (*self).partial_cmp(other.as_str()) + } +} + impl EventValueType for f32 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = f32; } impl EventValueType for f64 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; + type IterTy1<'a> = f64; } impl EventValueType for bool { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; + type IterTy1<'a> = bool; } impl EventValueType for String { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; + type IterTy1<'a> = &'a str; +} + +#[derive(Debug, Clone)] +pub struct EventSingleRef<'a, EVT> +where + EVT: EventValueType, +{ + pub ts: TsNano, + pub val: EVT::IterTy1<'a>, +} + +impl<'a, EVT> EventSingleRef<'a, EVT> +where + EVT: EventValueType, +{ + pub fn to_owned(&self) { + todo!() + } } #[derive(Debug, Clone)] @@ -108,6 +189,30 @@ pub struct EventSingle { pub val: EVT, } +impl<'a, EVT> From> for EventSingle +where + EVT: EventValueType, +{ + fn from(value: EventSingleRef<'a, EVT>) -> Self { + Self { + ts: value.ts, + val: value.val.into(), + } + } +} + +impl<'a, EVT> From<&EventSingleRef<'a, EVT>> for EventSingle +where + EVT: EventValueType, +{ + fn from(value: &EventSingleRef<'a, EVT>) -> Self { + Self { + ts: value.ts, + val: value.val.clone().into(), + } + } +} + #[derive(Debug, ThisError)] #[cstm(name = "EventsContainerError")] pub enum EventsContainerError { @@ -127,7 +232,10 @@ impl ContainerEvents where EVT: EventValueType, { - pub fn from_constituents(tss: VecDeque, vals: ::Container) -> Self { + pub fn from_constituents( + tss: VecDeque, + vals: ::Container, + ) -> Self { Self { tss, vals } } @@ -147,7 +255,12 @@ where } pub fn verify(&self) -> Result<(), EventsContainerError> { - if self.tss.iter().zip(self.tss.iter().skip(1)).any(|(&a, &b)| a > b) { + if self + .tss + .iter() + .zip(self.tss.iter().skip(1)) + .any(|(&a, &b)| a > b) + { return Err(EventsContainerError::Unordered); } Ok(()) @@ -161,18 +274,15 @@ where self.tss.back().map(|&x| x) } - pub fn len_before(&self, end: TsNano) -> usize { - let pp = self.tss.partition_point(|&x| x < end); - assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len()); + fn _len_before(&self, end: TsNano) -> usize { + let tss = &self.tss; + let pp = tss.partition_point(|&x| x < end); + assert!(pp <= tss.len(), "len_before pp {} len {}", pp, tss.len()); pp } - pub fn pop_front(&mut self) -> Option> { - if let (Some(ts), Some(val)) = (self.tss.pop_front(), self.vals.pop_front()) { - Some(EventSingle { ts, val }) - } else { - None - } + fn _pop_front(&mut 
self) -> Option> { + todo!() } pub fn push_back(&mut self, ts: TsNano, val: EVT) { @@ -210,40 +320,58 @@ pub struct ContainerEventsTakeUpTo<'a, EVT> where EVT: EventValueType, { - evs: &'a mut ContainerEvents, - len: usize, + evs: &'a ContainerEvents, + beg: usize, + end: usize, + pos: usize, } impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT> where EVT: EventValueType, { - pub fn new(evs: &'a mut ContainerEvents, len: usize) -> Self { - let len = len.min(evs.len()); - Self { evs, len } - } -} - -impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT> -where - EVT: EventValueType, -{ - pub fn ts_first(&self) -> Option { - self.evs.ts_first() + pub fn new(evs: &'a ContainerEvents) -> Self { + Self { + evs, + beg: 0, + end: evs.len(), + pos: 0, + } } - pub fn ts_last(&self) -> Option { - self.evs.ts_last() + pub fn constrain_up_to_ts(&mut self, end: TsNano) { + let tss = &self.evs.tss; + let pp = tss.partition_point(|&x| x < end); + let pp = pp.max(self.pos); + assert!(pp <= tss.len(), "len_before pp {} len {}", pp, tss.len()); + assert!(pp >= self.pos); + self.end = pp; + } + + pub fn extend_to_all(&mut self) { + self.end = self.evs.len(); } pub fn len(&self) -> usize { - self.len + self.end - self.pos } - pub fn pop_front(&mut self) -> Option> { - if self.len != 0 { - if let Some(ev) = self.evs.pop_front() { - self.len -= 1; + pub fn pos(&self) -> usize { + self.pos + } + + pub fn ts_first(&self) -> Option { + self.evs.tss.get(self.pos).cloned() + } + + pub fn next(&mut self) -> Option> { + let evs = &self.evs; + if self.pos < self.end { + if let (Some(&ts), Some(val)) = + (evs.tss.get(self.pos), evs.vals.get_iter_ty_1(self.pos)) + { + self.pos += 1; + let ev = EventSingleRef { ts, val }; Some(ev) } else { None diff --git a/src/binning/test.rs b/src/binning/test.rs index f0daf3e..8b24a21 100644 --- a/src/binning/test.rs +++ b/src/binning/test.rs @@ -1,7 +1,6 @@ mod events00; + use super::container_events::ContainerEvents; -use super::___; -use netpod::log::*; use std::any; #[test] diff --git a/src/binning/test/events00.rs b/src/binning/test/events00.rs index 80096ca..bbd28f0 100644 --- a/src/binning/test/events00.rs +++ b/src/binning/test/events00.rs @@ -41,7 +41,9 @@ trait IntoVecDequeU64 { impl IntoVecDequeU64 for &str { fn into_vec_deque_u64(self) -> VecDeque { - self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect() + self.split_ascii_whitespace() + .map(|x| x.parse().unwrap()) + .collect() } } trait IntoVecDequeF32 { @@ -50,7 +52,9 @@ trait IntoVecDequeF32 { impl IntoVecDequeF32 for &str { fn into_vec_deque_f32(self) -> VecDeque { - self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect() + self.split_ascii_whitespace() + .map(|x| x.parse().unwrap()) + .collect() } } @@ -70,7 +74,10 @@ fn exp_u64<'a>( } if let (Some(&val), Some(&exp)) = (a, b) { if val != exp { - return Err(Error::AssertMsg(format!("{tag} val {} exp {} i {}", val, exp, i))); + return Err(Error::AssertMsg(format!( + "{tag} val {} exp {} i {}", + val, exp, i + ))); } } else { return Err(Error::AssertMsg(format!("{tag} len mismatch"))); @@ -96,7 +103,10 @@ fn exp_f32<'a>( } if let (Some(&val), Some(&exp)) = (a, b) { if netpod::f32_close(val, exp) == false { - return Err(Error::AssertMsg(format!("{tag} val {} exp {} i {}", val, exp, i))); + return Err(Error::AssertMsg(format!( + "{tag} val {} exp {} i {}", + val, exp, i + ))); } } else { return Err(Error::AssertMsg(format!("{tag} len mismatch"))); @@ -108,17 +118,29 @@ fn exp_f32<'a>( #[cfg(test)] fn exp_cnts(bins: &ContainerBins, exps: impl IntoVecDequeU64) 
-> Result<(), Error> { - exp_u64(bins.cnts_iter(), exps.into_vec_deque_u64().iter(), "exp_cnts") + exp_u64( + bins.cnts_iter(), + exps.into_vec_deque_u64().iter(), + "exp_cnts", + ) } #[cfg(test)] fn exp_mins(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { - exp_f32(bins.mins_iter(), exps.into_vec_deque_f32().iter(), "exp_mins") + exp_f32( + bins.mins_iter(), + exps.into_vec_deque_f32().iter(), + "exp_mins", + ) } #[cfg(test)] fn exp_maxs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { - exp_f32(bins.maxs_iter(), exps.into_vec_deque_f32().iter(), "exp_maxs") + exp_f32( + bins.maxs_iter(), + exps.into_vec_deque_f32().iter(), + "exp_maxs", + ) } fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { @@ -135,7 +157,10 @@ fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), if let (Some(a), Some(&exp)) = (a, b) { let val = *a.avg as f32; if netpod::f32_close(val, exp) == false { - return Err(Error::AssertMsg(format!("exp_avgs val {} exp {} i {}", val, exp, i))); + return Err(Error::AssertMsg(format!( + "exp_avgs val {} exp {} i {}", + val, exp, i + ))); } } else { return Err(Error::AssertMsg(format!( @@ -161,7 +186,7 @@ fn test_bin_events_f32_simple_with_before_00() -> Result<(), Error> { let mut binner = BinnedEventsTimeweight::new(range); let mut evs = ContainerEvents::::new(); evs.push_back(TsNano::from_ms(103), 2.0); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); exp_cnts(&bins, "0")?; @@ -186,7 +211,7 @@ fn test_bin_events_f32_simple_with_before_01_range_final() -> Result<(), Error> let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 103, 2.0); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); exp_cnts(&bins, "0 0")?; @@ -212,13 +237,13 @@ fn test_bin_events_f32_simple_00() -> Result<(), Error> { let em = &mut evs; pu(em, 100, 2.0); pu(em, 104, 2.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 111, 1.0); pu(em, 112, 1.2); pu(em, 113, 1.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_open()?; let bins = binner.output(); for b in bins.iter_debug() { @@ -247,13 +272,13 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> { let em = &mut evs; pu(em, 102, 2.0); pu(em, 104, 2.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 111, 1.0); pu(em, 112, 1.2); pu(em, 113, 1.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_open()?; let bins = binner.output(); for b in bins.iter_debug() { @@ -283,13 +308,13 @@ fn test_bin_events_f32_small_range_final() -> Result<(), Error> { let em = &mut evs; pu(em, 102, 2.0); pu(em, 104, 2.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 111, 1.0); pu(em, 112, 1.2); pu(em, 113, 1.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { @@ -319,12 +344,12 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err let em = &mut evs; pu(em, 102, 2.0); pu(em, 104, 2.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 111, 1.0); pu(em, 112, 1.2); - binner.ingest(evs)?; + binner.ingest(&evs)?; // TODO take bins 
already here and assert. // TODO combine all bins together for combined assert. let mut evs = ContainerEvents::::new(); @@ -332,7 +357,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err pu(em, 113, 1.4); pu(em, 146, 1.3); pu(em, 148, 1.2); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_open()?; let bins = binner.output(); for b in bins.iter_debug() { @@ -352,22 +377,23 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Error> { let beg = TsNano::from_ms(100); let end = TsNano::from_ms(150); + let bin_len = DtMs::from_ms_u64(10); let nano_range = NanoRange { beg: beg.ns(), end: end.ns(), }; - let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let range = BinnedRange::from_nano_range(nano_range, bin_len); let mut binner = BinnedEventsTimeweight::new(range); let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 102, 2.0); pu(em, 104, 2.4); - binner.ingest(evs)?; + binner.ingest(&evs)?; let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 111, 1.0); pu(em, 112, 1.2); - binner.ingest(evs)?; + binner.ingest(&evs)?; // TODO take bins already here and assert. // TODO combine all bins together for combined assert. let mut evs = ContainerEvents::::new(); @@ -375,7 +401,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er pu(em, 113, 1.4); pu(em, 146, 1.3); pu(em, 148, 1.2); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { @@ -391,7 +417,8 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er } #[test] -fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -> Result<(), Error> { +fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -> Result<(), Error> +{ let beg = TsNano::from_ms(110); let end = TsNano::from_ms(120); let nano_range = NanoRange { @@ -403,7 +430,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() - let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 109, 50.); - binner.ingest(evs)?; + binner.ingest(&evs)?; let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 111, 40.); @@ -415,7 +442,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() - // pu(em, 120, 1.4); // pu(em, 146, 1.3); // pu(em, 148, 1.2); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { @@ -442,7 +469,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_edge_range_final() -> R let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 109, 50.); - binner.ingest(evs)?; + binner.ingest(&evs)?; let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 110, 40.); @@ -454,7 +481,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_edge_range_final() -> R // pu(em, 120, 1.4); // pu(em, 146, 1.3); // pu(em, 148, 1.2); - binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { @@ -481,7 +508,7 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { let mut evs = ContainerEvents::new(); evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one")); evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); 
- binner.ingest(evs)?; + binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); Ok(()) diff --git a/src/binning/timeweight.rs b/src/binning/timeweight.rs index 8015202..38245c3 100644 --- a/src/binning/timeweight.rs +++ b/src/binning/timeweight.rs @@ -3,7 +3,6 @@ pub mod timeweight_bins_dyn; pub mod timeweight_events; pub mod timeweight_events_dyn; -use super::___; use netpod::log::*; #[allow(unused)] diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index 320924c..db16357 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -1,4 +1,3 @@ -use super::___; use netpod::log::*; #[allow(unused)] diff --git a/src/binning/timeweight/timeweight_events.rs b/src/binning/timeweight/timeweight_events.rs index c68008d..2c28152 100644 --- a/src/binning/timeweight/timeweight_events.rs +++ b/src/binning/timeweight/timeweight_events.rs @@ -4,6 +4,8 @@ use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::ContainerEventsTakeUpTo; use crate::binning::container_events::EventSingle; +use crate::binning::container_events::EventSingleRef; +use crate::binning::container_events::PartialOrdEvtA; use core::fmt; use daqbuf_err as err; use err::thiserror; @@ -14,41 +16,25 @@ use netpod::DtNano; use netpod::TsNano; use std::mem; -#[allow(unused)] -macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_cycle { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_event_next { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -#[allow(unused)] -macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_fill_until { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } +macro_rules! 
trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } #[cold] #[inline] @@ -98,7 +84,11 @@ where EVT: EventValueType, { // NOTE that this is also used during bin-cycle. - fn ingest_event_with_lst_gt_range_beg_agg(&mut self, ev: EventSingle, lst: LstRef) { + fn ingest_event_with_lst_gt_range_beg_agg( + &mut self, + ev: EventSingleRef, + lst: LstRef, + ) { let selfname = "ingest_event_with_lst_gt_range_beg_agg"; trace_ingest_event!("{selfname} {:?}", ev); if DEBUG_CHECKS { @@ -120,7 +110,11 @@ where self.filled_until = ev.ts; } - fn ingest_event_with_lst_gt_range_beg_2(&mut self, ev: EventSingle, lst: LstMut) -> Result<(), Error> { + fn ingest_event_with_lst_gt_range_beg_2( + &mut self, + ev: EventSingleRef, + lst: LstMut, + ) -> Result<(), Error> { let selfname = "ingest_event_with_lst_gt_range_beg_2"; trace_ingest_event!("{selfname}"); self.ingest_event_with_lst_gt_range_beg_agg(ev.clone(), LstRef(lst.0)); @@ -131,7 +125,7 @@ where fn ingest_event_with_lst_gt_range_beg( &mut self, - ev: EventSingle, + ev: EventSingleRef, lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { @@ -146,7 +140,7 @@ where fn ingest_event_with_lst_eq_range_beg( &mut self, - ev: EventSingle, + ev: EventSingleRef, lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { @@ -161,13 +155,13 @@ where fn ingest_with_lst_gt_range_beg( &mut self, - mut evs: ContainerEventsTakeUpTo, + evs: &mut ContainerEventsTakeUpTo, lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { let selfname = "ingest_with_lst_gt_range_beg"; - trace_ingest_event!("{selfname}"); - while let Some(ev) = evs.pop_front() { + trace_ingest_event!("{selfname} len {}", evs.len()); + while let Some(ev) = evs.next() { trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); if ev.ts <= self.active_beg { panic!("should never get here"); @@ -183,41 +177,42 @@ where fn ingest_with_lst_ge_range_beg( &mut self, - mut evs: ContainerEventsTakeUpTo, + evs: &mut ContainerEventsTakeUpTo, lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { let selfname = "ingest_with_lst_ge_range_beg"; - trace_ingest_event!("{selfname}"); - while let Some(ev) = evs.pop_front() { + trace_ingest_event!("{selfname} len {}", evs.len()); + while let Some(ev) = evs.next() { trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); - if ev.ts < self.active_beg { - panic!("should never get here"); - } - if ev.ts >= self.active_end { - panic!("should never get here"); - } + assert!(ev.ts >= self.active_beg); + assert!(ev.ts < self.active_end); if ev.ts == self.active_beg { + trace_ingest_event!("{selfname} ts == active_beg"); self.ingest_event_with_lst_eq_range_beg(ev, LstMut(lst.0), minmax)?; self.cnt += 1; } else { + trace_ingest_event!("{selfname} ts != active_beg"); self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?; self.cnt += 1; - trace_ingest_event!("{selfname} now calling ingest_with_lst_gt_range_beg"); - return self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax); + break; } } - Ok(()) + trace_ingest_event!( + "{selfname} defer remainder to ingest_with_lst_gt_range_beg len {}", + evs.len() + ); + self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax) } fn ingest_with_lst_minmax( &mut self, - evs: ContainerEventsTakeUpTo, + evs: &mut ContainerEventsTakeUpTo, lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { let selfname = "ingest_with_lst_minmax"; - trace_ingest_event!("{selfname}"); + trace_ingest_event!("{selfname} len {}", evs.len()); // TODO how to handle the min max? 
I don't take event data yet out of the container. if let Some(ts0) = evs.ts_first() { trace_ingest_event!("EVENT POP FRONT {selfname}"); @@ -260,38 +255,50 @@ impl InnerA where EVT: EventValueType, { - fn apply_min_max(ev: &EventSingle, minmax: &mut MinMax) { - if ev.val < minmax.0.val { - minmax.0 = ev.clone(); + fn apply_min_max(ev: &EventSingleRef, minmax: &mut MinMax) { + if let Some(std::cmp::Ordering::Less) = ev.val.cmp_a(&minmax.0.val) { + minmax.0 = ev.into(); } - if ev.val > minmax.1.val { - minmax.1 = ev.clone(); + if let Some(std::cmp::Ordering::Greater) = ev.val.cmp_a(&minmax.1.val) { + minmax.1 = ev.into(); } + // if ev.val < minmax.0.val { + // minmax.0 = ev.into(); + // } + // if ev.val > minmax.1.val { + // minmax.1 = ev.into(); + // } } - fn apply_lst_after_event_handled(ev: EventSingle, lst: LstMut) { - *lst.0 = ev; + fn apply_lst_after_event_handled(ev: EventSingleRef, lst: LstMut) { + *lst.0 = ev.into(); } - fn init_minmax(&mut self, ev: &EventSingle) { + fn init_minmax(&mut self, ev: &EventSingleRef) { trace_ingest_minmax!("init_minmax {:?}", ev); - self.minmax = Some((ev.clone(), ev.clone())); + self.minmax = Some((ev.into(), ev.into())); } - fn init_minmax_with_lst(&mut self, ev: &EventSingle, lst: LstRef) { + fn init_minmax_with_lst(&mut self, ev: &EventSingleRef, lst: LstRef) { trace_ingest_minmax!("init_minmax_with_lst {:?} {:?}", ev, lst.0); let minmax = self.minmax.insert((lst.0.clone(), lst.0.clone())); Self::apply_min_max(ev, minmax); } - fn ingest_with_lst(&mut self, mut evs: ContainerEventsTakeUpTo, lst: LstMut) -> Result<(), Error> { + fn ingest_with_lst( + &mut self, + evs: &mut ContainerEventsTakeUpTo, + lst: LstMut, + ) -> Result<(), Error> { let selfname = "ingest_with_lst"; - trace_ingest_container!("{selfname} evs len {}", evs.len()); + trace_ingest_container!("{selfname} len {}", evs.len()); let b = &mut self.inner_b; if let Some(minmax) = self.minmax.as_mut() { b.ingest_with_lst_minmax(evs, lst, minmax) } else { - if let Some(ev) = evs.pop_front() { + let mut run_ingest_with_lst_minmax = false; + let _ = run_ingest_with_lst_minmax; + if let Some(ev) = evs.next() { trace_event_next!("EVENT POP FRONT {:?} {selfname:30}", ev); let beg = b.active_beg; let end = b.active_end; @@ -305,23 +312,31 @@ where InnerA::apply_lst_after_event_handled(ev, lst); let b = &mut self.inner_b; b.cnt += 1; - Ok(()) + return Ok(()); } else { self.init_minmax_with_lst(&ev, LstRef(lst.0)); let b = &mut self.inner_b; - if let Some(minmax) = self.minmax.as_mut() { + { if ev.ts == beg { panic!("logic error, is handled before"); } else { b.ingest_event_with_lst_gt_range_beg_2(ev, LstMut(lst.0))?; } b.cnt += 1; - b.ingest_with_lst_minmax(evs, lst, minmax) - } else { - Err(Error::NoMinMaxAfterInit) + run_ingest_with_lst_minmax = true; } } } + } else { + return Ok(()); + } + if run_ingest_with_lst_minmax { + if let Some(minmax) = self.minmax.as_mut() { + let b = &mut self.inner_b; + b.ingest_with_lst_minmax(evs, lst, minmax) + } else { + return Err(Error::NoMinMaxAfterInit); + } } else { Ok(()) } @@ -348,7 +363,12 @@ where self.minmax = Some((lst.0.clone(), lst.0.clone())); } - fn push_out_and_reset(&mut self, lst: LstRef, range_final: bool, out: &mut ContainerBins) { + fn push_out_and_reset( + &mut self, + lst: LstRef, + range_final: bool, + out: &mut ContainerBins, + ) { let selfname = "push_out_and_reset"; // TODO there is not always good enough input to produce a meaningful bin. // TODO can we always reset, and what exactly does reset mean here? 
@@ -406,6 +426,7 @@ where EVT: EventValueType, { pub fn new(range: BinnedRange) -> Self { + trace_init!("BinnedEventsTimeweight::new {}", range); let active_beg = range.nano_beg(); let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano()); let active_len = active_end.delta(active_beg); @@ -419,7 +440,9 @@ where active_len, filled_until: active_beg, filled_width: DtNano::from_ns(0), - agg: <::AggregatorTimeWeight as AggregatorTimeWeight>::new(), + agg: <::AggregatorTimeWeight as AggregatorTimeWeight< + EVT, + >>::new(), }, minmax: None, }, @@ -435,14 +458,14 @@ where ret } - fn ingest_event_without_lst(&mut self, ev: EventSingle) -> Result<(), Error> { + fn ingest_event_without_lst(&mut self, ev: EventSingleRef) -> Result<(), Error> { let selfname = "ingest_event_without_lst"; let b = &self.inner_a.inner_b; if ev.ts >= b.active_end { panic!("{selfname} should never get here"); } else { trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev); - self.lst = Some(ev.clone()); + self.lst = Some((&ev).into()); if ev.ts >= b.active_beg { trace_ingest_minmax!("ingest_event_without_lst"); self.inner_a.init_minmax(&ev); @@ -454,19 +477,24 @@ where } } - fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo) -> Result<(), Error> { + fn ingest_without_lst(&mut self, evs: &mut ContainerEventsTakeUpTo) -> Result<(), Error> { let selfname = "ingest_without_lst"; - if let Some(ev) = evs.pop_front() { + trace_ingest_container!("{selfname} len {}", evs.len()); + let mut run_ingest_with_lst = false; + let _ = run_ingest_with_lst; + if let Some(ev) = evs.next() { trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); - if ev.ts >= self.inner_a.inner_b.active_end { - panic!("{selfname} should never get here"); + assert!(ev.ts < self.inner_a.inner_b.active_end); + self.ingest_event_without_lst(ev)?; + run_ingest_with_lst = true; + } else { + return Ok(()); + } + if run_ingest_with_lst { + if let Some(lst) = self.lst.as_mut() { + self.inner_a.ingest_with_lst(evs, LstMut(lst)) } else { - self.ingest_event_without_lst(ev)?; - if let Some(lst) = self.lst.as_mut() { - self.inner_a.ingest_with_lst(evs, LstMut(lst)) - } else { - Err(Error::NoLstAfterFirst) - } + Err(Error::NoLstAfterFirst) } } else { Ok(()) @@ -475,7 +503,12 @@ where // Caller asserts that evs is ordered within the current container // and with respect to the last container, if any. - fn ingest_ordered(&mut self, evs: ContainerEventsTakeUpTo) -> Result<(), Error> { + fn ingest_ordered(&mut self, evs: &mut ContainerEventsTakeUpTo) -> Result<(), Error> { + let selfname = "ingest_ordered"; + trace_ingest_container!( + "------------------------------------\n{selfname} len {}", + evs.len() + ); if let Some(lst) = self.lst.as_mut() { self.inner_a.ingest_with_lst(evs, LstMut(lst)) } else { @@ -508,7 +541,8 @@ where if b.filled_until < b.active_end { self.inner_a.inner_b.fill_until(b.active_end, lst.clone()); } - self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out); + self.inner_a + .push_out_and_reset(lst.clone(), true, &mut self.out); } else { self.inner_a.inner_b.fill_until(ts, lst.clone()); } @@ -523,7 +557,8 @@ where if b.filled_until < b.active_end { self.inner_a.inner_b.fill_until(b.active_end, lst.clone()); } - self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out); + self.inner_a + .push_out_and_reset(lst.clone(), true, &mut self.out); } else { // TODO should not hit this case. Prove it, assert it. 
self.inner_a.inner_b.fill_until(ts, lst.clone()); @@ -572,7 +607,7 @@ where } } - pub fn ingest(&mut self, mut evs_all: ContainerEvents) -> Result<(), Error> { + pub fn ingest(&mut self, evs: &ContainerEvents) -> Result<(), Error> { // It is this type's task to find and store the one-before event. // We then pass it to the aggregation. // AggregatorTimeWeight needs a function for that. @@ -583,41 +618,60 @@ where // ALSO: need to keep track of the "lst". Probably best done in this type as well? // TODO should rely on external stream adapter for verification to not duplicate things. - evs_all.verify()?; + evs.verify()?; + + let mut evs = ContainerEventsTakeUpTo::new(evs); loop { - break if let Some(ts) = evs_all.ts_first() { + trace_ingest_container!( + "main-ingest-loop UNCONSTRAINED len {} pos {}", + evs.len(), + evs.pos() + ); + break if let Some(ts) = evs.ts_first() { trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest", ts); let b = &mut self.inner_a.inner_b; if ts >= self.range.nano_end() { return Err(Error::EventAfterRange); } if ts >= b.active_end { - assert!(b.filled_until < b.active_end, "{} < {}", b.filled_until, b.active_end); + assert!( + b.filled_until < b.active_end, + "{} < {}", + b.filled_until, + b.active_end + ); self.cycle_01(ts); } - let n1 = evs_all.len(); - let len_before = evs_all.len_before(self.inner_a.inner_b.active_end); - let evs = ContainerEventsTakeUpTo::new(&mut evs_all, len_before); - if let Some(lst) = self.lst.as_ref() { - if ts < lst.ts { - return Err(Error::Unordered); + let n1 = evs.len(); + evs.constrain_up_to_ts(self.inner_a.inner_b.active_end); + { + trace_ingest_container!( + "main-ingest-loop len {} pos {}", + evs.len(), + evs.pos() + ); + if let Some(lst) = self.lst.as_ref() { + if ts < lst.ts { + return Err(Error::Unordered); + } else { + self.ingest_ordered(&mut evs)? + } } else { - self.ingest_ordered(evs)? - } + self.ingest_ordered(&mut evs)? + }; + trace_ingest_container_2!("ingest after still left len evs {}", evs.len()); + } + evs.extend_to_all(); + let n2 = evs.len(); + if n2 == 0 { + // done } else { - self.ingest_ordered(evs)? - }; - trace_ingest_container_2!("ingest after still left len evs {}", evs_all.len()); - let n2 = evs_all.len(); - if n2 != 0 { - if n2 == n1 { - panic!("no progress"); - } + assert!(n2 < n1, "no progress"); continue; } } else { - () + // done }; } Ok(()) diff --git a/src/binning/timeweight/timeweight_events_dyn.rs b/src/binning/timeweight/timeweight_events_dyn.rs index b5f71bc..fc62cf2 100644 --- a/src/binning/timeweight/timeweight_events_dyn.rs +++ b/src/binning/timeweight/timeweight_events_dyn.rs @@ -56,22 +56,32 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox where EVT: EventValueType, { - fn ingest(&mut self, mut evs: EventsBoxed) -> Result<(), BinningggError> { + fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> { // let a = (&evs as &dyn any::Any).downcast_ref::(); // evs.downcast::(); // evs.as_anybox().downcast::>(); - match evs.to_anybox().downcast::>() { - Ok(evs) => { - let evs = { - let a = evs; - *a + // match evs.to_anybox().downcast::>() { + // Ok(evs) => { + // let evs = { + // let a = evs; + // *a + // }; + // Ok(self.binner.ingest(evs)?) 
+ // } + // Err(_) => Err(BinningggError::TypeMismatch { + // have: evs.type_name().into(), + // expect: std::any::type_name::>().into(), + // }), + // } + match evs.as_any_ref().downcast_ref::>() { + Some(evs) => Ok(self.binner.ingest(evs)?), + None => { + let e = BinningggError::TypeMismatch { + have: evs.type_name().into(), + expect: std::any::type_name::>().into(), }; - Ok(self.binner.ingest(evs)?) + Err(e) } - Err(_) => Err(BinningggError::TypeMismatch { - have: evs.type_name().into(), - expect: std::any::type_name::>().into(), - }), } } @@ -109,10 +119,10 @@ impl BinnedEventsTimeweightLazy { } impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { - fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError> { + fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> { self.binned_events - .get_or_insert_with(|| evs_all.binned_events_timeweight_traitobj(self.range.clone())) - .ingest(evs_all) + .get_or_insert_with(|| evs.binned_events_timeweight_traitobj(self.range.clone())) + .ingest(evs) } fn input_done_range_final(&mut self) -> Result<(), BinningggError> { @@ -133,7 +143,10 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { } fn output(&mut self) -> Result, BinningggError> { - self.binned_events.as_mut().map(|x| x.output()).unwrap_or(Ok(None)) + self.binned_events + .as_mut() + .map(|x| x.output()) + .unwrap_or(Ok(None)) } } @@ -151,7 +164,10 @@ pub struct BinnedEventsTimeweightStream { } impl BinnedEventsTimeweightStream { - pub fn new(range: BinnedRange, inp: Pin> + Send>>) -> Self { + pub fn new( + range: BinnedRange, + inp: Pin> + Send>>, + ) -> Self { Self { state: StreamState::Reading, inp, @@ -173,7 +189,10 @@ impl BinnedEventsTimeweightStream { Ok(x) => match x { DataItem(x) => match x { Data(x) => match x { - ChannelEvents::Events(evs) => match self.binned_events.ingest(evs.to_container_events()) { + ChannelEvents::Events(evs) => match self + .binned_events + .ingest(&evs.to_container_events()) + { Ok(()) => { match self.binned_events.output() { Ok(Some(x)) => { @@ -210,7 +229,10 @@ impl BinnedEventsTimeweightStream { } } - fn handle_eos(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll::Item>> { + fn handle_eos( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll::Item>> { trace_input_container!("handle_eos"); use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::StreamItem::*; @@ -225,7 +247,11 @@ impl BinnedEventsTimeweightStream { .input_done_range_open() .map_err(err::Error::from_string)?; } - match self.binned_events.output().map_err(err::Error::from_string)? { + match self + .binned_events + .output() + .map_err(err::Error::from_string)? 
+ { Some(x) => { trace_emit!("seeing ready bins {:?}", x); Ready(Some(Ok(DataItem(Data(x))))) @@ -237,7 +263,10 @@ impl BinnedEventsTimeweightStream { } } - fn handle_main(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow::Item>>> { + fn handle_main( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> ControlFlow::Item>>> { use ControlFlow::*; use Poll::*; let ret = match &self.state { diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index d1b373f..263e90a 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -1,10 +1,12 @@ use super::aggregator::AggregatorTimeWeight; use super::container_events::Container; use super::container_events::EventValueType; +use super::container_events::PartialOrdEvtA; use core::fmt; use items_0::vecpreview::PreviewRange; use netpod::DtNano; use netpod::EnumVariant; +use netpod::EnumVariantRef; use serde::Deserialize; use serde::Serialize; use std::collections::VecDeque; @@ -46,6 +48,18 @@ impl Container for EnumVariantContainer { None } } + + fn get_iter_ty_1(&self, pos: usize) -> Option<::IterTy1<'_>> { + if let (Some(&ix), Some(name)) = (self.ixs.get(pos), self.names.get(pos)) { + let ret = EnumVariantRef { + ix, + name: name.as_str(), + }; + Some(ret) + } else { + None + } + } } #[derive(Debug)] @@ -78,8 +92,26 @@ impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { } } +impl<'a> PartialOrdEvtA for EnumVariantRef<'a> { + fn cmp_a(&self, other: &EnumVariant) -> Option { + use std::cmp::Ordering::*; + let x = self.ix.partial_cmp(&other.ix()); + if let Some(Equal) = x { + let x = self.name.partial_cmp(other.name()); + if let Some(Equal) = x { + Some(Equal) + } else { + x + } + } else { + x + } + } +} + impl EventValueType for EnumVariant { type Container = EnumVariantContainer; type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = EnumVariantRef<'a>; } diff --git a/src/channelevents.rs b/src/channelevents.rs index d696127..7eb6857 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -28,8 +28,6 @@ use std::collections::VecDeque; use std::time::Duration; use std::time::SystemTime; -macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - // TODO maybe rename to ChannelStatus? 
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum ConnStatus { diff --git a/src/eventsdim0.rs b/src/eventsdim0.rs index f9a10c8..c2bf0d3 100644 --- a/src/eventsdim0.rs +++ b/src/eventsdim0.rs @@ -279,7 +279,11 @@ pub struct EventsDim0CollectorOutput { range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, - #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + #[serde( + rename = "continueAt", + default, + skip_serializing_if = "Option::is_none" + )] continue_at: Option, } @@ -440,8 +444,8 @@ impl CollectorTy for EventsDim0Collector { }; let tss_sl = vals.tss.make_contiguous(); let pulses_sl = vals.pulses.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); - let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); + let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl); let values = mem::replace(&mut vals.values, VecDeque::new()); if ts_off_ms.len() != ts_off_ns.len() { return Err(Error::with_msg_no_trace("collected len mismatch")); @@ -478,27 +482,6 @@ impl items_0::collect_s::CollectableType for EventsDim0 { } } -#[derive(Debug)] -pub struct EventsDim0Aggregator { - range: SeriesRange, - count: u64, - minmaxlst: Option<(STY, STY, STY)>, - sumc: u64, - sum: f32, - int_ts: u64, - last_ts: u64, - do_time_weight: bool, - events_ignored_count: u64, - items_seen: usize, -} - -impl Drop for EventsDim0Aggregator { - fn drop(&mut self) { - // TODO collect as stats for the request context: - trace!("count {} ignored {}", self.count, self.events_ignored_count); - } -} - impl TypeName for EventsDim0 { fn type_name(&self) -> String { let self_name = any::type_name::(); @@ -583,7 +566,11 @@ impl Events for EventsDim0 { let tss = self.tss.drain(..n1).collect(); let pulses = self.pulses.drain(..n1).collect(); let values = self.values.drain(..n1).collect(); - let ret = Self { tss, pulses, values }; + let ret = Self { + tss, + pulses, + values, + }; Box::new(ret) } @@ -591,7 +578,11 @@ impl Events for EventsDim0 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs( + &mut self, + dst: &mut dyn Events, + range: (usize, usize), + ) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. 
if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -607,7 +598,7 @@ impl Events for EventsDim0 { dst.type_name() ); panic!(); - Err(MergeError::NotCompatible) + // Err(MergeError::NotCompatible) } } @@ -696,8 +687,8 @@ impl Events for EventsDim0 { let mut values = self.values.clone(); let tss_sl = tss.make_contiguous(); let pulses_sl = pulses.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); - let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); + let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl); let values = mem::replace(&mut values, VecDeque::new()); let ret = EventsDim0CollectorOutput { ts_anchor_sec, @@ -759,7 +750,10 @@ impl Events for EventsDim0 { try_to_container_events!(bool, self); try_to_container_events!(String, self); let this = self; - if let Some(evs) = self.as_any_ref().downcast_ref::>() { + if let Some(evs) = self + .as_any_ref() + .downcast_ref::>() + { use crate::binning::container_events::ContainerEvents; let tss = this.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); use crate::binning::container_events::Container; diff --git a/src/eventsdim0enum.rs b/src/eventsdim0enum.rs index 50f7da2..da4319e 100644 --- a/src/eventsdim0enum.rs +++ b/src/eventsdim0enum.rs @@ -4,7 +4,6 @@ use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorDyn; use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::isodate::IsoDateTime; @@ -88,11 +87,19 @@ pub struct EventsDim0EnumCollectorOutput { vals: VecDeque, #[serde(rename = "valuestrings")] valstrs: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "netpod::is_false")] + #[serde( + rename = "rangeFinal", + default, + skip_serializing_if = "netpod::is_false" + )] range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "netpod::is_false")] timed_out: bool, - #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + #[serde( + rename = "continueAt", + default, + skip_serializing_if = "Option::is_none" + )] continue_at: Option, } @@ -154,7 +161,7 @@ impl CollectorTy for EventsDim0EnumCollector { fn result( &mut self, range: Option, - binrange: Option, + _binrange: Option, ) -> Result { trace_collect_result!( "{} result() needs_continue_at {}", @@ -188,7 +195,7 @@ impl CollectorTy for EventsDim0EnumCollector { None }; let tss_sl = vals.tss.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); let valixs = mem::replace(&mut vals.values, VecDeque::new()); let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new()); let vals = valixs; @@ -291,7 +298,7 @@ impl TimeBinnerTy for EventsDim0EnumTimeBinner { type Input = EventsDim0Enum; type Output = (); - fn ingest(&mut self, item: &mut Self::Input) { + fn ingest(&mut self, _item: &mut Self::Input) { todo!() } @@ -307,7 +314,7 @@ impl TimeBinnerTy for EventsDim0EnumTimeBinner { todo!() } - fn push_in_progress(&mut self, push_empty: bool) { + fn push_in_progress(&mut self, _push_empty: bool) { todo!() } @@ -330,9 +337,9 @@ impl TimeBinnableTy for 
EventsDim0Enum { fn time_binner_new( &self, - binrange: BinnedRangeEnum, - do_time_weight: bool, - emit_empty_bins: bool, + _binrange: BinnedRangeEnum, + _do_time_weight: bool, + _emit_empty_bins: bool, ) -> Self::TimeBinner { todo!() } @@ -377,7 +384,7 @@ impl Events for EventsDim0Enum { todo!() } - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box { todo!() } @@ -385,19 +392,23 @@ impl Events for EventsDim0Enum { todo!() } - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), items_0::MergeError> { + fn drain_into_evs( + &mut self, + _dst: &mut dyn Events, + _range: (usize, usize), + ) -> Result<(), items_0::MergeError> { todo!() } - fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { + fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option { todo!() } - fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { + fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option { todo!() } - fn find_highest_index_lt_evs(&self, ts: u64) -> Option { + fn find_highest_index_lt_evs(&self, _ts: u64) -> Option { todo!() } @@ -405,7 +416,7 @@ impl Events for EventsDim0Enum { todo!() } - fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + fn partial_eq_dyn(&self, _other: &dyn Events) -> bool { todo!() } diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs index 88a4358..1b3a352 100644 --- a/src/eventsdim1.rs +++ b/src/eventsdim1.rs @@ -400,8 +400,8 @@ impl CollectorTy for EventsDim1Collector { }; let tss_sl = vals.tss.make_contiguous(); let pulses_sl = vals.pulses.make_contiguous(); - let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); - let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl); + let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl); let values = mem::replace(&mut vals.values, VecDeque::new()); if ts_off_ms.len() != ts_off_ns.len() { return Err(Error::with_msg_no_trace("collected len mismatch")); diff --git a/src/framable.rs b/src/framable.rs index 33629c4..7b08c18 100644 --- a/src/framable.rs +++ b/src/framable.rs @@ -202,7 +202,7 @@ fn test_frame_log() { let item: Sitemty = Ok(StreamItem::Log(item)); let buf = Framable::make_frame_dyn(&item).unwrap(); let len = u32::from_le_bytes(buf[12..16].try_into().unwrap()); - let item2: LogItem = decode_from_slice(&buf[20..20 + len as usize]).unwrap(); + let _item2: LogItem = decode_from_slice(&buf[20..20 + len as usize]).unwrap(); } #[test] @@ -217,5 +217,6 @@ fn test_frame_error() { panic!("bad tyid"); } eprintln!("buf len {} len {}", buf.len(), len); - let item2: items_0::streamitem::SitemErrTy = json_from_slice(&buf[20..20 + len as usize]).unwrap(); + let _item2: items_0::streamitem::SitemErrTy = + json_from_slice(&buf[20..20 + len as usize]).unwrap(); } diff --git a/src/items_2.rs b/src/items_2.rs deleted file mode 100644 index 9bb972d..0000000 --- a/src/items_2.rs +++ /dev/null @@ -1,182 +0,0 @@ -pub mod accounting; -pub mod binning; -pub mod channelevents; -pub mod empty; -pub mod eventfull; -pub mod eventsdim0; -pub mod eventsdim0enum; -pub mod eventsdim1; -pub mod framable; -pub mod frame; -pub mod inmem; -pub mod merger; -pub mod streams; -#[cfg(feature = "heavy")] -#[cfg(test)] -pub mod test; -pub mod testgen; -pub mod transform; - -use channelevents::ChannelEvents; -use daqbuf_err as err; -use futures_util::Stream; -use items_0::isodate::IsoDateTime; -use 
items_0::streamitem::Sitemty; -use items_0::transform::EventTransform; -use items_0::Events; -use items_0::MergeError; -use merger::Mergeable; -use netpod::timeunits::*; -use std::collections::VecDeque; -use std::fmt; - -pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque, VecDeque) { - let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; - let ts_anchor_ns = ts_anchor_sec * SEC; - let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); - let ts_off_ns = tss - .iter() - .zip(ts_off_ms.iter().map(|&k| k * MS)) - .map(|(&j, k)| (j - ts_anchor_ns - k)) - .collect(); - (ts_anchor_sec, ts_off_ms, ts_off_ns) -} - -pub fn ts_offs_from_abs_with_anchor( - ts_anchor_sec: u64, - tss: &[u64], -) -> (VecDeque, VecDeque) { - let ts_anchor_ns = ts_anchor_sec * SEC; - let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); - let ts_off_ns = tss - .iter() - .zip(ts_off_ms.iter().map(|&k| k * MS)) - .map(|(&j, k)| (j - ts_anchor_ns - k)) - .collect(); - (ts_off_ms, ts_off_ns) -} - -pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { - let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000; - let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect(); - (pulse_anchor, pulse_off) -} - -#[derive(Debug, PartialEq)] -pub enum ErrorKind { - General, - #[allow(unused)] - MismatchedType, -} - -// TODO stack error better -#[derive(Debug, PartialEq)] -pub struct Error { - #[allow(unused)] - kind: ErrorKind, - msg: Option, -} - -impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{self:?}") - } -} - -impl From for Error { - fn from(kind: ErrorKind) -> Self { - Self { kind, msg: None } - } -} - -impl From for Error { - fn from(msg: String) -> Self { - Self { - msg: Some(msg), - kind: ErrorKind::General, - } - } -} - -// TODO this discards structure -impl From for Error { - fn from(e: err::Error) -> Self { - Self { - msg: Some(format!("{e}")), - kind: ErrorKind::General, - } - } -} - -// TODO this discards structure -impl From for err::Error { - fn from(e: Error) -> Self { - err::Error::with_msg_no_trace(format!("{e}")) - } -} - -impl std::error::Error for Error {} - -impl serde::de::Error for Error { - fn custom(msg: T) -> Self - where - T: fmt::Display, - { - format!("{msg}").into() - } -} - -pub fn make_iso_ts(tss: &[u64]) -> Vec { - tss.iter().map(|&k| IsoDateTime::from_ns_u64(k)).collect() -} - -impl Mergeable for Box { - fn ts_min(&self) -> Option { - self.as_ref().ts_min() - } - - fn ts_max(&self) -> Option { - self.as_ref().ts_max() - } - - fn new_empty(&self) -> Self { - self.as_ref().new_empty_evs() - } - - fn clear(&mut self) { - Events::clear(self.as_mut()) - } - - fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { - self.as_mut().drain_into_evs(dst, range) - } - - fn find_lowest_index_gt(&self, ts: u64) -> Option { - self.as_ref().find_lowest_index_gt_evs(ts) - } - - fn find_lowest_index_ge(&self, ts: u64) -> Option { - self.as_ref().find_lowest_index_ge_evs(ts) - } - - fn find_highest_index_lt(&self, ts: u64) -> Option { - self.as_ref().find_highest_index_lt_evs(ts) - } - - fn tss(&self) -> Vec { - Events::tss(self) - .iter() - .map(|x| netpod::TsMs::from_ns_u64(*x)) - .collect() - } -} - -pub trait ChannelEventsInput: - Stream> + EventTransform + Send -{ -} - -impl ChannelEventsInput for T where - T: Stream> + EventTransform + Send -{ -} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..ed03d9c --- 
/dev/null +++ b/src/lib.rs @@ -0,0 +1,91 @@ +pub mod accounting; +pub mod binning; +pub mod channelevents; +pub mod empty; +pub mod eventfull; +pub mod eventsdim0; +pub mod eventsdim0enum; +pub mod eventsdim1; +pub mod framable; +pub mod frame; +pub mod inmem; +pub mod merger; +pub mod offsets; +pub mod streams; +#[cfg(feature = "heavy")] +#[cfg(test)] +pub mod test; +pub mod testgen; + +use daqbuf_err as err; +use items_0::isodate::IsoDateTime; +use items_0::Events; +use std::fmt; + +#[derive(Debug, PartialEq)] +pub enum ErrorKind { + General, + #[allow(unused)] + MismatchedType, +} + +// TODO stack error better +#[derive(Debug, PartialEq)] +pub struct Error { + #[allow(unused)] + kind: ErrorKind, + msg: Option, +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{self:?}") + } +} + +impl From for Error { + fn from(kind: ErrorKind) -> Self { + Self { kind, msg: None } + } +} + +impl From for Error { + fn from(msg: String) -> Self { + Self { + msg: Some(msg), + kind: ErrorKind::General, + } + } +} + +// TODO this discards structure +impl From for Error { + fn from(e: err::Error) -> Self { + Self { + msg: Some(format!("{e}")), + kind: ErrorKind::General, + } + } +} + +// TODO this discards structure +impl From for err::Error { + fn from(e: Error) -> Self { + err::Error::with_msg_no_trace(format!("{e}")) + } +} + +impl std::error::Error for Error {} + +impl serde::de::Error for Error { + fn custom(msg: T) -> Self + where + T: fmt::Display, + { + format!("{msg}").into() + } +} + +pub fn make_iso_ts(tss: &[u64]) -> Vec { + tss.iter().map(|&k| IsoDateTime::from_ns_u64(k)).collect() +} diff --git a/src/merger.rs b/src/merger.rs index 3c70219..135d75d 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -8,9 +8,6 @@ use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::transform::EventTransform; -use items_0::transform::TransformProperties; -use items_0::transform::WithTransformProperties; use items_0::Events; use items_0::MergeError; use items_0::WithLen; @@ -46,6 +43,47 @@ pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { fn tss(&self) -> Vec; } +impl Mergeable for Box { + fn ts_min(&self) -> Option { + self.as_ref().ts_min() + } + + fn ts_max(&self) -> Option { + self.as_ref().ts_max() + } + + fn new_empty(&self) -> Self { + self.as_ref().new_empty_evs() + } + + fn clear(&mut self) { + Events::clear(self.as_mut()) + } + + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { + self.as_mut().drain_into_evs(dst, range) + } + + fn find_lowest_index_gt(&self, ts: u64) -> Option { + self.as_ref().find_lowest_index_gt_evs(ts) + } + + fn find_lowest_index_ge(&self, ts: u64) -> Option { + self.as_ref().find_lowest_index_ge_evs(ts) + } + + fn find_highest_index_lt(&self, ts: u64) -> Option { + self.as_ref().find_highest_index_lt_evs(ts) + } + + fn tss(&self) -> Vec { + Events::tss(self) + .iter() + .map(|x| netpod::TsMs::from_ns_u64(*x)) + .collect() + } +} + type MergeInp = Pin> + Send>>; pub struct Merger { @@ -479,18 +517,3 @@ where } } } - -impl WithTransformProperties for Merger { - fn query_transform_properties(&self) -> TransformProperties { - todo!() - } -} - -impl EventTransform for Merger -where - T: Send, -{ - fn transform(&mut self, _src: Box) -> Box { - todo!() - } -} diff --git a/src/offsets.rs b/src/offsets.rs new file mode 100644 index 0000000..9d91046 --- /dev/null 
diff --git a/src/merger.rs b/src/merger.rs
index 3c70219..135d75d 100644
--- a/src/merger.rs
+++ b/src/merger.rs
@@ -8,9 +8,6 @@ use items_0::streamitem::LogItem;
 use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
-use items_0::transform::EventTransform;
-use items_0::transform::TransformProperties;
-use items_0::transform::WithTransformProperties;
 use items_0::Events;
 use items_0::MergeError;
 use items_0::WithLen;
@@ -46,6 +43,47 @@ pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin {
     fn tss(&self) -> Vec<netpod::TsMs>;
 }
 
+impl Mergeable for Box<dyn Events> {
+    fn ts_min(&self) -> Option<u64> {
+        self.as_ref().ts_min()
+    }
+
+    fn ts_max(&self) -> Option<u64> {
+        self.as_ref().ts_max()
+    }
+
+    fn new_empty(&self) -> Self {
+        self.as_ref().new_empty_evs()
+    }
+
+    fn clear(&mut self) {
+        Events::clear(self.as_mut())
+    }
+
+    fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
+        self.as_mut().drain_into_evs(dst, range)
+    }
+
+    fn find_lowest_index_gt(&self, ts: u64) -> Option<usize> {
+        self.as_ref().find_lowest_index_gt_evs(ts)
+    }
+
+    fn find_lowest_index_ge(&self, ts: u64) -> Option<usize> {
+        self.as_ref().find_lowest_index_ge_evs(ts)
+    }
+
+    fn find_highest_index_lt(&self, ts: u64) -> Option<usize> {
+        self.as_ref().find_highest_index_lt_evs(ts)
+    }
+
+    fn tss(&self) -> Vec<netpod::TsMs> {
+        Events::tss(self)
+            .iter()
+            .map(|x| netpod::TsMs::from_ns_u64(*x))
+            .collect()
+    }
+}
+
 type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
 
 pub struct Merger<T> {
@@ -479,18 +517,3 @@ where
         }
     }
 }
-
-impl<T> WithTransformProperties for Merger<T> {
-    fn query_transform_properties(&self) -> TransformProperties {
-        todo!()
-    }
-}
-
-impl<T> EventTransform for Merger<T>
-where
-    T: Send,
-{
-    fn transform(&mut self, _src: Box<dyn Events>) -> Box<dyn Events> {
-        todo!()
-    }
-}
diff --git a/src/offsets.rs b/src/offsets.rs
new file mode 100644
index 0000000..9d91046
--- /dev/null
+++ b/src/offsets.rs
@@ -0,0 +1,35 @@
+use netpod::timeunits::MS;
+use netpod::timeunits::SEC;
+use std::collections::VecDeque;
+
+pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
+    let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
+    let ts_anchor_ns = ts_anchor_sec * SEC;
+    let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
+    let ts_off_ns = tss
+        .iter()
+        .zip(ts_off_ms.iter().map(|&k| k * MS))
+        .map(|(&j, k)| (j - ts_anchor_ns - k))
+        .collect();
+    (ts_anchor_sec, ts_off_ms, ts_off_ns)
+}
+
+pub fn ts_offs_from_abs_with_anchor(
+    ts_anchor_sec: u64,
+    tss: &[u64],
+) -> (VecDeque<u64>, VecDeque<u64>) {
+    let ts_anchor_ns = ts_anchor_sec * SEC;
+    let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
+    let ts_off_ns = tss
+        .iter()
+        .zip(ts_off_ms.iter().map(|&k| k * MS))
+        .map(|(&j, k)| (j - ts_anchor_ns - k))
+        .collect();
+    (ts_off_ms, ts_off_ns)
+}
+
+pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque<u64>) {
+    let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000;
+    let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect();
+    (pulse_anchor, pulse_off)
+}
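The offsets helpers above split absolute nanosecond timestamps into a second-aligned anchor plus per-event millisecond offsets and leftover nanoseconds. A standalone sketch of the same arithmetic (illustrative only, not part of the patch; SEC and MS are assumed here to be the nanosecond multipliers from netpod::timeunits):

    const SEC: u64 = 1_000_000_000; // assumed value of netpod::timeunits::SEC
    const MS: u64 = 1_000_000; // assumed value of netpod::timeunits::MS

    fn main() {
        // Absolute event timestamps in nanoseconds.
        let tss = [3_000_500_250_u64, 3_001_700_100, 3_004_000_000];
        let anchor_sec = tss[0] / SEC; // 3
        let anchor_ns = anchor_sec * SEC; // 3_000_000_000
        let off_ms: Vec<u64> = tss.iter().map(|&t| (t - anchor_ns) / MS).collect();
        let off_ns: Vec<u64> = tss
            .iter()
            .zip(off_ms.iter())
            .map(|(&t, &m)| t - anchor_ns - m * MS)
            .collect();
        assert_eq!(anchor_sec, 3);
        assert_eq!(off_ms, vec![0, 1, 4]);
        assert_eq!(off_ns, vec![500_250, 700_100, 0]);
    }

ts_offs_from_abs_with_anchor performs the same split against a caller-supplied anchor, so consecutive batches can share one time base.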
diff --git a/src/streams.rs b/src/streams.rs
index 99a3b78..dbbdc69 100644
--- a/src/streams.rs
+++ b/src/streams.rs
@@ -6,7 +6,6 @@ use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
 use items_0::transform::EventStreamTrait;
-use items_0::transform::EventTransform;
 use items_0::transform::TransformProperties;
 use items_0::transform::WithTransformProperties;
 use items_0::Events;
@@ -21,10 +20,7 @@ pub struct Enumerate2<T> {
 }
 
 impl<T> Enumerate2<T> {
-    pub fn new(inp: T) -> Self
-    where
-        T: EventTransform,
-    {
+    pub fn new(inp: T) -> Self {
         Self { inp, cnt: 0 }
     }
 }
@@ -58,15 +54,6 @@ where
     }
 }
 
-impl<T> EventTransform for Enumerate2<T>
-where
-    T: WithTransformProperties + Send,
-{
-    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
-        todo!()
-    }
-}
-
 pub struct Then2<T, F, Fut> {
     inp: Pin<Box<T>>,
     f: Pin<Box<F>>,
@@ -78,10 +65,7 @@ where
     T: Stream,
     F: Fn(<T as Stream>::Item) -> Fut,
 {
-    pub fn new(inp: T, f: F) -> Self
-    where
-        T: EventTransform,
-    {
+    pub fn new(inp: T, f: F) -> Self {
         Self {
             inp: Box::pin(inp),
             f: Box::pin(f),
@@ -135,56 +119,6 @@ where
     }
 }
 
-impl<T, F, Fut> WithTransformProperties for Then2<T, F, Fut>
-where
-    T: EventTransform,
-{
-    fn query_transform_properties(&self) -> TransformProperties {
-        self.inp.query_transform_properties()
-    }
-}
-
-impl<T, F, Fut> EventTransform for Then2<T, F, Fut>
-where
-    T: EventTransform + Send,
-    F: Send,
-    Fut: Send,
-{
-    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
-        todo!()
-    }
-}
-
-pub trait TransformerExt {
-    fn enumerate2(self) -> Enumerate2<Self>
-    where
-        Self: EventTransform + Sized;
-
-    fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
-    where
-        Self: EventTransform + Stream + Sized,
-        F: Fn(<Self as Stream>::Item) -> Fut,
-        Fut: Future;
-}
-
-impl<T> TransformerExt for T {
-    fn enumerate2(self) -> Enumerate2<Self>
-    where
-        Self: EventTransform + Sized,
-    {
-        Enumerate2::new(self)
-    }
-
-    fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
-    where
-        Self: EventTransform + Stream + Sized,
-        F: Fn(<Self as Stream>::Item) -> Fut,
-        Fut: Future,
-    {
-        Then2::new(self, f)
-    }
-}
-
 pub struct VecStream<T> {
     inp: VecDeque<T>,
 }
@@ -211,21 +145,6 @@ where
     }
 }
 
-impl<T> WithTransformProperties for VecStream<T> {
-    fn query_transform_properties(&self) -> TransformProperties {
-        todo!()
-    }
-}
-
-impl<T> EventTransform for VecStream<T>
-where
-    T: Send,
-{
-    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
-        todo!()
-    }
-}
-
 /// Wrap any event stream and provide transformation properties.
 pub struct PlainEventStream<INP>
 where
@@ -259,7 +178,9 @@ where
             Ok(item) => Ok(match item {
                 StreamItem::DataItem(item) => StreamItem::DataItem(match item {
                     RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
-                    RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)),
+                    RangeCompletableItem::Data(item) => {
+                        RangeCompletableItem::Data(Box::new(item))
+                    }
                 }),
                 StreamItem::Log(item) => StreamItem::Log(item),
                 StreamItem::Stats(item) => StreamItem::Stats(item),
diff --git a/src/transform.rs b/src/transform.rs
deleted file mode 100644
index 4a26a81..0000000
--- a/src/transform.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-//! Helper functions to create transforms which act locally on a batch of events.
-//! Tailored to the usage pattern given by `TransformQuery`.
-
-use crate::channelevents::ChannelEvents;
-use crate::eventsdim0::EventsDim0;
-use items_0::transform::EventTransform;
-use items_0::transform::TransformEvent;
-use items_0::transform::TransformProperties;
-use items_0::transform::WithTransformProperties;
-use items_0::Appendable;
-use items_0::AsAnyMut;
-use items_0::Empty;
-use items_0::Events;
-use items_0::EventsNonObj;
-use netpod::log::*;
-use std::mem;
-
-struct TransformEventIdentity {}
-
-impl WithTransformProperties for TransformEventIdentity {
-    fn query_transform_properties(&self) -> TransformProperties {
-        todo!()
-    }
-}
-
-impl EventTransform for TransformEventIdentity {
-    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
-        src
-    }
-}
-
-pub fn make_transform_identity() -> TransformEvent {
-    TransformEvent(Box::new(TransformEventIdentity {}))
-}
-
-struct TransformEventMinMaxAvg {}
-
-impl WithTransformProperties for TransformEventMinMaxAvg {
-    fn query_transform_properties(&self) -> TransformProperties {
-        todo!()
-    }
-}
-
-impl EventTransform for TransformEventMinMaxAvg {
-    fn transform(&mut self, mut src: Box<dyn Events>) -> Box<dyn Events> {
-        src.to_min_max_avg()
-    }
-}
-
-pub fn make_transform_min_max_avg() -> TransformEvent {
-    TransformEvent(Box::new(TransformEventMinMaxAvg {}))
-}
-
-struct TransformEventPulseIdDiff {
-    pulse_last: Option<u64>,
-}
-
-impl WithTransformProperties for TransformEventPulseIdDiff {
-    fn query_transform_properties(&self) -> TransformProperties {
-        todo!()
-    }
-}
-
-impl EventTransform for TransformEventPulseIdDiff {
-    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
-        let (tss, pulses) = EventsNonObj::into_tss_pulses(src);
-        let mut item = EventsDim0::empty();
-        let pulse_last = &mut self.pulse_last;
-        for (ts, pulse) in tss.into_iter().zip(pulses) {
-            let value = if let Some(last) = pulse_last {
-                pulse as i64 - *last as i64
-            } else {
-                0
-            };
-            item.push(ts, pulse, value);
-            *pulse_last = Some(pulse);
-        }
-        Box::new(ChannelEvents::Events(Box::new(item)))
-    }
-}
-
-pub fn make_transform_pulse_id_diff() -> TransformEvent {
-    TransformEvent(Box::new(TransformEventPulseIdDiff { pulse_last: None }))
-}