diff --git a/src/binning.rs b/src/binning.rs index f3622f1..7378fbe 100644 --- a/src/binning.rs +++ b/src/binning.rs @@ -1,5 +1,6 @@ pub mod aggregator; pub mod binnedvaluetype; +pub mod container; pub mod container_bins; pub mod container_events; pub mod timeweight; diff --git a/src/binning/aggregator.rs b/src/binning/aggregator.rs index 2351718..cd860b2 100644 --- a/src/binning/aggregator.rs +++ b/src/binning/aggregator.rs @@ -1,3 +1,6 @@ +pub mod agg_bins; + +use super::container::bins::BinAggedType; use super::container_events::EventValueType; use core::fmt; use netpod::log::*; @@ -12,21 +15,21 @@ macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] macro_rules! trace_result { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } -pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Send + Serialize + for<'a> Deserialize<'a> {} +pub trait AggTimeWeightOutputAvg: BinAggedType + Serialize + for<'a> Deserialize<'a> {} -impl AggTimeWeightOutputAvg for u8 {} -impl AggTimeWeightOutputAvg for u16 {} -impl AggTimeWeightOutputAvg for u32 {} -impl AggTimeWeightOutputAvg for u64 {} -impl AggTimeWeightOutputAvg for i8 {} -impl AggTimeWeightOutputAvg for i16 {} -impl AggTimeWeightOutputAvg for i32 {} -impl AggTimeWeightOutputAvg for i64 {} +// impl AggTimeWeightOutputAvg for u8 {} +// impl AggTimeWeightOutputAvg for u16 {} +// impl AggTimeWeightOutputAvg for u32 {} +// impl AggTimeWeightOutputAvg for u64 {} +// impl AggTimeWeightOutputAvg for i8 {} +// impl AggTimeWeightOutputAvg for i16 {} +// impl AggTimeWeightOutputAvg for i32 {} +// impl AggTimeWeightOutputAvg for i64 {} impl AggTimeWeightOutputAvg for f32 {} impl AggTimeWeightOutputAvg for f64 {} -impl AggTimeWeightOutputAvg for EnumVariant {} -impl AggTimeWeightOutputAvg for String {} -impl AggTimeWeightOutputAvg for bool {} +// impl AggTimeWeightOutputAvg for EnumVariant {} +// impl AggTimeWeightOutputAvg for String {} +// impl AggTimeWeightOutputAvg for bool {} pub trait AggregatorTimeWeight: fmt::Debug + Send where @@ -35,7 +38,10 @@ where fn new() -> Self; fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT); fn reset_for_new_bin(&mut self); - fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg; + fn result_and_reset_for_new_bin( + &mut self, + filled_width_fraction: f32, + ) -> EVT::AggTimeWeightOutputAvg; } #[derive(Debug)] @@ -71,9 +77,16 @@ where self.sum = 0.; } - fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg { + fn result_and_reset_for_new_bin( + &mut self, + filled_width_fraction: f32, + ) -> EVT::AggTimeWeightOutputAvg { let sum = self.sum.clone(); - trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); self.sum = 0.; sum / filled_width_fraction as f64 } @@ -96,7 +109,11 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 { let sum = self.sum.clone() as f32; - trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); self.sum = 0.; sum / filled_width_fraction } @@ -158,7 +175,11 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { let sum = self.sum.clone(); - trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); self.sum = 0.; sum / filled_width_fraction as f64 } @@ -181,7 +202,11 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { let sum = self.sum.clone(); - trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); self.sum = 0.; sum / filled_width_fraction as f64 } @@ -204,7 +229,11 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { let sum = self.sum.clone(); - trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); self.sum = 0.; sum / filled_width_fraction as f64 } diff --git a/src/binning/aggregator/agg_bins.rs b/src/binning/aggregator/agg_bins.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/binning/container.rs b/src/binning/container.rs new file mode 100644 index 0000000..e6596f8 --- /dev/null +++ b/src/binning/container.rs @@ -0,0 +1 @@ +pub mod bins; diff --git a/src/binning/container/bins.rs b/src/binning/container/bins.rs new file mode 100644 index 0000000..ae261ec --- /dev/null +++ b/src/binning/container/bins.rs @@ -0,0 +1,125 @@ +use crate::binning::container_bins::ContainerBins; +use crate::binning::container_events::EventValueType; +use crate::binning::container_events::PartialOrdEvtA; +use items_0::vecpreview::PreviewRange; +use netpod::DtNano; +use serde::Deserialize; +use serde::Serialize; +use std::collections::VecDeque; +use std::fmt; + +pub trait AggBinValTw: fmt::Debug + Send +where + BVT: BinAggedType, +{ + fn new() -> Self; + fn ingest(&mut self, bl: DtNano, val: BVT); + fn reset_for_new_bin(&mut self); + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> BVT; +} + +pub trait BinAggedContainer: + fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> +where + BVT: BinAggedType, +{ + fn new() -> Self; + fn push_back(&mut self, val: BVT); + fn pop_front(&mut self) -> Option; + fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option>; +} + +pub trait BinAggedType: + fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize + for<'a> Deserialize<'a> +{ + type Container: BinAggedContainer; + type AggregatorTimeWeight: AggBinValTw; + type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; +} + +impl PreviewRange for ContainerBins +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn preview<'a>(&'a self) -> Box { + todo!() + } +} + +impl BinAggedContainer for VecDeque +where + BVT: BinAggedType, +{ + fn new() -> Self { + todo!() + } + + fn push_back(&mut self, val: BVT) { + todo!() + } + + fn pop_front(&mut self) -> Option { + todo!() + } + + fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option<::IterTy1<'a>> { + todo!() + } +} + +impl BinAggedContainer for VecDeque +where + BVT: BinAggedType, +{ + fn new() -> Self { + todo!() + } + + fn push_back(&mut self, val: BVT) { + todo!() + } + + fn pop_front(&mut self) -> Option { + todo!() + } + + fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option<::IterTy1<'a>> { + todo!() + } +} + +impl BinAggedType for f32 { + type Container = VecDeque; + type AggregatorTimeWeight = (); + type IterTy1<'a> = Self; +} + +impl BinAggedType for f64 { + type Container = VecDeque; + type AggregatorTimeWeight = (); + type IterTy1<'a> = Self; +} + +impl AggBinValTw for () +where + T: BinAggedType, +{ + fn new() -> Self { + todo!() + } + + fn ingest(&mut self, bl: DtNano, val: T) { + todo!() + } + + fn reset_for_new_bin(&mut self) { + todo!() + } + + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> T { + todo!() + } +} + +pub struct DummyPayload {} diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index ddffa93..f1be89a 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -1,3 +1,4 @@ +use super::container::bins::BinAggedType; use super::container_events::EventValueType; use crate::offsets::ts_offs_from_abs; use crate::offsets::ts_offs_from_abs_with_anchor; @@ -32,56 +33,38 @@ pub enum ContainerBinsError { Unordered, } -pub trait BinValueType: fmt::Debug + Clone + PartialOrd { - // type Container: Container; - // type AggregatorTimeWeight: AggregatorTimeWeight; - // type AggTimeWeightOutputAvg; - - // fn identity_sum() -> Self; - // fn add_weighted(&self, add: &Self, f: f32) -> Self; -} - #[derive(Debug, Clone)] -pub struct BinSingle { - pub ts1: TsNano, - pub ts2: TsNano, - pub cnt: u64, - pub min: EVT, - pub max: EVT, - pub avg: f32, - pub lst: EVT, - pub fnl: bool, -} - -#[derive(Debug, Clone)] -pub struct BinRef<'a, EVT> +pub struct BinRef<'a, EVT, BVT> where EVT: EventValueType, + BVT: BinAggedType, { pub ts1: TsNano, pub ts2: TsNano, pub cnt: u64, pub min: &'a EVT, pub max: &'a EVT, - pub avg: &'a EVT::AggTimeWeightOutputAvg, + pub agg: &'a BVT, pub lst: &'a EVT, pub fnl: bool, } -pub struct IterDebug<'a, EVT> +pub struct IterDebug<'a, EVT, BVT> where EVT: EventValueType, + BVT: BinAggedType, { - bins: &'a ContainerBins, + bins: &'a ContainerBins, ix: usize, len: usize, } -impl<'a, EVT> Iterator for IterDebug<'a, EVT> +impl<'a, EVT, BVT> Iterator for IterDebug<'a, EVT, BVT> where EVT: EventValueType, + BVT: BinAggedType, { - type Item = BinRef<'a, EVT>; + type Item = BinRef<'a, EVT, BVT>; fn next(&mut self) -> Option { if self.ix < self.bins.len() && self.ix < self.len { @@ -94,7 +77,7 @@ where cnt: b.cnts[i], min: &b.mins[i], max: &b.maxs[i], - avg: &b.avgs[i], + agg: &b.aggs[i], lst: &b.lsts[i], fnl: b.fnls[i], }; @@ -105,24 +88,62 @@ where } } -#[derive(Clone, Serialize, Deserialize)] -pub struct ContainerBins +#[derive(Clone)] +pub struct ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { ts1s: VecDeque, ts2s: VecDeque, cnts: VecDeque, mins: VecDeque, maxs: VecDeque, - avgs: VecDeque, + aggs: VecDeque, lsts: VecDeque, fnls: VecDeque, } -impl ContainerBins +mod container_bins_serde { + use super::ContainerBins; + use super::EventValueType; + use crate::binning::container::bins::BinAggedType; + use serde::Deserialize; + use serde::Deserializer; + use serde::Serialize; + use serde::Serializer; + + impl Serialize for ContainerBins + where + EVT: EventValueType, + BVT: BinAggedType, + { + fn serialize(&self, ser: S) -> Result + where + S: Serializer, + { + todo!() + } + } + + impl<'de, EVT, BVT> Deserialize<'de> for ContainerBins + where + EVT: EventValueType, + BVT: BinAggedType, + { + fn deserialize(de: D) -> Result + where + D: Deserializer<'de>, + { + todo!() + } + } +} + +impl ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { pub fn from_constituents( ts1s: VecDeque, @@ -130,7 +151,7 @@ where cnts: VecDeque, mins: VecDeque, maxs: VecDeque, - avgs: VecDeque, + aggs: VecDeque, lsts: VecDeque, fnls: VecDeque, ) -> Self { @@ -140,7 +161,7 @@ where cnts, mins, maxs, - avgs, + aggs, lsts, fnls, } @@ -157,7 +178,7 @@ where cnts: VecDeque::new(), mins: VecDeque::new(), maxs: VecDeque::new(), - avgs: VecDeque::new(), + aggs: VecDeque::new(), lsts: VecDeque::new(), fnls: VecDeque::new(), } @@ -215,8 +236,8 @@ where self.maxs.iter() } - pub fn avgs_iter(&self) -> std::collections::vec_deque::Iter { - self.avgs.iter() + pub fn aggs_iter(&self) -> std::collections::vec_deque::Iter { + self.aggs.iter() } pub fn lsts_iter(&self) -> std::collections::vec_deque::Iter { @@ -245,7 +266,7 @@ where >, std::collections::vec_deque::Iter, >, - std::collections::vec_deque::Iter, + std::collections::vec_deque::Iter, >, std::collections::vec_deque::Iter, >, @@ -256,7 +277,7 @@ where .zip(self.cnts_iter()) .zip(self.mins_iter()) .zip(self.maxs_iter()) - .zip(self.avgs_iter()) + .zip(self.aggs_iter()) .zip(self.lsts_iter()) .zip(self.fnls_iter()) } @@ -288,7 +309,7 @@ where cnt: u64, min: EVT, max: EVT, - avg: EVT::AggTimeWeightOutputAvg, + agg: BVT, lst: EVT, fnl: bool, ) { @@ -297,12 +318,12 @@ where self.cnts.push_back(cnt); self.mins.push_back(min); self.maxs.push_back(max); - self.avgs.push_back(avg); + self.aggs.push_back(agg); self.lsts.push_back(lst); self.fnls.push_back(fnl); } - pub fn iter_debug(&self) -> IterDebug { + pub fn iter_debug(&self) -> IterDebug { IterDebug { bins: self, ix: 0, @@ -311,64 +332,70 @@ where } } -impl fmt::Debug for ContainerBins +impl fmt::Debug for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let self_name = any::type_name::(); write!( fmt, - "{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?}, fnls {:?} }}", + "{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, aggs {:?}, fnls {:?} }}", self.len(), VecPreview::new(&self.ts1s), VecPreview::new(&self.ts2s), VecPreview::new(&self.cnts), - VecPreview::new(&self.avgs), + VecPreview::new(&self.aggs), VecPreview::new(&self.fnls), ) } } -impl fmt::Display for ContainerBins +impl fmt::Display for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt::Debug::fmt(self, fmt) } } -impl AsAnyMut for ContainerBins +impl AsAnyMut for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn as_any_mut(&mut self) -> &mut dyn any::Any { self } } -impl WithLen for ContainerBins +impl WithLen for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn len(&self) -> usize { Self::len(self) } } -impl TypeName for ContainerBins +impl TypeName for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn type_name(&self) -> String { - BinningggContainerBinsDyn::type_name(self).into() + Self::type_name().into() } } -impl AsAnyRef for ContainerBins +impl AsAnyRef for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn as_any_ref(&self) -> &dyn any::Any { self @@ -376,43 +403,48 @@ where } #[derive(Debug)] -pub struct ContainerBinsCollectorOutput +pub struct ContainerBinsCollectorOutput where EVT: EventValueType, + BVT: BinAggedType, { - bins: ContainerBins, + bins: ContainerBins, } -impl TypeName for ContainerBinsCollectorOutput +impl TypeName for ContainerBinsCollectorOutput where EVT: EventValueType, + BVT: BinAggedType, { fn type_name(&self) -> String { any::type_name::().into() } } -impl AsAnyRef for ContainerBinsCollectorOutput +impl AsAnyRef for ContainerBinsCollectorOutput where EVT: EventValueType, + BVT: BinAggedType, { fn as_any_ref(&self) -> &dyn any::Any { self } } -impl AsAnyMut for ContainerBinsCollectorOutput +impl AsAnyMut for ContainerBinsCollectorOutput where EVT: EventValueType, + BVT: BinAggedType, { fn as_any_mut(&mut self) -> &mut dyn any::Any { self } } -impl WithLen for ContainerBinsCollectorOutput +impl WithLen for ContainerBinsCollectorOutput where EVT: EventValueType, + BVT: BinAggedType, { fn len(&self) -> usize { self.bins.len() @@ -420,9 +452,10 @@ where } #[derive(Debug, Serialize)] -struct ContainerBinsCollectorOutputUser +struct ContainerBinsCollectorOutputUser where EVT: EventValueType, + BVT: BinAggedType, { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, @@ -441,7 +474,7 @@ where #[serde(rename = "maxs")] maxs: VecDeque, #[serde(rename = "avgs")] - avgs: VecDeque, + aggs: VecDeque, // #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] // range_final: bool, // #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] @@ -454,9 +487,10 @@ where // finished_at: Option, } -impl ToJsonResult for ContainerBinsCollectorOutput +impl ToJsonResult for ContainerBinsCollectorOutput where EVT: EventValueType, + BVT: BinAggedType, { fn to_json_value(&self) -> Result { let bins = &self.bins; @@ -467,8 +501,8 @@ where let counts = bins.cnts.clone(); let mins = bins.mins.clone(); let maxs = bins.maxs.clone(); - let avgs = bins.avgs.clone(); - let val = ContainerBinsCollectorOutputUser:: { + let aggs = bins.aggs.clone(); + let val = ContainerBinsCollectorOutputUser:: { ts_anchor_sec: ts_anch, ts1_off_ms: ts1ms, ts2_off_ms: ts2ms, @@ -477,38 +511,51 @@ where counts, mins, maxs, - avgs, + aggs, }; serde_json::to_value(&val) } } -impl CollectedDyn for ContainerBinsCollectorOutput where EVT: EventValueType {} - -#[derive(Debug)] -pub struct ContainerBinsCollector +impl CollectedDyn for ContainerBinsCollectorOutput where EVT: EventValueType, + BVT: BinAggedType, { - bins: ContainerBins, +} + +#[derive(Debug)] +pub struct ContainerBinsCollector +where + EVT: EventValueType, + BVT: BinAggedType, +{ + bins: ContainerBins, timed_out: bool, range_final: bool, } -impl ContainerBinsCollector where EVT: EventValueType {} - -impl WithLen for ContainerBinsCollector +impl ContainerBinsCollector where EVT: EventValueType, + BVT: BinAggedType, +{ +} + +impl WithLen for ContainerBinsCollector +where + EVT: EventValueType, + BVT: BinAggedType, { fn len(&self) -> usize { self.bins.len() } } -impl items_0::container::ByteEstimate for ContainerBinsCollector +impl items_0::container::ByteEstimate for ContainerBinsCollector where EVT: EventValueType, + BVT: BinAggedType, { fn byte_estimate(&self) -> u64 { // TODO need better estimate @@ -516,12 +563,13 @@ where } } -impl items_0::collect_s::CollectorDyn for ContainerBinsCollector +impl items_0::collect_s::CollectorDyn for ContainerBinsCollector where EVT: EventValueType, + BVT: BinAggedType, { fn ingest(&mut self, src: &mut dyn CollectableDyn) { - if let Some(src) = src.as_any_mut().downcast_mut::>() { + if let Some(src) = src.as_any_mut().downcast_mut::>() { src.drain_into(&mut self.bins, 0..src.len()); } else { let srcn = src.type_name(); @@ -553,12 +601,13 @@ where } } -impl CollectableDyn for ContainerBins +impl CollectableDyn for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn new_collector(&self) -> Box { - let ret = ContainerBinsCollector:: { + let ret = ContainerBinsCollector:: { bins: ContainerBins::new(), timed_out: false, range_final: false, @@ -567,9 +616,10 @@ where } } -impl BinningggContainerBinsDyn for ContainerBins +impl BinningggContainerBinsDyn for ContainerBins where EVT: EventValueType, + BVT: BinAggedType, { fn type_name(&self) -> &'static str { any::type_name::() @@ -604,7 +654,7 @@ where dst.cnts.extend(self.cnts.drain(range.clone())); dst.mins.extend(self.mins.drain(range.clone())); dst.maxs.extend(self.maxs.drain(range.clone())); - dst.avgs.extend(self.avgs.drain(range.clone())); + dst.aggs.extend(self.aggs.drain(range.clone())); dst.lsts.extend(self.lsts.drain(range.clone())); dst.fnls.extend(self.fnls.drain(range.clone())); } else { @@ -617,7 +667,9 @@ where &self, range: netpod::BinnedRange, ) -> Box { - let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::::new(range); + let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::< + EVT::AggTimeWeightOutputAvg, + >::new(range); Box::new(ret) } @@ -626,32 +678,35 @@ where .mins .iter_mut() .zip(self.maxs.iter_mut()) - .zip(self.avgs.iter_mut()) + .zip(self.aggs.iter_mut()) {} } } -pub struct ContainerBinsTakeUpTo<'a, EVT> +pub struct ContainerBinsTakeUpTo<'a, EVT, BVT> where EVT: EventValueType, + BVT: BinAggedType, { - evs: &'a mut ContainerBins, + evs: &'a mut ContainerBins, len: usize, } -impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT> +impl<'a, EVT, BVT> ContainerBinsTakeUpTo<'a, EVT, BVT> where EVT: EventValueType, + BVT: BinAggedType, { - pub fn new(evs: &'a mut ContainerBins, len: usize) -> Self { + pub fn new(evs: &'a mut ContainerBins, len: usize) -> Self { let len = len.min(evs.len()); Self { evs, len } } } -impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT> +impl<'a, EVT, BVT> ContainerBinsTakeUpTo<'a, EVT, BVT> where EVT: EventValueType, + BVT: BinAggedType, { pub fn ts1_first(&self) -> Option { self.evs.ts1_first() diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index af64518..9b24664 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -24,8 +24,8 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[cstm(name = "ValueContainerError")] pub enum ValueContainerError {} -pub trait Container: - fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> +// + Serialize + for<'a> Deserialize<'a> +pub trait Container: fmt::Debug + Send + Clone + PreviewRange where EVT: EventValueType, { @@ -39,7 +39,9 @@ pub trait PartialOrdEvtA { fn cmp_a(&self, other: &EVT) -> Option; } -pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize { +pub trait EventValueType: + fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize + for<'a> Deserialize<'a> +{ type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; @@ -212,7 +214,7 @@ pub enum EventsContainerError { Unordered, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone)] pub struct ContainerEvents where EVT: EventValueType, @@ -221,6 +223,39 @@ where vals: ::Container, } +mod container_events_serde { + use super::ContainerEvents; + use super::EventValueType; + use serde::Deserialize; + use serde::Deserializer; + use serde::Serialize; + use serde::Serializer; + + impl Serialize for ContainerEvents + where + EVT: EventValueType, + { + fn serialize(&self, ser: S) -> Result + where + S: Serializer, + { + todo!() + } + } + + impl<'de, EVT> Deserialize<'de> for ContainerEvents + where + EVT: EventValueType, + { + fn deserialize(de: D) -> Result + where + D: Deserializer<'de>, + { + todo!() + } + } +} + impl ContainerEvents where EVT: EventValueType, diff --git a/src/binning/test/bins_gen.rs b/src/binning/test/bins_gen.rs index 253d357..ec12c56 100644 --- a/src/binning/test/bins_gen.rs +++ b/src/binning/test/bins_gen.rs @@ -18,7 +18,7 @@ pub(super) fn bins_gen_dim0_f32_v00( ) -> impl Stream>> { futures_util::stream::iter((0usize..1000).into_iter()) .map(|x| { - let c = ContainerBins::::new(); + let c = ContainerBins::::new(); Box::new(c) as Box }) .map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) diff --git a/src/binning/test/compare.rs b/src/binning/test/compare.rs index 07b960e..8c4227d 100644 --- a/src/binning/test/compare.rs +++ b/src/binning/test/compare.rs @@ -7,7 +7,7 @@ pub enum Error { AssertMsg(String), } -trait IntoVecDequeU64 { +pub(super) trait IntoVecDequeU64 { fn into_vec_deque_u64(self) -> VecDeque; } @@ -18,7 +18,8 @@ impl IntoVecDequeU64 for &str { .collect() } } -trait IntoVecDequeF32 { + +pub(super) trait IntoVecDequeF32 { fn into_vec_deque_f32(self) -> VecDeque; } @@ -88,7 +89,10 @@ fn exp_f32<'a>( Ok(()) } -pub(super) fn exp_cnts(bins: &ContainerBins, exps: impl IntoVecDequeU64) -> Result<(), Error> { +pub(super) fn exp_cnts( + bins: &ContainerBins, + exps: impl IntoVecDequeU64, +) -> Result<(), Error> { exp_u64( bins.cnts_iter(), exps.into_vec_deque_u64().iter(), @@ -96,7 +100,10 @@ pub(super) fn exp_cnts(bins: &ContainerBins, exps: impl IntoVecDequeU64) -> ) } -pub(super) fn exp_mins(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { +pub(super) fn exp_mins( + bins: &ContainerBins, + exps: impl IntoVecDequeF32, +) -> Result<(), Error> { exp_f32( bins.mins_iter(), exps.into_vec_deque_f32().iter(), @@ -104,7 +111,10 @@ pub(super) fn exp_mins(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> ) } -pub(super) fn exp_maxs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { +pub(super) fn exp_maxs( + bins: &ContainerBins, + exps: impl IntoVecDequeF32, +) -> Result<(), Error> { exp_f32( bins.maxs_iter(), exps.into_vec_deque_f32().iter(), @@ -112,7 +122,10 @@ pub(super) fn exp_maxs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> ) } -pub(super) fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { +pub(super) fn exp_avgs( + bins: &ContainerBins, + exps: impl IntoVecDequeF32, +) -> Result<(), Error> { let exps = exps.into_vec_deque_f32(); let mut it_a = bins.iter_debug(); let mut it_b = exps.iter(); @@ -124,7 +137,7 @@ pub(super) fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> break; } if let (Some(a), Some(&exp)) = (a, b) { - let val = *a.avg as f32; + let val = *a.agg as f32; if netpod::f32_close(val, exp) == false { return Err(Error::AssertMsg(format!( "exp_avgs val {} exp {} i {}", diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index ac14181..fe01278 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -1,3 +1,4 @@ +use crate::binning::container::bins::BinAggedType; use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::EventValueType; use crate::log::*; @@ -6,23 +7,26 @@ use items_0::timebin::BinningggError; use items_0::timebin::BinsBoxed; use netpod::BinnedRange; use netpod::TsNano; +use std::marker::PhantomData; #[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[derive(Debug)] -pub struct BinnedBinsTimeweight +pub struct BinnedBinsTimeweight where - EVT: EventValueType, + BVT: BinAggedType, { range: BinnedRange, - out: ContainerBins, + // out: ContainerBins, produce_cnt_zero: bool, + // agg: , + t1: PhantomData, } -impl BinnedBinsTimeweight +impl BinnedBinsTimeweight where - EVT: EventValueType, + BVT: BinAggedType, { pub fn new(range: BinnedRange) -> Self { trace_init!("BinnedBinsTimeweight::new {}", range); @@ -31,15 +35,17 @@ where let active_len = active_end.delta(active_beg); Self { range, - out: ContainerBins::new(), + // out: ContainerBins::new(), produce_cnt_zero: true, + // agg: todo!(), + t1: PhantomData, } } } -impl BinnedBinsTimeweightTrait for BinnedBinsTimeweight +impl BinnedBinsTimeweightTrait for BinnedBinsTimeweight where - EVT: EventValueType, + BVT: BinAggedType, { fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> { todo!() diff --git a/src/binning/timeweight/timeweight_events.rs b/src/binning/timeweight/timeweight_events.rs index a55aef8..9bb1edd 100644 --- a/src/binning/timeweight/timeweight_events.rs +++ b/src/binning/timeweight/timeweight_events.rs @@ -254,6 +254,7 @@ where impl InnerA where EVT: EventValueType, + // BVT: BinAggedType, { fn apply_min_max(ev: &EventSingleRef, minmax: &mut MinMax) { if let Some(std::cmp::Ordering::Less) = ev.val.cmp_a(&minmax.0.val) { @@ -262,12 +263,6 @@ where if let Some(std::cmp::Ordering::Greater) = ev.val.cmp_a(&minmax.1.val) { minmax.1 = ev.into(); } - // if ev.val < minmax.0.val { - // minmax.0 = ev.into(); - // } - // if ev.val > minmax.1.val { - // minmax.1 = ev.into(); - // } } fn apply_lst_after_event_handled(ev: EventSingleRef, lst: LstMut) { @@ -367,7 +362,7 @@ where &mut self, lst: LstRef, range_final: bool, - out: &mut ContainerBins, + out: &mut ContainerBins, ) { let selfname = "push_out_and_reset"; // TODO there is not always good enough input to produce a meaningful bin. @@ -403,7 +398,7 @@ where lst: Option>, range: BinnedRange, inner_a: InnerA, - out: ContainerBins, + out: ContainerBins, produce_cnt_zero: bool, } @@ -693,7 +688,7 @@ where self.out.len() } - pub fn output(&mut self) -> ContainerBins { + pub fn output(&mut self) -> ContainerBins { mem::replace(&mut self.out, ContainerBins::new()) } }