This commit is contained in:
Dominik Werder
2024-09-21 21:20:42 +02:00
parent 26ce72d548
commit 096e0abdb7
2 changed files with 98 additions and 45 deletions

View File

@@ -25,7 +25,7 @@ pub trait Container<EVT>: fmt::Debug + Clone + PreviewRange + Serialize + for<'a
fn pop_front(&mut self) -> Option<EVT>;
}
pub trait EventValueType: fmt::Debug + Clone {
pub trait EventValueType: fmt::Debug + Clone + PartialOrd {
type Container: Container<Self>;
type AggregatorTimeWeight: AggregatorTimeWeight;
}

View File

@@ -29,13 +29,13 @@ pub enum Error {
type MinMax<EVT> = (EventSingle<EVT>, EventSingle<EVT>);
struct InnerA<EVT> {
struct InnerB<EVT> {
range: BinnedRange<TsNano>,
cnt: u64,
_t1: PhantomData<EVT>,
}
impl<EVT> InnerA<EVT>
impl<EVT> InnerB<EVT>
where
EVT: EventValueType,
{
@@ -117,25 +117,6 @@ where
Ok(())
}
fn ingest_with_lst_nominmax(
&mut self,
evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
minmax: &mut Option<MinMax<EVT>>,
) -> Result<(), Error> {
// TODO how to handle the min max? I don't take event data yet out of the container.
todo!("minmax handle");
if let Some(ts0) = evs.ts_first() {
if ts0 < self.range.nano_beg() {
Err(Error::AnotherBeforeRange)
} else {
self.ingest_with_lst_ge_range_beg(evs, lst)
}
} else {
Err(Error::EmptyContainerInnerHandler)
}
}
fn ingest_with_lst_minmax(
&mut self,
evs: ContainerEvents<EVT>,
@@ -156,13 +137,54 @@ where
}
}
struct InnerA<EVT> {
inner_b: InnerB<EVT>,
minmax: Option<(EventSingle<EVT>, EventSingle<EVT>)>,
}
impl<EVT> InnerA<EVT>
where
EVT: EventValueType,
{
fn ingest_with_lst_nominmax(
&mut self,
evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
minmax: &mut Option<MinMax<EVT>>,
) -> Result<(), Error> {
// TODO how to handle the min max? I don't take event data yet out of the container.
todo!("minmax handle");
if let Some(ts0) = evs.ts_first() {
if ts0 < self.inner_b.range.nano_beg() {
Err(Error::AnotherBeforeRange)
} else {
self.inner_b.ingest_with_lst_ge_range_beg(evs, lst)
}
} else {
Err(Error::EmptyContainerInnerHandler)
}
}
fn ingest_with_lst(&mut self, evs: ContainerEvents<EVT>, lst: &mut EventSingle<EVT>) -> Result<(), Error> {
if let Some(minmax) = self.minmax.as_mut() {
// TODO
// Maybe?
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)?;
todo!()
} else {
// self.ingest_with_lst_without_minmax(evs, lst)
// TODO we have lst, so expect that this next event gives us minmax.
todo!()
}
}
}
pub struct BinnedEventsTimeweight<EVT>
where
EVT: EventValueType,
{
inner_a: InnerA<EVT>,
lst: Option<EventSingle<EVT>>,
minmax: Option<(EventSingle<EVT>, EventSingle<EVT>)>,
}
impl<EVT> BinnedEventsTimeweight<EVT>
@@ -172,41 +194,76 @@ where
pub fn new(range: BinnedRange<TsNano>) -> Self {
Self {
inner_a: InnerA::<EVT> {
range,
cnt: 0,
_t1: PhantomData,
inner_b: InnerB {
range,
cnt: 0,
_t1: PhantomData,
},
minmax: None,
},
lst: None,
minmax: None,
}
}
fn apply_min_max(ev: &EventSingle<EVT>, minmax: &mut MinMax<EVT>) {
if ev.val < minmax.0.val {
minmax.0 = ev.clone();
}
if ev.val > minmax.1.val {
minmax.1 = ev.clone();
}
}
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
let range = &self.inner_a.range;
let range = &self.inner_a.inner_b.range;
let beg = range.nano_beg();
let end = range.nano_end();
if ev.ts < end {
self.lst = Some(ev.clone());
if ev.ts >= beg {
self.minmax = Some((ev.clone(), ev.clone()));
self.inner_a.cnt += 1;
self.inner_a.minmax = Some((ev.clone(), ev.clone()));
self.inner_a.inner_b.cnt += 1;
}
self.lst = Some(ev);
} else {
todo!("must cycle forward, can probably not produce a bin at all until then")
}
Ok(())
}
fn ingest_without_lst(&mut self, mut evs: ContainerEvents<EVT>) -> Result<(), Error> {
fn _________ingest_with_lst_without_minmax(
&mut self,
mut evs: ContainerEvents<EVT>,
lst: &mut EventSingle<EVT>,
) -> Result<(), Error> {
if let Some(ev) = evs.event_next() {
self.ingest_event_without_lst(ev)?;
}
if let Some(lst) = self.lst.as_mut() {
if let Some(minmax) = self.minmax.as_mut() {
self.inner_a.ingest_with_lst_minmax(evs, lst, minmax)
if ev.ts >= self.inner_a.inner_b.range.nano_end() {
// need to cycle, then apply the event again...
todo!("cycle and reapply");
} else {
self.inner_a.ingest_with_lst_nominmax(evs, lst, &mut self.minmax)
// self.ingest_event_with_lst_without_minmax(ev, lst)?;
Ok(())
}
} else {
Err(Error::NoLstAfterFirst)
Ok(())
}
}
fn ingest_without_lst(&mut self, mut evs: ContainerEvents<EVT>) -> Result<(), Error> {
if let Some(ev) = evs.event_next() {
if ev.ts >= self.inner_a.inner_b.range.nano_end() {
// need to cycle, then apply the event again...
// TODO write that retry as a loop with iter limit.
todo!("cycle and reapply");
} else {
self.ingest_event_without_lst(ev)?;
if let Some(lst) = self.lst.as_mut() {
self.inner_a.ingest_with_lst(evs, lst)
} else {
Err(Error::NoLstAfterFirst)
}
}
} else {
Ok(())
}
}
@@ -214,13 +271,9 @@ where
// and with respect to the last container, if any.
fn ingest_ordered(&mut self, evs: ContainerEvents<EVT>) -> Result<(), Error> {
if let Some(lst) = self.lst.as_mut() {
if let Some(minmax) = self.minmax.as_mut() {
self.inner_a.ingest_with_lst_minmax(evs, lst, minmax)
} else {
self.inner_a.ingest_with_lst_nominmax(evs, lst, &mut self.minmax)
}
self.inner_a.ingest_with_lst(evs, lst)
} else {
if self.minmax.is_some() {
if self.inner_a.minmax.is_some() {
Err(Error::NoLstButMinMax)
} else {
self.ingest_without_lst(evs)