From 585ecfe6a3219890cc82f197600663e00131672c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 4 May 2023 15:17:38 +0200 Subject: [PATCH] Factor out parts of event bin --- items_0/src/items_0.rs | 4 + items_0/src/overlap.rs | 5 - items_2/src/binsdim0.rs | 32 +++--- items_2/src/binsxbindim0.rs | 2 +- items_2/src/eventsdim0.rs | 58 +++++++++-- items_2/src/eventsdim1.rs | 3 +- items_2/src/eventsxbindim0.rs | 7 +- items_2/src/items_2.rs | 2 +- items_2/src/timebin.rs | 185 ++++++++++++++++++++++++++++++++++ 9 files changed, 260 insertions(+), 38 deletions(-) diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 838cc31..ec30754 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -44,6 +44,10 @@ pub trait AppendEmptyBin { fn append_empty_bin(&mut self, ts1: u64, ts2: u64); } +pub trait AppendAllFrom { + fn append_all_from(&mut self, src: &mut Self); +} + pub trait AsAnyRef { fn as_any_ref(&self) -> &dyn Any; } diff --git a/items_0/src/overlap.rs b/items_0/src/overlap.rs index 5d62ce4..01b964d 100644 --- a/items_0/src/overlap.rs +++ b/items_0/src/overlap.rs @@ -9,11 +9,6 @@ pub trait HasTimestampDeque { fn pulse_max(&self) -> Option; } -pub trait RangeOverlapCmp { - fn range_overlap_cmp_beg(a: u64, b: u64) -> bool; - fn range_overlap_cmp_end(a: u64, b: u64) -> bool; -} - pub trait RangeOverlapInfo { fn ends_before(&self, range: &SeriesRange) -> bool; fn ends_after(&self, range: &SeriesRange) -> bool; diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index c69935d..97232db 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -18,6 +18,7 @@ use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; use items_0::timebin::TimeBinner; use items_0::timebin::TimeBins; +use items_0::AppendAllFrom; use items_0::AppendEmptyBin; use items_0::AsAnyMut; use items_0::AsAnyRef; @@ -106,24 +107,6 @@ impl BinsDim0 { self.avgs.push_back(avg); } - pub fn append_zero(&mut self, beg: u64, end: u64) { - self.ts1s.push_back(beg); - self.ts2s.push_back(end); - self.counts.push_back(0); - self.mins.push_back(NTY::zero_b()); - self.maxs.push_back(NTY::zero_b()); - self.avgs.push_back(0.); - } - - pub fn append_all_from(&mut self, src: &mut Self) { - self.ts1s.extend(src.ts1s.drain(..)); - self.ts2s.extend(src.ts2s.drain(..)); - self.counts.extend(src.counts.drain(..)); - self.mins.extend(src.mins.drain(..)); - self.maxs.extend(src.maxs.drain(..)); - self.avgs.extend(src.avgs.drain(..)); - } - pub fn equal_slack(&self, other: &Self) -> bool { if self.len() != other.len() { return false; @@ -226,6 +209,17 @@ impl AppendEmptyBin for BinsDim0 { } } +impl AppendAllFrom for BinsDim0 { + fn append_all_from(&mut self, src: &mut Self) { + self.ts1s.extend(src.ts1s.drain(..)); + self.ts2s.extend(src.ts2s.drain(..)); + self.counts.extend(src.counts.drain(..)); + self.mins.extend(src.mins.drain(..)); + self.maxs.extend(src.maxs.drain(..)); + self.avgs.extend(src.avgs.drain(..)); + } +} + impl TimeBins for BinsDim0 { fn ts_min(&self) -> Option { self.ts1s.front().map(Clone::clone) @@ -559,7 +553,7 @@ impl TimeBinnableTypeAggregator for BinsDim0Aggregator { todo!() } - fn result_reset(&mut self, range: SeriesRange, _expand: bool) -> Self::Output { + fn result_reset(&mut self, range: SeriesRange) -> Self::Output { /*if self.sumc > 0 { self.avg = self.sum / self.sumc as f32; } diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index 0220431..99b8f6d 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -537,7 +537,7 @@ impl TimeBinnableTypeAggregator for BinsXbinDim0Aggregator todo!() } - fn result_reset(&mut self, range: SeriesRange, _expand: bool) -> Self::Output { + fn result_reset(&mut self, range: SeriesRange) -> Self::Output { /*if self.sumc > 0 { self.avg = self.sum / self.sumc as f32; } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 4c43b78..dd02036 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,6 +1,7 @@ use crate::binsdim0::BinsDim0; use crate::framable::FrameType; use crate::framable::FrameTypeStatic; +use crate::timebin::TimeBinnerCommonV0Trait; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -15,11 +16,12 @@ use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::overlap::HasTimestampDeque; -use items_0::overlap::RangeOverlapCmp; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; use items_0::timebin::TimeBinner; +use items_0::AppendAllFrom; +use items_0::AppendEmptyBin; use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; @@ -713,7 +715,7 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { } } - fn result_reset(&mut self, range: SeriesRange, _expand: bool) -> Self::Output { + fn result_reset(&mut self, range: SeriesRange) -> Self::Output { trace!("result_reset {:?}", range); if self.do_time_weight { self.result_reset_time_weight(range) @@ -959,6 +961,50 @@ impl EventsDim0TimeBinner { } } +impl TimeBinnerCommonV0Trait for EventsDim0TimeBinner { + type Input = as TimeBinnableTypeAggregator>::Input; + type Output = as TimeBinnableTypeAggregator>::Output; + + fn type_name() -> &'static str { + Self::type_name() + } + + fn common_range_current(&self) -> &SeriesRange { + self.agg.range() + } + + fn common_cycle(&mut self) { + todo!() + } + + fn common_has_more_range(&self) -> bool { + todo!() + } + + fn common_bins_ready_count(&self) -> usize { + todo!() + } + + fn common_next_bin_range(&mut self) -> Option { + todo!() + } + + fn common_take_or_append_all_from(&mut self, mut item: Self::Output) { + match self.ready.as_mut() { + Some(ready) => { + ready.append_all_from(&mut item); + } + None => { + self.ready = Some(item); + } + } + } + + fn common_result_reset(&mut self, range: SeriesRange) -> Self::Output { + todo!() + } +} + impl TimeBinner for EventsDim0TimeBinner { fn bins_ready_count(&self) -> usize { match &self.ready { @@ -1042,18 +1088,18 @@ impl TimeBinner for EventsDim0TimeBinner { // the rest of the time range. if self.rng.is_none() { } else { - let expand = true; let range_next = self.next_bin_range(); self.rng = range_next.clone(); let mut bins = if let Some(range_next) = range_next { - self.agg.result_reset(range_next, expand) + self.agg.result_reset(range_next) } else { // Acts as placeholder + // TODO clean up let range_next = NanoRange { beg: u64::MAX - 1, end: u64::MAX, }; - self.agg.result_reset(range_next.into(), expand) + self.agg.result_reset(range_next.into()) }; if bins.len() != 1 { error!("{self_name}::push_in_progress bins.len() {}", bins.len()); @@ -1085,7 +1131,7 @@ impl TimeBinner for EventsDim0TimeBinner { if let Some(range) = range_next { let mut bins = BinsDim0::empty(); if range.is_time() { - bins.append_zero(range.beg_u64(), range.end_u64()); + bins.append_empty_bin(range.beg_u64(), range.end_u64()); } else { error!("TODO {self_name}::cycle is_pulse"); } diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 1717efd..cbd16b0 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -14,7 +14,6 @@ use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::overlap::HasTimestampDeque; -use items_0::overlap::RangeOverlapCmp; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; @@ -638,7 +637,7 @@ impl TimeBinnableTypeAggregator for EventsDim1Aggregator { } } - fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output { + fn result_reset(&mut self, range: SeriesRange) -> Self::Output { /*trace!("result_reset {} {}", range.beg, range.end); if self.do_time_weight { self.result_reset_time_weight(range, expand) diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index 7ccfb91..58ac82c 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -12,7 +12,6 @@ use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::overlap::HasTimestampDeque; -use items_0::overlap::RangeOverlapCmp; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; @@ -508,14 +507,14 @@ impl TimeBinner for EventsXbinDim0TimeBinner { trace!("\n+++++\n+++++\n{} range_next {:?}", Self::type_name(), range_next); self.rng = range_next.clone(); let mut bins = if let Some(range_next) = range_next { - self.agg.result_reset(range_next, expand) + self.agg.result_reset(range_next) } else { // Acts as placeholder let range_next = NanoRange { beg: u64::MAX - 1, end: u64::MAX, }; - self.agg.result_reset(range_next.into(), expand) + self.agg.result_reset(range_next.into()) }; if bins.len() != 1 { error!("{}::push_in_progress bins.len() {}", Self::type_name(), bins.len()); @@ -855,7 +854,7 @@ where } } - fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output { + fn result_reset(&mut self, range: SeriesRange) -> Self::Output { if self.do_time_weight { self.result_reset_time_weight(range) } else { diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 972e1b8..eb9ef45 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -197,7 +197,7 @@ pub trait TimeBinnableTypeAggregator: Send { type Output: TimeBinnableType; fn range(&self) -> &SeriesRange; fn ingest(&mut self, item: &Self::Input); - fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output; + fn result_reset(&mut self, range: SeriesRange) -> Self::Output; } pub trait ChannelEventsInput: Stream> + EventTransform + Send {} diff --git a/items_2/src/timebin.rs b/items_2/src/timebin.rs index 8b13789..170e942 100644 --- a/items_2/src/timebin.rs +++ b/items_2/src/timebin.rs @@ -1 +1,186 @@ +use crate::eventsdim0::EventsDim0TimeBinner; +use items_0::overlap::RangeOverlapInfo; +use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinnable; +use items_0::AppendEmptyBin; +use items_0::Appendable; +use items_0::Empty; +use items_0::WithLen; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; +use std::any; +#[allow(unused)] +macro_rules! trace_ingest { + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*); }; +} + +#[allow(unused)] +macro_rules! trace_ingest_item { + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*); }; +} + +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => {}; + ($($arg:tt)*) => { trace!($($arg)*); }; +} + +pub trait TimeBinnerCommonV0Trait { + type Input: RangeOverlapInfo + 'static; + type Output: WithLen + Empty + AppendEmptyBin + 'static; + fn type_name() -> &'static str; + fn common_bins_ready_count(&self) -> usize; + fn common_range_current(&self) -> &SeriesRange; + fn common_next_bin_range(&mut self) -> Option; + fn common_cycle(&mut self); + fn common_has_more_range(&self) -> bool; + fn common_take_or_append_all_from(&mut self, item: Self::Output); + fn common_result_reset(&mut self, range: SeriesRange) -> Self::Output; +} + +pub struct TimeBinnerCommonV0Func {} + +impl TimeBinnerCommonV0Func { + pub fn agg_ingest(binner: &mut B, item: &mut ::Input) + where + B: TimeBinnerCommonV0Trait, + { + //self.agg.ingest(item); + todo!() + } + + pub fn ingest(binner: &mut B, item: &mut dyn TimeBinnable) + where + B: TimeBinnerCommonV0Trait, + { + let self_name = B::type_name(); + trace_ingest_item!( + "TimeBinner for {} ingest agg.range {:?} item {:?}", + Self::type_name(), + self.agg.range(), + item + ); + if item.len() == 0 { + // Return already here, RangeOverlapInfo would not give much sense. + return; + } + // TODO optimize by remembering at which event array index we have arrived. + // That needs modified interfaces which can take and yield the start and latest index. + // Or consume the input data. + loop { + while item.starts_after(B::common_range_current(binner)) { + trace_ingest_item!("{self_name} ignore item and cycle starts_after"); + TimeBinnerCommonV0Func::cycle(binner); + if !B::common_has_more_range(binner) { + debug!("{self_name} no more bin in edges after starts_after"); + return; + } + } + if item.ends_before(B::common_range_current(binner)) { + trace_ingest_item!("{self_name} ignore item ends_before"); + return; + } else { + if !B::common_has_more_range(binner) { + trace_ingest_item!("{self_name} no more bin in edges"); + return; + } else { + if let Some(item) = item + .as_any_mut() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_mut::() + { + // TODO collect statistics associated with this request: + trace_ingest_item!("{self_name} FEED THE ITEM..."); + TimeBinnerCommonV0Func::agg_ingest(binner, item); + if item.ends_after(B::common_range_current(binner)) { + trace_ingest_item!( + "{self_name} FED ITEM, ENDS AFTER agg-range {:?}", + B::common_range_current(binner) + ); + TimeBinnerCommonV0Func::cycle(binner); + if !B::common_has_more_range(binner) { + warn!("{self_name} no more bin in edges after ingest and cycle"); + return; + } else { + trace_ingest_item!("{self_name} item fed, cycled, continue"); + } + } else { + trace_ingest_item!("{self_name} item fed, break"); + break; + } + } else { + error!("{self_name}::ingest unexpected item type"); + }; + } + } + } + } + + fn push_in_progress(binner: &mut B, push_empty: bool) + where + B: TimeBinnerCommonV0Trait, + { + let self_name = B::type_name(); + trace_ingest_item!("{self_name}::push_in_progress push_empty {push_empty}"); + // TODO expand should be derived from AggKind. Is it still required after all? + // TODO here, the expand means that agg will assume that the current value is kept constant during + // the rest of the time range. + if B::common_has_more_range(binner) { + let range_next = TimeBinnerCommonV0Trait::common_next_bin_range(binner); + let bins = if let Some(range_next) = range_next { + TimeBinnerCommonV0Trait::common_result_reset(binner, range_next) + //self.agg.result_reset(range_next, expand) + } else { + // Acts as placeholder + // TODO clean up + let range_next = NanoRange { + beg: u64::MAX - 1, + end: u64::MAX, + }; + TimeBinnerCommonV0Trait::common_result_reset(binner, range_next.into()) + //self.agg.result_reset(range_next.into(), expand) + }; + if bins.len() != 1 { + error!("{self_name}::push_in_progress bins.len() {}", bins.len()); + return; + } else { + //if push_empty || bins.counts[0] != 0 { + if push_empty { + TimeBinnerCommonV0Trait::common_take_or_append_all_from(binner, bins); + } + } + } + } + + fn cycle(binner: &mut B) + where + B: TimeBinnerCommonV0Trait, + { + let self_name = any::type_name::(); + trace_ingest_item!("{self_name}::cycle"); + // TODO refactor this logic. + let n = TimeBinnerCommonV0Trait::common_bins_ready_count(binner); + TimeBinnerCommonV0Func::push_in_progress(binner, true); + if TimeBinnerCommonV0Trait::common_bins_ready_count(binner) == n { + let range_next = TimeBinnerCommonV0Trait::common_next_bin_range(binner); + if let Some(range) = range_next { + let mut bins = ::Output::empty(); + if range.is_time() { + bins.append_empty_bin(range.beg_u64(), range.end_u64()); + } else { + error!("TODO {self_name}::cycle is_pulse"); + } + TimeBinnerCommonV0Trait::common_take_or_append_all_from(binner, bins); + if TimeBinnerCommonV0Trait::common_bins_ready_count(binner) <= n { + error!("failed to push a zero bin"); + } + } else { + warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); + } + } + } +}