This commit is contained in:
Dominik Werder
2024-09-21 22:45:44 +02:00
parent 096e0abdb7
commit 300a386a6a

View File

@@ -16,19 +16,28 @@ use std::task::Poll;
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
const DEBUG_CHECKS: bool = true;
#[derive(Debug, ThisError)]
#[cstm(name = "BinnedEventsTimeweight")]
pub enum Error {
BadContainer(#[from] super::super::container_events::EventsContainerError),
Unordered,
AnotherBeforeRange,
NoLstAfterFirst,
EmptyContainerInnerHandler,
NoLstButMinMax,
WithLstButEventBeforeRange,
WithMinMaxButEventBeforeRange,
NoMinMaxAfterInit,
ExpectEventInInnerVolumeRange,
}
type MinMax<EVT> = (EventSingle<EVT>, EventSingle<EVT>);
struct LstRef<'a, EVT>(&'a EventSingle<EVT>);
struct LstMut<'a, EVT>(&'a mut EventSingle<EVT>);
struct InnerB<EVT> {
range: BinnedRange<TsNano>,
cnt: u64,
@@ -39,11 +48,17 @@ impl<EVT> InnerB<EVT>
where
EVT: EventValueType,
{
fn ingest_event_with_lst_in_range(
fn ingest_event_with_lst_gt_range_beg(
&mut self,
ev: EventSingle<EVT>,
lst: &mut EventSingle<EVT>,
lst: LstMut<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
if DEBUG_CHECKS {
if ev.ts <= self.range.nano_beg() {
return Err(Error::ExpectEventInInnerVolumeRange);
}
}
// Aggregator:
// Must handle min, max, avg, var.
// min and max is actually tricky and can not be done in one go with lst:
@@ -86,6 +101,7 @@ where
&mut self,
mut evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
while let Some(ev) = evs.event_next() {
if true {
@@ -110,7 +126,7 @@ where
// If the event is after the current bin first edge, then min/max is initialized from the lst
// and there is a contribution from the lst to the avg.
self.ingest_event_with_lst_in_range(ev, lst)?;
self.ingest_event_with_lst_gt_range_beg(ev, LstMut(lst), minmax)?;
// TODO update the lst (needs clone?)
}
@@ -126,10 +142,14 @@ where
// TODO how to handle the min max? I don't take event data yet out of the container.
todo!("minmax handle");
if let Some(ts0) = evs.ts_first() {
if ts0 < self.range.nano_beg() {
Err(Error::AnotherBeforeRange)
let range = &self.range;
let beg = range.nano_beg();
let end = range.nano_end();
if ts0 < beg {
Err(Error::WithMinMaxButEventBeforeRange)
} else {
self.ingest_with_lst_ge_range_beg(evs, lst)
todo!();
self.ingest_with_lst_ge_range_beg(evs, lst, minmax)
}
} else {
Err(Error::EmptyContainerInnerHandler)
@@ -146,35 +166,58 @@ impl<EVT> InnerA<EVT>
where
EVT: EventValueType,
{
fn ingest_with_lst_nominmax(
&mut self,
evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
minmax: &mut Option<MinMax<EVT>>,
) -> Result<(), Error> {
// TODO how to handle the min max? I don't take event data yet out of the container.
todo!("minmax handle");
if let Some(ts0) = evs.ts_first() {
if ts0 < self.inner_b.range.nano_beg() {
Err(Error::AnotherBeforeRange)
} else {
self.inner_b.ingest_with_lst_ge_range_beg(evs, lst)
}
} else {
Err(Error::EmptyContainerInnerHandler)
fn apply_min_max(ev: &EventSingle<EVT>, minmax: &mut MinMax<EVT>) {
if ev.val < minmax.0.val {
minmax.0 = ev.clone();
}
if ev.val > minmax.1.val {
minmax.1 = ev.clone();
}
}
fn ingest_with_lst(&mut self, evs: ContainerEvents<EVT>, lst: &mut EventSingle<EVT>) -> Result<(), Error> {
fn init_minmax(&mut self, ev: &EventSingle<EVT>) {
todo!()
}
fn init_minmax_with_lst(&mut self, ev: &EventSingle<EVT>, lst: LstRef<EVT>) {
todo!()
}
fn ingest_with_lst(&mut self, mut evs: ContainerEvents<EVT>, lst: &mut EventSingle<EVT>) -> Result<(), Error> {
if let Some(minmax) = self.minmax.as_mut() {
// TODO
// Maybe?
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)?;
todo!()
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)
} else {
// self.ingest_with_lst_without_minmax(evs, lst)
// TODO we have lst, so expect that this next event gives us minmax.
todo!()
if let Some(ev) = evs.event_next() {
let range = &self.inner_b.range;
let beg = range.nano_beg();
let end = range.nano_end();
if ev.ts >= end {
// need to cycle, then apply the event again...
// TODO write that retry as a loop with iter limit.
todo!("cycle and reapply");
} else {
if DEBUG_CHECKS && ev.ts < beg {
return Err(Error::WithLstButEventBeforeRange);
} else if ev.ts == beg {
self.init_minmax(&ev);
todo!("this stops handling of event, but must apply to lst");
} else {
self.init_minmax_with_lst(&ev, LstRef(lst));
if let Some(minmax) = self.minmax.as_mut() {
// todo!("apply here also the event aggregation with everything except lst, min, max");
self.inner_b
.ingest_event_with_lst_gt_range_beg(ev, LstMut(lst), minmax)?;
todo!("this stops handling of event, but must apply to lst");
todo!("is this correct? we did already init minmax. calling this, it would get applied again?");
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)
} else {
Err(Error::NoMinMaxAfterInit)
}
}
}
} else {
Ok(())
}
}
}
}
@@ -205,15 +248,6 @@ where
}
}
fn apply_min_max(ev: &EventSingle<EVT>, minmax: &mut MinMax<EVT>) {
if ev.val < minmax.0.val {
minmax.0 = ev.clone();
}
if ev.val > minmax.1.val {
minmax.1 = ev.clone();
}
}
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
let range = &self.inner_a.inner_b.range;
let beg = range.nano_beg();
@@ -230,24 +264,6 @@ where
Ok(())
}
fn _________ingest_with_lst_without_minmax(
&mut self,
mut evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
) -> Result<(), Error> {
if let Some(ev) = evs.event_next() {
if ev.ts >= self.inner_a.inner_b.range.nano_end() {
// need to cycle, then apply the event again...
todo!("cycle and reapply");
} else {
// self.ingest_event_with_lst_without_minmax(ev, lst)?;
Ok(())
}
} else {
Ok(())
}
}
fn ingest_without_lst(&mut self, mut evs: ContainerEvents<EVT>) -> Result<(), Error> {
if let Some(ev) = evs.event_next() {
if ev.ts >= self.inner_a.inner_b.range.nano_end() {