This commit is contained in:
Dominik Werder
2024-09-21 14:01:36 +02:00
parent 99cb34bf57
commit 26ce72d548
2 changed files with 68 additions and 6 deletions

View File

@@ -48,6 +48,7 @@ impl EventValueType for f32 {
type AggregatorTimeWeight = AggregatorNumeric<Self>; type AggregatorTimeWeight = AggregatorNumeric<Self>;
} }
#[derive(Debug, Clone)]
pub struct EventSingle<EVT> { pub struct EventSingle<EVT> {
pub ts: TsNano, pub ts: TsNano,
pub val: EVT, pub val: EVT,
@@ -102,6 +103,12 @@ where
self.tss.back().map(|&x| x) self.tss.back().map(|&x| x)
} }
pub fn len_before(&self, end: TsNano) -> usize {
let pp = self.tss.partition_point(|&x| x < end);
assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len());
pp
}
pub fn event_next(&mut self) -> Option<EventSingle<EVT>> { pub fn event_next(&mut self) -> Option<EventSingle<EVT>> {
if let (Some(ts), Some(val)) = (self.tss.pop_front(), self.vals.pop_front()) { if let (Some(ts), Some(val)) = (self.tss.pop_front(), self.vals.pop_front()) {
Some(EventSingle { ts, val }) Some(EventSingle { ts, val })

View File

@@ -24,8 +24,11 @@ pub enum Error {
AnotherBeforeRange, AnotherBeforeRange,
NoLstAfterFirst, NoLstAfterFirst,
EmptyContainerInnerHandler, EmptyContainerInnerHandler,
NoLstButMinMax,
} }
type MinMax<EVT> = (EventSingle<EVT>, EventSingle<EVT>);
struct InnerA<EVT> { struct InnerA<EVT> {
range: BinnedRange<TsNano>, range: BinnedRange<TsNano>,
cnt: u64, cnt: u64,
@@ -91,9 +94,17 @@ where
// Do I maybe need both? // Do I maybe need both?
// How to handle to not emit bins until at least some partially filled bin is encountered? // How to handle to not emit bins until at least some partially filled bin is encountered?
// How to implement the bin cycle logic in clean way? // How to implement the bin cycle logic in clean way?
// How to emit things? Do to ship results to the caller?
todo!("must check if ev is already after range."); todo!("must check if ev is already after range.");
} }
if ev.ts >= self.range.nano_end() {
todo!("make sure that the transition to the next bin (if we want any next bin) works");
todo!("keep lst. we derive min/max from lst upon the first event in range");
// TODO where and how do I initialize min/max ?
}
// TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet // TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet
// and I must initialize the min/max with the current event. // and I must initialize the min/max with the current event.
// If the event is after the current bin first edge, then min/max is initialized from the lst // If the event is after the current bin first edge, then min/max is initialized from the lst
@@ -106,7 +117,33 @@ where
Ok(()) Ok(())
} }
fn ingest_with_lst(&mut self, evs: ContainerEvents<EVT>, lst: &mut EventSingle<EVT>) -> Result<(), Error> { fn ingest_with_lst_nominmax(
&mut self,
evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
minmax: &mut Option<MinMax<EVT>>,
) -> Result<(), Error> {
// TODO how to handle the min max? I don't take event data yet out of the container.
todo!("minmax handle");
if let Some(ts0) = evs.ts_first() {
if ts0 < self.range.nano_beg() {
Err(Error::AnotherBeforeRange)
} else {
self.ingest_with_lst_ge_range_beg(evs, lst)
}
} else {
Err(Error::EmptyContainerInnerHandler)
}
}
fn ingest_with_lst_minmax(
&mut self,
evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
// TODO how to handle the min max? I don't take event data yet out of the container.
todo!("minmax handle");
if let Some(ts0) = evs.ts_first() { if let Some(ts0) = evs.ts_first() {
if ts0 < self.range.nano_beg() { if ts0 < self.range.nano_beg() {
Err(Error::AnotherBeforeRange) Err(Error::AnotherBeforeRange)
@@ -125,6 +162,7 @@ where
{ {
inner_a: InnerA<EVT>, inner_a: InnerA<EVT>,
lst: Option<EventSingle<EVT>>, lst: Option<EventSingle<EVT>>,
minmax: Option<(EventSingle<EVT>, EventSingle<EVT>)>,
} }
impl<EVT> BinnedEventsTimeweight<EVT> impl<EVT> BinnedEventsTimeweight<EVT>
@@ -139,12 +177,17 @@ where
_t1: PhantomData, _t1: PhantomData,
}, },
lst: None, lst: None,
minmax: None,
} }
} }
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> { fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
if ev.ts < self.inner_a.range.nano_end() { let range = &self.inner_a.range;
if ev.ts >= self.inner_a.range.nano_beg() { let beg = range.nano_beg();
let end = range.nano_end();
if ev.ts < end {
if ev.ts >= beg {
self.minmax = Some((ev.clone(), ev.clone()));
self.inner_a.cnt += 1; self.inner_a.cnt += 1;
} }
self.lst = Some(ev); self.lst = Some(ev);
@@ -157,7 +200,11 @@ where
self.ingest_event_without_lst(ev)?; self.ingest_event_without_lst(ev)?;
} }
if let Some(lst) = self.lst.as_mut() { if let Some(lst) = self.lst.as_mut() {
self.inner_a.ingest_with_lst(evs, lst) if let Some(minmax) = self.minmax.as_mut() {
self.inner_a.ingest_with_lst_minmax(evs, lst, minmax)
} else {
self.inner_a.ingest_with_lst_nominmax(evs, lst, &mut self.minmax)
}
} else { } else {
Err(Error::NoLstAfterFirst) Err(Error::NoLstAfterFirst)
} }
@@ -167,9 +214,17 @@ where
// and with respect to the last container, if any. // and with respect to the last container, if any.
fn ingest_ordered(&mut self, evs: ContainerEvents<EVT>) -> Result<(), Error> { fn ingest_ordered(&mut self, evs: ContainerEvents<EVT>) -> Result<(), Error> {
if let Some(lst) = self.lst.as_mut() { if let Some(lst) = self.lst.as_mut() {
self.inner_a.ingest_with_lst(evs, lst) if let Some(minmax) = self.minmax.as_mut() {
self.inner_a.ingest_with_lst_minmax(evs, lst, minmax)
} else {
self.inner_a.ingest_with_lst_nominmax(evs, lst, &mut self.minmax)
}
} else { } else {
self.ingest_without_lst(evs) if self.minmax.is_some() {
Err(Error::NoLstButMinMax)
} else {
self.ingest_without_lst(evs)
}
} }
} }