diff --git a/crates/items_2/src/binning.rs b/crates/items_2/src/binning.rs index 62dcd79..8924855 100644 --- a/crates/items_2/src/binning.rs +++ b/crates/items_2/src/binning.rs @@ -1,6 +1,7 @@ pub mod aggregator; pub mod container_events; pub mod timeweight; +pub mod valuetype; #[cfg(test)] mod test; diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index 638ef60..c85cf3c 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -1,19 +1,70 @@ -use std::marker::PhantomData; +use super::container_events::EventValueType; +use netpod::DtNano; -pub trait AggregatorTimeWeight {} - -pub struct AggregatorNumeric { - _t0: PhantomData, +pub trait AggregatorTimeWeight +where + EVT: EventValueType, +{ + fn new() -> Self; + fn reset_for_new_bin(&mut self); + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT); } -trait AggWithSame {} +pub struct AggregatorNumeric { + sum: EVT, +} + +trait AggWithSame: EventValueType {} impl AggWithSame for f64 {} -impl AggregatorTimeWeight for AggregatorNumeric where T: AggWithSame {} +impl AggregatorTimeWeight for AggregatorNumeric +where + EVT: AggWithSame, +{ + fn new() -> Self { + todo!() + } -impl AggregatorTimeWeight for AggregatorNumeric {} + fn reset_for_new_bin(&mut self) { + todo!() + } -impl AggregatorTimeWeight for AggregatorNumeric {} + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) { + todo!() + } +} + +impl AggregatorTimeWeight for AggregatorNumeric { + fn new() -> Self { + Self { + sum: f32::sum_identity(), + } + } + + fn reset_for_new_bin(&mut self) { + self.sum = f32::sum_identity(); + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) { + let f = dt.ns() as f32 / bl.ns() as f32; + eprintln!("INGEST {} {}", f, val); + self.sum += f * val; + } +} + +impl AggregatorTimeWeight for AggregatorNumeric { + fn new() -> Self { + todo!() + } + + fn reset_for_new_bin(&mut self) { + todo!() + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) { + todo!() + } +} // TODO do enum right from begin, using a SOA enum container. diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 09e0a83..bc137fe 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -22,30 +22,59 @@ pub enum ValueContainerError {} pub trait Container: fmt::Debug + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> { fn new() -> Self; // fn verify(&self) -> Result<(), ValueContainerError>; + fn push_back(&mut self, val: EVT); fn pop_front(&mut self) -> Option; } pub trait EventValueType: fmt::Debug + Clone + PartialOrd { type Container: Container; - type AggregatorTimeWeight: AggregatorTimeWeight; + type AggregatorTimeWeight: AggregatorTimeWeight; + + fn sum_identity() -> Self; } -impl Container for VecDeque +impl Container for VecDeque where - T: EventValueType + Serialize + for<'a> Deserialize<'a>, + EVT: EventValueType + Serialize + for<'a> Deserialize<'a>, { fn new() -> Self { VecDeque::new() } - fn pop_front(&mut self) -> Option { - todo!() + fn push_back(&mut self, val: EVT) { + self.push_back(val); + } + + fn pop_front(&mut self) -> Option { + self.pop_front() } } impl EventValueType for f32 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; + + fn sum_identity() -> Self { + 0. + } +} + +impl EventValueType for f64 { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorNumeric; + + fn sum_identity() -> Self { + 0. + } +} + +impl EventValueType for u64 { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorNumeric; + + fn sum_identity() -> Self { + 0 + } } #[derive(Debug, Clone)] @@ -116,6 +145,11 @@ where None } } + + pub fn push_back(&mut self, ts: TsNano, val: EVT) { + self.tss.push_back(ts); + self.vals.push_back(val); + } } impl fmt::Debug for ContainerEvents @@ -133,3 +167,51 @@ where ) } } + +pub struct ContainerEventsTakeUpTo<'a, EVT> +where + EVT: EventValueType, +{ + evs: &'a mut ContainerEvents, + len: usize, +} + +impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT> +where + EVT: EventValueType, +{ + pub fn new(evs: &'a mut ContainerEvents, len: usize) -> Self { + let len = len.min(evs.len()); + Self { evs, len } + } +} + +impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT> +where + EVT: EventValueType, +{ + pub fn ts_first(&self) -> Option { + self.evs.ts_first() + } + + pub fn ts_last(&self) -> Option { + self.evs.ts_last() + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn event_next(&mut self) -> Option> { + if self.len != 0 { + if let Some(ev) = self.evs.event_next() { + self.len -= 1; + Some(ev) + } else { + None + } + } else { + None + } + } +} diff --git a/crates/items_2/src/binning/test.rs b/crates/items_2/src/binning/test.rs index 1a66242..f0daf3e 100644 --- a/crates/items_2/src/binning/test.rs +++ b/crates/items_2/src/binning/test.rs @@ -1,3 +1,4 @@ +mod events00; use super::container_events::ContainerEvents; use super::___; use netpod::log::*; diff --git a/crates/items_2/src/binning/test/events00.rs b/crates/items_2/src/binning/test/events00.rs new file mode 100644 index 0000000..e5d29e7 --- /dev/null +++ b/crates/items_2/src/binning/test/events00.rs @@ -0,0 +1,70 @@ +use crate::binning::container_events::ContainerEvents; +use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight; +use err::thiserror; +use err::ThisError; +use netpod::range::evrange::NanoRange; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::EnumVariant; +use netpod::TsNano; + +#[derive(Debug, ThisError)] +#[cstm(name = "Error")] +enum Error { + Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error), +} + +#[test] +fn test_bin_events_f32_simple_00() -> Result<(), Error> { + let beg = TsNano::from_ms(100); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ms(103), 2.0); + binner.ingest(evs)?; + Ok(()) +} + +#[test] +fn test_bin_events_f32_simple_01() -> Result<(), Error> { + let beg = TsNano::from_ms(100); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ms(103), 2.0); + evs.push_back(TsNano::from_ms(104), 2.4); + binner.ingest(evs)?; + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ms(111), 1.0); + evs.push_back(TsNano::from_ms(112), 1.2); + evs.push_back(TsNano::from_ms(113), 1.4); + binner.ingest(evs)?; + Ok(()) +} + +#[test] +fn test_bin_events_enum_simple_00() -> Result<(), Error> { + let beg = TsNano::from_ms(100); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one")); + evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); + binner.ingest(evs)?; + Ok(()) +} diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index fd47c42..1116fee 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -1,12 +1,15 @@ use super::super::container_events::EventValueType; use super::___; +use crate::binning::aggregator::AggregatorTimeWeight; use crate::binning::container_events::ContainerEvents; +use crate::binning::container_events::ContainerEventsTakeUpTo; use crate::binning::container_events::EventSingle; use err::thiserror; use err::ThisError; use futures_util::Stream; use netpod::log::*; use netpod::BinnedRange; +use netpod::DtNano; use netpod::TsNano; use std::marker::PhantomData; use std::pin::Pin; @@ -14,7 +17,34 @@ use std::task::Context; use std::task::Poll; #[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } const DEBUG_CHECKS: bool = true; @@ -23,13 +53,14 @@ const DEBUG_CHECKS: bool = true; pub enum Error { BadContainer(#[from] super::super::container_events::EventsContainerError), Unordered, + EventAfterRange, NoLstAfterFirst, EmptyContainerInnerHandler, NoLstButMinMax, WithLstButEventBeforeRange, WithMinMaxButEventBeforeRange, NoMinMaxAfterInit, - ExpectEventInInnerVolumeRange, + ExpectEventWithinRange, } type MinMax = (EventSingle, EventSingle); @@ -38,126 +69,150 @@ struct LstRef<'a, EVT>(&'a EventSingle); struct LstMut<'a, EVT>(&'a mut EventSingle); -struct InnerB { - range: BinnedRange, +struct InnerB +where + EVT: EventValueType, +{ cnt: u64, - _t1: PhantomData, + active_beg: TsNano, + active_end: TsNano, + active_len: DtNano, + filled_until: TsNano, + agg: ::AggregatorTimeWeight, } impl InnerB where EVT: EventValueType, { + // NOTE that this is also used during bin-cycle. + fn ingest_event_with_lst_gt_range_beg_agg(&mut self, ev: EventSingle, lst: LstRef) { + trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg {:?}", ev); + if DEBUG_CHECKS { + if ev.ts <= self.active_beg { + panic!("should never get here"); + } + if ev.ts >= self.active_end { + panic!("should never get here"); + } + } + let dt = ev.ts.delta(self.filled_until); + // TODO can the caller already take the value and replace it afterwards with the current value? + // This fn could swap the value in lst and directly use it. + // This would require that any call path does not mess with lst. + // NOTE that this fn is also used during bin-cycle. + self.agg.ingest(dt, self.active_len, lst.0.val.clone()); + self.filled_until = ev.ts; + } + + fn ingest_event_with_lst_gt_range_beg_2(&mut self, ev: EventSingle, lst: LstMut) -> Result<(), Error> { + trace_ingest_event!("ingest_event_with_lst_gt_range_beg_2"); + self.ingest_event_with_lst_gt_range_beg_agg(ev.clone(), LstRef(lst.0)); + InnerA::apply_lst_after_event_handled(ev, lst); + Ok(()) + } + fn ingest_event_with_lst_gt_range_beg( &mut self, ev: EventSingle, lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { - if DEBUG_CHECKS { - if ev.ts <= self.range.nano_beg() { - return Err(Error::ExpectEventInInnerVolumeRange); + trace_ingest_event!("ingest_event_with_lst_gt_range_beg"); + // TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet + // and I must initialize the min/max with the current event. + InnerA::apply_min_max(&ev, minmax); + self.ingest_event_with_lst_gt_range_beg_2(ev.clone(), lst)?; + Ok(()) + } + + fn ingest_event_with_lst_eq_range_beg( + &mut self, + ev: EventSingle, + lst: LstMut, + minmax: &mut MinMax, + ) -> Result<(), Error> { + trace_ingest_event!("ingest_event_with_lst_eq_range_beg"); + // TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet + // and I must initialize the min/max with the current event. + InnerA::apply_min_max(&ev, minmax); + InnerA::apply_lst_after_event_handled(ev, lst); + Ok(()) + } + + fn ingest_with_lst_gt_range_beg( + &mut self, + mut evs: ContainerEventsTakeUpTo, + lst: LstMut, + minmax: &mut MinMax, + ) -> Result<(), Error> { + trace_ingest_event!("ingest_with_lst_gt_range_beg"); + while let Some(ev) = evs.event_next() { + trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev); + if ev.ts <= self.active_beg { + panic!("should never get here"); } + if ev.ts >= self.active_end { + panic!("should never get here"); + } + self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?; + self.cnt += 1; } - // Aggregator: - // Must handle min, max, avg, var. - // min and max is actually tricky and can not be done in one go with lst: - // The current read procedure allows that the event stream contains a one-before event even though the - // first in-range event is exactly on range-beg. In that case the min/max given by the one-before is - // irrelevant for the bin. - - // fn apply_event_time_weight(&mut self, px: u64) { - // if let Some((_, _, v)) = self.minmaxlst.as_ref() { - // trace_ingest!("apply_event_time_weight with v {v:?}"); - // let vf = v.as_prim_f32_b(); - // let v2 = v.clone(); - // self.apply_min_max_lst(v2); - // self.sumc += 1; - // let w = (px - self.int_ts) as f32 * 1e-9; - // if false { - // trace!( - // "int_ts {:10} px {:8} w {:8.1} vf {:8.1} sum {:8.1}", - // self.int_ts / MS, - // px / MS, - // w, - // vf, - // self.sum - // ); - // } - // if vf.is_nan() { - // } else { - // self.sum += vf * w; - // } - // self.int_ts = px; - // } else { - // debug_ingest!("apply_event_time_weight NO VALUE"); - // } - // } - - todo!() + Ok(()) } fn ingest_with_lst_ge_range_beg( &mut self, - mut evs: ContainerEvents, - lst: &mut EventSingle, + mut evs: ContainerEventsTakeUpTo, + lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { + trace_ingest_event!("ingest_with_lst_ge_range_beg"); while let Some(ev) = evs.event_next() { - if true { - // How to handle transition to the next bin? - // What does self.range mean, the full requested range of all bins or the current range? - // Do I maybe need both? - // How to handle to not emit bins until at least some partially filled bin is encountered? - // How to implement the bin cycle logic in clean way? - // How to emit things? Do to ship results to the caller? - todo!("must check if ev is already after range."); + trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev); + if ev.ts < self.active_beg { + panic!("should never get here"); } - - if ev.ts >= self.range.nano_end() { - todo!("make sure that the transition to the next bin (if we want any next bin) works"); - todo!("keep lst. we derive min/max from lst upon the first event in range"); - - // TODO where and how do I initialize min/max ? + if ev.ts >= self.active_end { + panic!("should never get here"); + } + if ev.ts == self.active_beg { + self.ingest_event_with_lst_eq_range_beg(ev, LstMut(lst.0), minmax)?; + self.cnt += 1; + } else { + self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?; + self.cnt += 1; + trace_ingest_firsts!("ingest_with_lst_ge_range_beg now calling ingest_with_lst_gt_range_beg"); + return self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax); } - - // TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet - // and I must initialize the min/max with the current event. - // If the event is after the current bin first edge, then min/max is initialized from the lst - // and there is a contribution from the lst to the avg. - - self.ingest_event_with_lst_gt_range_beg(ev, LstMut(lst), minmax)?; - - // TODO update the lst (needs clone?) } Ok(()) } fn ingest_with_lst_minmax( &mut self, - evs: ContainerEvents, - lst: &mut EventSingle, + evs: ContainerEventsTakeUpTo, + lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { + trace_ingest_event!("ingest_with_lst_minmax"); // TODO how to handle the min max? I don't take event data yet out of the container. - todo!("minmax handle"); if let Some(ts0) = evs.ts_first() { - let range = &self.range; - let beg = range.nano_beg(); - let end = range.nano_end(); - if ts0 < beg { - Err(Error::WithMinMaxButEventBeforeRange) + if ts0 < self.active_beg { + panic!("should never get here"); } else { - todo!(); self.ingest_with_lst_ge_range_beg(evs, lst, minmax) } } else { - Err(Error::EmptyContainerInnerHandler) + Ok(()) } } } -struct InnerA { +struct InnerA +where + EVT: EventValueType, +{ inner_b: InnerB, minmax: Option<(EventSingle, EventSingle)>, } @@ -175,40 +230,46 @@ where } } + fn apply_lst_after_event_handled(ev: EventSingle, lst: LstMut) { + *lst.0 = ev; + } + fn init_minmax(&mut self, ev: &EventSingle) { - todo!() + trace_ingest_minmax!("init_minmax {:?}", ev); + self.minmax = Some((ev.clone(), ev.clone())); } fn init_minmax_with_lst(&mut self, ev: &EventSingle, lst: LstRef) { - todo!() + trace_ingest_minmax!("init_minmax_with_lst {:?} {:?}", ev, lst.0); + self.minmax = Some((lst.0.clone(), lst.0.clone())); + Self::apply_min_max(ev, self.minmax.as_mut().unwrap()); } - fn ingest_with_lst(&mut self, mut evs: ContainerEvents, lst: &mut EventSingle) -> Result<(), Error> { + fn ingest_with_lst(&mut self, mut evs: ContainerEventsTakeUpTo, lst: LstMut) -> Result<(), Error> { if let Some(minmax) = self.minmax.as_mut() { self.inner_b.ingest_with_lst_minmax(evs, lst, minmax) } else { if let Some(ev) = evs.event_next() { - let range = &self.inner_b.range; - let beg = range.nano_beg(); - let end = range.nano_end(); - if ev.ts >= end { - // need to cycle, then apply the event again... - // TODO write that retry as a loop with iter limit. - todo!("cycle and reapply"); + trace_event_next!("ingest_with_lst {:?}", ev); + let beg = self.inner_b.active_beg; + let end = self.inner_b.active_end; + if ev.ts < beg { + panic!("should never get here"); + } else if ev.ts >= end { + panic!("should never get here"); } else { - if DEBUG_CHECKS && ev.ts < beg { - return Err(Error::WithLstButEventBeforeRange); - } else if ev.ts == beg { + if ev.ts == beg { self.init_minmax(&ev); - todo!("this stops handling of event, but must apply to lst"); + InnerA::apply_lst_after_event_handled(ev, lst); + Ok(()) } else { - self.init_minmax_with_lst(&ev, LstRef(lst)); + self.init_minmax_with_lst(&ev, LstRef(lst.0)); if let Some(minmax) = self.minmax.as_mut() { - // todo!("apply here also the event aggregation with everything except lst, min, max"); - self.inner_b - .ingest_event_with_lst_gt_range_beg(ev, LstMut(lst), minmax)?; - todo!("this stops handling of event, but must apply to lst"); - todo!("is this correct? we did already init minmax. calling this, it would get applied again?"); + if ev.ts == beg { + panic!("logic error, is handled before"); + } else { + self.inner_b.ingest_event_with_lst_gt_range_beg_2(ev, LstMut(lst.0))?; + } self.inner_b.ingest_with_lst_minmax(evs, lst, minmax) } else { Err(Error::NoMinMaxAfterInit) @@ -226,8 +287,9 @@ pub struct BinnedEventsTimeweight where EVT: EventValueType, { - inner_a: InnerA, lst: Option>, + range: BinnedRange, + inner_a: InnerA, } impl BinnedEventsTimeweight @@ -235,12 +297,19 @@ where EVT: EventValueType, { pub fn new(range: BinnedRange) -> Self { + let active_beg = range.nano_beg(); + let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano()); + let active_len = active_end.delta(active_beg); Self { + range, inner_a: InnerA:: { inner_b: InnerB { - range, cnt: 0, - _t1: PhantomData, + active_beg, + active_end, + active_len, + filled_until: active_beg, + agg: <::AggregatorTimeWeight as AggregatorTimeWeight>::new(), }, minmax: None, }, @@ -249,31 +318,29 @@ where } fn ingest_event_without_lst(&mut self, ev: EventSingle) -> Result<(), Error> { - let range = &self.inner_a.inner_b.range; - let beg = range.nano_beg(); - let end = range.nano_end(); - if ev.ts < end { + if ev.ts >= self.inner_a.inner_b.active_end { + Err(Error::EventAfterRange) + } else { + trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev); self.lst = Some(ev.clone()); - if ev.ts >= beg { - self.inner_a.minmax = Some((ev.clone(), ev.clone())); + if ev.ts >= self.inner_a.inner_b.active_beg { + trace_ingest_minmax!("ingest_event_without_lst"); + self.inner_a.init_minmax(&ev); self.inner_a.inner_b.cnt += 1; } - } else { - todo!("must cycle forward, can probably not produce a bin at all until then") + Ok(()) } - Ok(()) } - fn ingest_without_lst(&mut self, mut evs: ContainerEvents) -> Result<(), Error> { + fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo) -> Result<(), Error> { if let Some(ev) = evs.event_next() { - if ev.ts >= self.inner_a.inner_b.range.nano_end() { - // need to cycle, then apply the event again... - // TODO write that retry as a loop with iter limit. - todo!("cycle and reapply"); + trace_event_next!("ingest_without_lst {:?}", ev); + if ev.ts >= self.inner_a.inner_b.active_end { + Err(Error::EventAfterRange) } else { self.ingest_event_without_lst(ev)?; if let Some(lst) = self.lst.as_mut() { - self.inner_a.ingest_with_lst(evs, lst) + self.inner_a.ingest_with_lst(evs, LstMut(lst)) } else { Err(Error::NoLstAfterFirst) } @@ -285,9 +352,9 @@ where // Caller asserts that evs is ordered within the current container // and with respect to the last container, if any. - fn ingest_ordered(&mut self, evs: ContainerEvents) -> Result<(), Error> { + fn ingest_ordered(&mut self, evs: ContainerEventsTakeUpTo) -> Result<(), Error> { if let Some(lst) = self.lst.as_mut() { - self.inner_a.ingest_with_lst(evs, lst) + self.inner_a.ingest_with_lst(evs, LstMut(lst)) } else { if self.inner_a.minmax.is_some() { Err(Error::NoLstButMinMax) @@ -297,7 +364,7 @@ where } } - pub fn ingest(&mut self, evs: ContainerEvents) -> Result<(), Error> { + pub fn ingest(&mut self, mut evs_all: ContainerEvents) -> Result<(), Error> { // It is this type's task to find and store the one-before event. // We then pass it to the aggregation. // AggregatorTimeWeight needs a function for that. @@ -308,21 +375,72 @@ where // ALSO: need to keep track of the "lst". Probably best done in this type as well? // TODO should rely on external stream adapter for verification to not duplicate things. - evs.verify()?; + evs_all.verify()?; - if let Some(ts) = evs.ts_first() { - if let Some(lst) = self.lst.as_ref() { - if ts < lst.ts { - return Err(Error::Unordered); + loop { + // How to handle transition to the next bin? + // How to handle to not emit bins until at least some partially filled bin is encountered? + break if let Some(ts) = evs_all.ts_first() { + let b = &mut self.inner_a.inner_b; + if ts >= b.active_end { + trace_cycle!("bin edge boundary {:?}", b.active_end); + if let Some(lst) = self.lst.as_ref() { + trace_cycle!("fill remaining width"); + self.inner_a + .inner_b + .ingest_event_with_lst_gt_range_beg_agg(lst.clone(), LstRef(lst)); + } else { + // nothing to do + } + let b = &mut self.inner_a.inner_b; + if b.filled_until < b.active_beg { + panic!("fille until before bin begin"); + } else if b.filled_until == b.active_beg { + // TODO bin is meaningless + } else { + // TODO need the output type. + } + trace_cycle!("cycle bin {:?} {:?}", ts, b.active_end); + // TODO check if the bin has content to emit: either it itself contains events, or is filled with lst value. + // For the check for filled with lst I might need another flag. + let div = self.range.bin_len.ns(); + let ts1 = TsNano::from_ns(ts.ns() / div * div); + b.active_beg = ts1; + b.active_end = ts1.add_dt_nano(b.active_len); + b.filled_until = ts1; + b.cnt = 0; + b.agg.reset_for_new_bin(); + trace_cycle!("cycled to {:?} {:?}", b.active_beg, b.active_end); + } + let n1 = evs_all.len(); + let len_before = evs_all.len_before(self.inner_a.inner_b.active_end); + let evs = ContainerEventsTakeUpTo::new(&mut evs_all, len_before); + if let Some(lst) = self.lst.as_ref() { + if ts < lst.ts { + return Err(Error::Unordered); + } else { + self.ingest_ordered(evs)? + } } else { - self.ingest_ordered(evs) + self.ingest_ordered(evs)? + }; + trace_ingest_container!("ingest after still left len evs {}", evs_all.len()); + let n2 = evs_all.len(); + if n2 != 0 { + if n2 == n1 { + panic!("no progress"); + } + continue; } } else { - self.ingest_ordered(evs) - } - } else { - Ok(()) + () + }; } + Ok(()) + } + + pub fn range_final(&mut self) -> Result<(), Error> { + todo!() } } diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs new file mode 100644 index 0000000..a8ac319 --- /dev/null +++ b/crates/items_2/src/binning/valuetype.rs @@ -0,0 +1,74 @@ +use super::aggregator::AggregatorTimeWeight; +use super::container_events::Container; +use super::container_events::EventValueType; +use crate::vecpreview::PreviewRange; +use netpod::DtNano; +use netpod::EnumVariant; +use serde::Deserialize; +use serde::Serialize; +use std::collections::VecDeque; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EnumVariantContainer { + ixs: VecDeque, + names: VecDeque, +} + +impl PreviewRange for EnumVariantContainer { + fn preview(&self) -> &dyn core::fmt::Debug { + todo!() + } +} + +impl Container for EnumVariantContainer { + fn new() -> Self { + Self { + ixs: VecDeque::new(), + names: VecDeque::new(), + } + } + + fn push_back(&mut self, val: EnumVariant) { + let (ix, name) = val.into_parts(); + self.ixs.push_back(ix); + self.names.push_back(name); + } + + fn pop_front(&mut self) -> Option { + if let (Some(a), Some(b)) = (self.ixs.pop_front(), self.names.pop_front()) { + Some(EnumVariant::new(a, b)) + } else { + None + } + } +} + +pub struct EnumVariantAggregatorTimeWeight { + sum: f32, +} + +impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { + fn new() -> Self { + Self { sum: 0. } + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0. + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EnumVariant) { + let f = dt.ns() as f32 / bl.ns() as f32; + eprintln!("INGEST {} {:?}", f, val); + let h = items_0::scalar_ops::AsPrimF32::as_prim_f32_b(&val); + self.sum += f * h; + } +} + +impl EventValueType for EnumVariant { + type Container = EnumVariantContainer; + type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; + + fn sum_identity() -> Self { + todo!() + } +} diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 97760d0..3dd123f 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -691,6 +691,10 @@ impl EnumVariant { pub fn name_string(&self) -> String { self.name.clone() } + + pub fn into_parts(self) -> (u16, String) { + (self.ix, self.name) + } } impl Default for EnumVariant {