WIP
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
pub mod aggregator;
|
||||
pub mod container_events;
|
||||
pub mod timeweight;
|
||||
pub mod valuetype;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
@@ -1,19 +1,70 @@
|
||||
use std::marker::PhantomData;
|
||||
use super::container_events::EventValueType;
|
||||
use netpod::DtNano;
|
||||
|
||||
pub trait AggregatorTimeWeight {}
|
||||
|
||||
pub struct AggregatorNumeric<T> {
|
||||
_t0: PhantomData<T>,
|
||||
pub trait AggregatorTimeWeight<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn new() -> Self;
|
||||
fn reset_for_new_bin(&mut self);
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT);
|
||||
}
|
||||
|
||||
trait AggWithSame {}
|
||||
pub struct AggregatorNumeric<EVT> {
|
||||
sum: EVT,
|
||||
}
|
||||
|
||||
trait AggWithSame: EventValueType {}
|
||||
|
||||
impl AggWithSame for f64 {}
|
||||
|
||||
impl<T> AggregatorTimeWeight for AggregatorNumeric<T> where T: AggWithSame {}
|
||||
impl<EVT> AggregatorTimeWeight<EVT> for AggregatorNumeric<EVT>
|
||||
where
|
||||
EVT: AggWithSame,
|
||||
{
|
||||
fn new() -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight for AggregatorNumeric<f32> {}
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight for AggregatorNumeric<u64> {}
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight<f32> for AggregatorNumeric<f32> {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
sum: f32::sum_identity(),
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = f32::sum_identity();
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) {
|
||||
let f = dt.ns() as f32 / bl.ns() as f32;
|
||||
eprintln!("INGEST {} {}", f, val);
|
||||
self.sum += f * val;
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight<u64> for AggregatorNumeric<u64> {
|
||||
fn new() -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO do enum right from begin, using a SOA enum container.
|
||||
|
||||
@@ -22,30 +22,59 @@ pub enum ValueContainerError {}
|
||||
pub trait Container<EVT>: fmt::Debug + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> {
|
||||
fn new() -> Self;
|
||||
// fn verify(&self) -> Result<(), ValueContainerError>;
|
||||
fn push_back(&mut self, val: EVT);
|
||||
fn pop_front(&mut self) -> Option<EVT>;
|
||||
}
|
||||
|
||||
pub trait EventValueType: fmt::Debug + Clone + PartialOrd {
|
||||
type Container: Container<Self>;
|
||||
type AggregatorTimeWeight: AggregatorTimeWeight;
|
||||
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
|
||||
|
||||
fn sum_identity() -> Self;
|
||||
}
|
||||
|
||||
impl<T> Container<T> for VecDeque<T>
|
||||
impl<EVT> Container<EVT> for VecDeque<EVT>
|
||||
where
|
||||
T: EventValueType + Serialize + for<'a> Deserialize<'a>,
|
||||
EVT: EventValueType + Serialize + for<'a> Deserialize<'a>,
|
||||
{
|
||||
fn new() -> Self {
|
||||
VecDeque::new()
|
||||
}
|
||||
|
||||
fn pop_front(&mut self) -> Option<T> {
|
||||
todo!()
|
||||
fn push_back(&mut self, val: EVT) {
|
||||
self.push_back(val);
|
||||
}
|
||||
|
||||
fn pop_front(&mut self) -> Option<EVT> {
|
||||
self.pop_front()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for f32 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric<Self>;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
0.
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for f64 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric<Self>;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
0.
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for u64 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric<Self>;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -116,6 +145,11 @@ where
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_back(&mut self, ts: TsNano, val: EVT) {
|
||||
self.tss.push_back(ts);
|
||||
self.vals.push_back(val);
|
||||
}
|
||||
}
|
||||
|
||||
impl<EVT> fmt::Debug for ContainerEvents<EVT>
|
||||
@@ -133,3 +167,51 @@ where
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ContainerEventsTakeUpTo<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
evs: &'a mut ContainerEvents<EVT>,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn new(evs: &'a mut ContainerEvents<EVT>, len: usize) -> Self {
|
||||
let len = len.min(evs.len());
|
||||
Self { evs, len }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn ts_first(&self) -> Option<TsNano> {
|
||||
self.evs.ts_first()
|
||||
}
|
||||
|
||||
pub fn ts_last(&self) -> Option<TsNano> {
|
||||
self.evs.ts_last()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.len
|
||||
}
|
||||
|
||||
pub fn event_next(&mut self) -> Option<EventSingle<EVT>> {
|
||||
if self.len != 0 {
|
||||
if let Some(ev) = self.evs.event_next() {
|
||||
self.len -= 1;
|
||||
Some(ev)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
mod events00;
|
||||
use super::container_events::ContainerEvents;
|
||||
use super::___;
|
||||
use netpod::log::*;
|
||||
|
||||
70
crates/items_2/src/binning/test/events00.rs
Normal file
70
crates/items_2/src/binning/test/events00.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtMs;
|
||||
use netpod::EnumVariant;
|
||||
use netpod::TsNano;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "Error")]
|
||||
enum Error {
|
||||
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_f32_simple_00() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(100);
|
||||
let end = TsNano::from_ms(120);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
evs.push_back(TsNano::from_ms(103), 2.0);
|
||||
binner.ingest(evs)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_f32_simple_01() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(100);
|
||||
let end = TsNano::from_ms(120);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
evs.push_back(TsNano::from_ms(103), 2.0);
|
||||
evs.push_back(TsNano::from_ms(104), 2.4);
|
||||
binner.ingest(evs)?;
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
evs.push_back(TsNano::from_ms(111), 1.0);
|
||||
evs.push_back(TsNano::from_ms(112), 1.2);
|
||||
evs.push_back(TsNano::from_ms(113), 1.4);
|
||||
binner.ingest(evs)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_enum_simple_00() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(100);
|
||||
let end = TsNano::from_ms(120);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::new();
|
||||
evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one"));
|
||||
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
|
||||
binner.ingest(evs)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,12 +1,15 @@
|
||||
use super::super::container_events::EventValueType;
|
||||
use super::___;
|
||||
use crate::binning::aggregator::AggregatorTimeWeight;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::container_events::ContainerEventsTakeUpTo;
|
||||
use crate::binning::container_events::EventSingle;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtNano;
|
||||
use netpod::TsNano;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
@@ -14,7 +17,34 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
const DEBUG_CHECKS: bool = true;
|
||||
|
||||
@@ -23,13 +53,14 @@ const DEBUG_CHECKS: bool = true;
|
||||
pub enum Error {
|
||||
BadContainer(#[from] super::super::container_events::EventsContainerError),
|
||||
Unordered,
|
||||
EventAfterRange,
|
||||
NoLstAfterFirst,
|
||||
EmptyContainerInnerHandler,
|
||||
NoLstButMinMax,
|
||||
WithLstButEventBeforeRange,
|
||||
WithMinMaxButEventBeforeRange,
|
||||
NoMinMaxAfterInit,
|
||||
ExpectEventInInnerVolumeRange,
|
||||
ExpectEventWithinRange,
|
||||
}
|
||||
|
||||
type MinMax<EVT> = (EventSingle<EVT>, EventSingle<EVT>);
|
||||
@@ -38,126 +69,150 @@ struct LstRef<'a, EVT>(&'a EventSingle<EVT>);
|
||||
|
||||
struct LstMut<'a, EVT>(&'a mut EventSingle<EVT>);
|
||||
|
||||
struct InnerB<EVT> {
|
||||
range: BinnedRange<TsNano>,
|
||||
struct InnerB<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
cnt: u64,
|
||||
_t1: PhantomData<EVT>,
|
||||
active_beg: TsNano,
|
||||
active_end: TsNano,
|
||||
active_len: DtNano,
|
||||
filled_until: TsNano,
|
||||
agg: <EVT as EventValueType>::AggregatorTimeWeight,
|
||||
}
|
||||
|
||||
impl<EVT> InnerB<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
// NOTE that this is also used during bin-cycle.
|
||||
fn ingest_event_with_lst_gt_range_beg_agg(&mut self, ev: EventSingle<EVT>, lst: LstRef<EVT>) {
|
||||
trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg {:?}", ev);
|
||||
if DEBUG_CHECKS {
|
||||
if ev.ts <= self.active_beg {
|
||||
panic!("should never get here");
|
||||
}
|
||||
if ev.ts >= self.active_end {
|
||||
panic!("should never get here");
|
||||
}
|
||||
}
|
||||
let dt = ev.ts.delta(self.filled_until);
|
||||
// TODO can the caller already take the value and replace it afterwards with the current value?
|
||||
// This fn could swap the value in lst and directly use it.
|
||||
// This would require that any call path does not mess with lst.
|
||||
// NOTE that this fn is also used during bin-cycle.
|
||||
self.agg.ingest(dt, self.active_len, lst.0.val.clone());
|
||||
self.filled_until = ev.ts;
|
||||
}
|
||||
|
||||
fn ingest_event_with_lst_gt_range_beg_2(&mut self, ev: EventSingle<EVT>, lst: LstMut<EVT>) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_event_with_lst_gt_range_beg_2");
|
||||
self.ingest_event_with_lst_gt_range_beg_agg(ev.clone(), LstRef(lst.0));
|
||||
InnerA::apply_lst_after_event_handled(ev, lst);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ingest_event_with_lst_gt_range_beg(
|
||||
&mut self,
|
||||
ev: EventSingle<EVT>,
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
if DEBUG_CHECKS {
|
||||
if ev.ts <= self.range.nano_beg() {
|
||||
return Err(Error::ExpectEventInInnerVolumeRange);
|
||||
trace_ingest_event!("ingest_event_with_lst_gt_range_beg");
|
||||
// TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet
|
||||
// and I must initialize the min/max with the current event.
|
||||
InnerA::apply_min_max(&ev, minmax);
|
||||
self.ingest_event_with_lst_gt_range_beg_2(ev.clone(), lst)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ingest_event_with_lst_eq_range_beg(
|
||||
&mut self,
|
||||
ev: EventSingle<EVT>,
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_event_with_lst_eq_range_beg");
|
||||
// TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet
|
||||
// and I must initialize the min/max with the current event.
|
||||
InnerA::apply_min_max(&ev, minmax);
|
||||
InnerA::apply_lst_after_event_handled(ev, lst);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ingest_with_lst_gt_range_beg(
|
||||
&mut self,
|
||||
mut evs: ContainerEventsTakeUpTo<EVT>,
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_gt_range_beg");
|
||||
while let Some(ev) = evs.event_next() {
|
||||
trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev);
|
||||
if ev.ts <= self.active_beg {
|
||||
panic!("should never get here");
|
||||
}
|
||||
if ev.ts >= self.active_end {
|
||||
panic!("should never get here");
|
||||
}
|
||||
self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?;
|
||||
self.cnt += 1;
|
||||
}
|
||||
// Aggregator:
|
||||
// Must handle min, max, avg, var.
|
||||
// min and max is actually tricky and can not be done in one go with lst:
|
||||
// The current read procedure allows that the event stream contains a one-before event even though the
|
||||
// first in-range event is exactly on range-beg. In that case the min/max given by the one-before is
|
||||
// irrelevant for the bin.
|
||||
|
||||
// fn apply_event_time_weight(&mut self, px: u64) {
|
||||
// if let Some((_, _, v)) = self.minmaxlst.as_ref() {
|
||||
// trace_ingest!("apply_event_time_weight with v {v:?}");
|
||||
// let vf = v.as_prim_f32_b();
|
||||
// let v2 = v.clone();
|
||||
// self.apply_min_max_lst(v2);
|
||||
// self.sumc += 1;
|
||||
// let w = (px - self.int_ts) as f32 * 1e-9;
|
||||
// if false {
|
||||
// trace!(
|
||||
// "int_ts {:10} px {:8} w {:8.1} vf {:8.1} sum {:8.1}",
|
||||
// self.int_ts / MS,
|
||||
// px / MS,
|
||||
// w,
|
||||
// vf,
|
||||
// self.sum
|
||||
// );
|
||||
// }
|
||||
// if vf.is_nan() {
|
||||
// } else {
|
||||
// self.sum += vf * w;
|
||||
// }
|
||||
// self.int_ts = px;
|
||||
// } else {
|
||||
// debug_ingest!("apply_event_time_weight NO VALUE");
|
||||
// }
|
||||
// }
|
||||
|
||||
todo!()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ingest_with_lst_ge_range_beg(
|
||||
&mut self,
|
||||
mut evs: ContainerEvents<EVT>,
|
||||
lst: &mut EventSingle<EVT>,
|
||||
mut evs: ContainerEventsTakeUpTo<EVT>,
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_ge_range_beg");
|
||||
while let Some(ev) = evs.event_next() {
|
||||
if true {
|
||||
// How to handle transition to the next bin?
|
||||
// What does self.range mean, the full requested range of all bins or the current range?
|
||||
// Do I maybe need both?
|
||||
// How to handle to not emit bins until at least some partially filled bin is encountered?
|
||||
// How to implement the bin cycle logic in clean way?
|
||||
// How to emit things? Do to ship results to the caller?
|
||||
todo!("must check if ev is already after range.");
|
||||
trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev);
|
||||
if ev.ts < self.active_beg {
|
||||
panic!("should never get here");
|
||||
}
|
||||
|
||||
if ev.ts >= self.range.nano_end() {
|
||||
todo!("make sure that the transition to the next bin (if we want any next bin) works");
|
||||
todo!("keep lst. we derive min/max from lst upon the first event in range");
|
||||
|
||||
// TODO where and how do I initialize min/max ?
|
||||
if ev.ts >= self.active_end {
|
||||
panic!("should never get here");
|
||||
}
|
||||
if ev.ts == self.active_beg {
|
||||
self.ingest_event_with_lst_eq_range_beg(ev, LstMut(lst.0), minmax)?;
|
||||
self.cnt += 1;
|
||||
} else {
|
||||
self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?;
|
||||
self.cnt += 1;
|
||||
trace_ingest_firsts!("ingest_with_lst_ge_range_beg now calling ingest_with_lst_gt_range_beg");
|
||||
return self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax);
|
||||
}
|
||||
|
||||
// TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet
|
||||
// and I must initialize the min/max with the current event.
|
||||
// If the event is after the current bin first edge, then min/max is initialized from the lst
|
||||
// and there is a contribution from the lst to the avg.
|
||||
|
||||
self.ingest_event_with_lst_gt_range_beg(ev, LstMut(lst), minmax)?;
|
||||
|
||||
// TODO update the lst (needs clone?)
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ingest_with_lst_minmax(
|
||||
&mut self,
|
||||
evs: ContainerEvents<EVT>,
|
||||
lst: &mut EventSingle<EVT>,
|
||||
evs: ContainerEventsTakeUpTo<EVT>,
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_minmax");
|
||||
// TODO how to handle the min max? I don't take event data yet out of the container.
|
||||
todo!("minmax handle");
|
||||
if let Some(ts0) = evs.ts_first() {
|
||||
let range = &self.range;
|
||||
let beg = range.nano_beg();
|
||||
let end = range.nano_end();
|
||||
if ts0 < beg {
|
||||
Err(Error::WithMinMaxButEventBeforeRange)
|
||||
if ts0 < self.active_beg {
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
todo!();
|
||||
self.ingest_with_lst_ge_range_beg(evs, lst, minmax)
|
||||
}
|
||||
} else {
|
||||
Err(Error::EmptyContainerInnerHandler)
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct InnerA<EVT> {
|
||||
struct InnerA<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
inner_b: InnerB<EVT>,
|
||||
minmax: Option<(EventSingle<EVT>, EventSingle<EVT>)>,
|
||||
}
|
||||
@@ -175,40 +230,46 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_lst_after_event_handled(ev: EventSingle<EVT>, lst: LstMut<EVT>) {
|
||||
*lst.0 = ev;
|
||||
}
|
||||
|
||||
fn init_minmax(&mut self, ev: &EventSingle<EVT>) {
|
||||
todo!()
|
||||
trace_ingest_minmax!("init_minmax {:?}", ev);
|
||||
self.minmax = Some((ev.clone(), ev.clone()));
|
||||
}
|
||||
|
||||
fn init_minmax_with_lst(&mut self, ev: &EventSingle<EVT>, lst: LstRef<EVT>) {
|
||||
todo!()
|
||||
trace_ingest_minmax!("init_minmax_with_lst {:?} {:?}", ev, lst.0);
|
||||
self.minmax = Some((lst.0.clone(), lst.0.clone()));
|
||||
Self::apply_min_max(ev, self.minmax.as_mut().unwrap());
|
||||
}
|
||||
|
||||
fn ingest_with_lst(&mut self, mut evs: ContainerEvents<EVT>, lst: &mut EventSingle<EVT>) -> Result<(), Error> {
|
||||
fn ingest_with_lst(&mut self, mut evs: ContainerEventsTakeUpTo<EVT>, lst: LstMut<EVT>) -> Result<(), Error> {
|
||||
if let Some(minmax) = self.minmax.as_mut() {
|
||||
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)
|
||||
} else {
|
||||
if let Some(ev) = evs.event_next() {
|
||||
let range = &self.inner_b.range;
|
||||
let beg = range.nano_beg();
|
||||
let end = range.nano_end();
|
||||
if ev.ts >= end {
|
||||
// need to cycle, then apply the event again...
|
||||
// TODO write that retry as a loop with iter limit.
|
||||
todo!("cycle and reapply");
|
||||
trace_event_next!("ingest_with_lst {:?}", ev);
|
||||
let beg = self.inner_b.active_beg;
|
||||
let end = self.inner_b.active_end;
|
||||
if ev.ts < beg {
|
||||
panic!("should never get here");
|
||||
} else if ev.ts >= end {
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
if DEBUG_CHECKS && ev.ts < beg {
|
||||
return Err(Error::WithLstButEventBeforeRange);
|
||||
} else if ev.ts == beg {
|
||||
if ev.ts == beg {
|
||||
self.init_minmax(&ev);
|
||||
todo!("this stops handling of event, but must apply to lst");
|
||||
InnerA::apply_lst_after_event_handled(ev, lst);
|
||||
Ok(())
|
||||
} else {
|
||||
self.init_minmax_with_lst(&ev, LstRef(lst));
|
||||
self.init_minmax_with_lst(&ev, LstRef(lst.0));
|
||||
if let Some(minmax) = self.minmax.as_mut() {
|
||||
// todo!("apply here also the event aggregation with everything except lst, min, max");
|
||||
self.inner_b
|
||||
.ingest_event_with_lst_gt_range_beg(ev, LstMut(lst), minmax)?;
|
||||
todo!("this stops handling of event, but must apply to lst");
|
||||
todo!("is this correct? we did already init minmax. calling this, it would get applied again?");
|
||||
if ev.ts == beg {
|
||||
panic!("logic error, is handled before");
|
||||
} else {
|
||||
self.inner_b.ingest_event_with_lst_gt_range_beg_2(ev, LstMut(lst.0))?;
|
||||
}
|
||||
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)
|
||||
} else {
|
||||
Err(Error::NoMinMaxAfterInit)
|
||||
@@ -226,8 +287,9 @@ pub struct BinnedEventsTimeweight<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
inner_a: InnerA<EVT>,
|
||||
lst: Option<EventSingle<EVT>>,
|
||||
range: BinnedRange<TsNano>,
|
||||
inner_a: InnerA<EVT>,
|
||||
}
|
||||
|
||||
impl<EVT> BinnedEventsTimeweight<EVT>
|
||||
@@ -235,12 +297,19 @@ where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn new(range: BinnedRange<TsNano>) -> Self {
|
||||
let active_beg = range.nano_beg();
|
||||
let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano());
|
||||
let active_len = active_end.delta(active_beg);
|
||||
Self {
|
||||
range,
|
||||
inner_a: InnerA::<EVT> {
|
||||
inner_b: InnerB {
|
||||
range,
|
||||
cnt: 0,
|
||||
_t1: PhantomData,
|
||||
active_beg,
|
||||
active_end,
|
||||
active_len,
|
||||
filled_until: active_beg,
|
||||
agg: <<EVT as EventValueType>::AggregatorTimeWeight as AggregatorTimeWeight<EVT>>::new(),
|
||||
},
|
||||
minmax: None,
|
||||
},
|
||||
@@ -249,31 +318,29 @@ where
|
||||
}
|
||||
|
||||
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
|
||||
let range = &self.inner_a.inner_b.range;
|
||||
let beg = range.nano_beg();
|
||||
let end = range.nano_end();
|
||||
if ev.ts < end {
|
||||
if ev.ts >= self.inner_a.inner_b.active_end {
|
||||
Err(Error::EventAfterRange)
|
||||
} else {
|
||||
trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev);
|
||||
self.lst = Some(ev.clone());
|
||||
if ev.ts >= beg {
|
||||
self.inner_a.minmax = Some((ev.clone(), ev.clone()));
|
||||
if ev.ts >= self.inner_a.inner_b.active_beg {
|
||||
trace_ingest_minmax!("ingest_event_without_lst");
|
||||
self.inner_a.init_minmax(&ev);
|
||||
self.inner_a.inner_b.cnt += 1;
|
||||
}
|
||||
} else {
|
||||
todo!("must cycle forward, can probably not produce a bin at all until then")
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ingest_without_lst(&mut self, mut evs: ContainerEvents<EVT>) -> Result<(), Error> {
|
||||
fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
|
||||
if let Some(ev) = evs.event_next() {
|
||||
if ev.ts >= self.inner_a.inner_b.range.nano_end() {
|
||||
// need to cycle, then apply the event again...
|
||||
// TODO write that retry as a loop with iter limit.
|
||||
todo!("cycle and reapply");
|
||||
trace_event_next!("ingest_without_lst {:?}", ev);
|
||||
if ev.ts >= self.inner_a.inner_b.active_end {
|
||||
Err(Error::EventAfterRange)
|
||||
} else {
|
||||
self.ingest_event_without_lst(ev)?;
|
||||
if let Some(lst) = self.lst.as_mut() {
|
||||
self.inner_a.ingest_with_lst(evs, lst)
|
||||
self.inner_a.ingest_with_lst(evs, LstMut(lst))
|
||||
} else {
|
||||
Err(Error::NoLstAfterFirst)
|
||||
}
|
||||
@@ -285,9 +352,9 @@ where
|
||||
|
||||
// Caller asserts that evs is ordered within the current container
|
||||
// and with respect to the last container, if any.
|
||||
fn ingest_ordered(&mut self, evs: ContainerEvents<EVT>) -> Result<(), Error> {
|
||||
fn ingest_ordered(&mut self, evs: ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
|
||||
if let Some(lst) = self.lst.as_mut() {
|
||||
self.inner_a.ingest_with_lst(evs, lst)
|
||||
self.inner_a.ingest_with_lst(evs, LstMut(lst))
|
||||
} else {
|
||||
if self.inner_a.minmax.is_some() {
|
||||
Err(Error::NoLstButMinMax)
|
||||
@@ -297,7 +364,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ingest(&mut self, evs: ContainerEvents<EVT>) -> Result<(), Error> {
|
||||
pub fn ingest(&mut self, mut evs_all: ContainerEvents<EVT>) -> Result<(), Error> {
|
||||
// It is this type's task to find and store the one-before event.
|
||||
// We then pass it to the aggregation.
|
||||
// AggregatorTimeWeight needs a function for that.
|
||||
@@ -308,21 +375,72 @@ where
|
||||
// ALSO: need to keep track of the "lst". Probably best done in this type as well?
|
||||
|
||||
// TODO should rely on external stream adapter for verification to not duplicate things.
|
||||
evs.verify()?;
|
||||
evs_all.verify()?;
|
||||
|
||||
if let Some(ts) = evs.ts_first() {
|
||||
if let Some(lst) = self.lst.as_ref() {
|
||||
if ts < lst.ts {
|
||||
return Err(Error::Unordered);
|
||||
loop {
|
||||
// How to handle transition to the next bin?
|
||||
// How to handle to not emit bins until at least some partially filled bin is encountered?
|
||||
break if let Some(ts) = evs_all.ts_first() {
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
if ts >= b.active_end {
|
||||
trace_cycle!("bin edge boundary {:?}", b.active_end);
|
||||
if let Some(lst) = self.lst.as_ref() {
|
||||
trace_cycle!("fill remaining width");
|
||||
self.inner_a
|
||||
.inner_b
|
||||
.ingest_event_with_lst_gt_range_beg_agg(lst.clone(), LstRef(lst));
|
||||
} else {
|
||||
// nothing to do
|
||||
}
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
if b.filled_until < b.active_beg {
|
||||
panic!("fille until before bin begin");
|
||||
} else if b.filled_until == b.active_beg {
|
||||
// TODO bin is meaningless
|
||||
} else {
|
||||
// TODO need the output type.
|
||||
}
|
||||
trace_cycle!("cycle bin {:?} {:?}", ts, b.active_end);
|
||||
// TODO check if the bin has content to emit: either it itself contains events, or is filled with lst value.
|
||||
// For the check for filled with lst I might need another flag.
|
||||
let div = self.range.bin_len.ns();
|
||||
let ts1 = TsNano::from_ns(ts.ns() / div * div);
|
||||
b.active_beg = ts1;
|
||||
b.active_end = ts1.add_dt_nano(b.active_len);
|
||||
b.filled_until = ts1;
|
||||
b.cnt = 0;
|
||||
b.agg.reset_for_new_bin();
|
||||
trace_cycle!("cycled to {:?} {:?}", b.active_beg, b.active_end);
|
||||
}
|
||||
let n1 = evs_all.len();
|
||||
let len_before = evs_all.len_before(self.inner_a.inner_b.active_end);
|
||||
let evs = ContainerEventsTakeUpTo::new(&mut evs_all, len_before);
|
||||
if let Some(lst) = self.lst.as_ref() {
|
||||
if ts < lst.ts {
|
||||
return Err(Error::Unordered);
|
||||
} else {
|
||||
self.ingest_ordered(evs)?
|
||||
}
|
||||
} else {
|
||||
self.ingest_ordered(evs)
|
||||
self.ingest_ordered(evs)?
|
||||
};
|
||||
trace_ingest_container!("ingest after still left len evs {}", evs_all.len());
|
||||
let n2 = evs_all.len();
|
||||
if n2 != 0 {
|
||||
if n2 == n1 {
|
||||
panic!("no progress");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
self.ingest_ordered(evs)
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
()
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn range_final(&mut self) -> Result<(), Error> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
74
crates/items_2/src/binning/valuetype.rs
Normal file
74
crates/items_2/src/binning/valuetype.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
use super::aggregator::AggregatorTimeWeight;
|
||||
use super::container_events::Container;
|
||||
use super::container_events::EventValueType;
|
||||
use crate::vecpreview::PreviewRange;
|
||||
use netpod::DtNano;
|
||||
use netpod::EnumVariant;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct EnumVariantContainer {
|
||||
ixs: VecDeque<u16>,
|
||||
names: VecDeque<String>,
|
||||
}
|
||||
|
||||
impl PreviewRange for EnumVariantContainer {
|
||||
fn preview(&self) -> &dyn core::fmt::Debug {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl Container<EnumVariant> for EnumVariantContainer {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
ixs: VecDeque::new(),
|
||||
names: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_back(&mut self, val: EnumVariant) {
|
||||
let (ix, name) = val.into_parts();
|
||||
self.ixs.push_back(ix);
|
||||
self.names.push_back(name);
|
||||
}
|
||||
|
||||
fn pop_front(&mut self) -> Option<EnumVariant> {
|
||||
if let (Some(a), Some(b)) = (self.ixs.pop_front(), self.names.pop_front()) {
|
||||
Some(EnumVariant::new(a, b))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EnumVariantAggregatorTimeWeight {
|
||||
sum: f32,
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
|
||||
fn new() -> Self {
|
||||
Self { sum: 0. }
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = 0.
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EnumVariant) {
|
||||
let f = dt.ns() as f32 / bl.ns() as f32;
|
||||
eprintln!("INGEST {} {:?}", f, val);
|
||||
let h = items_0::scalar_ops::AsPrimF32::as_prim_f32_b(&val);
|
||||
self.sum += f * h;
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for EnumVariant {
|
||||
type Container = EnumVariantContainer;
|
||||
type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
@@ -691,6 +691,10 @@ impl EnumVariant {
|
||||
pub fn name_string(&self) -> String {
|
||||
self.name.clone()
|
||||
}
|
||||
|
||||
pub fn into_parts(self) -> (u16, String) {
|
||||
(self.ix, self.name)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EnumVariant {
|
||||
|
||||
Reference in New Issue
Block a user