Use factored code path
This commit is contained in:
@@ -48,6 +48,11 @@ pub trait AppendAllFrom {
|
||||
fn append_all_from(&mut self, src: &mut Self);
|
||||
}
|
||||
|
||||
// TODO check usage, probably only for legacy
|
||||
pub trait HasNonemptyFirstBin {
|
||||
fn has_nonempty_first_bin(&self) -> bool;
|
||||
}
|
||||
|
||||
pub trait AsAnyRef {
|
||||
fn as_any_ref(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use items_0::AppendEmptyBin;
|
||||
use items_0::AsAnyMut;
|
||||
use items_0::AsAnyRef;
|
||||
use items_0::Empty;
|
||||
use items_0::HasNonemptyFirstBin;
|
||||
use items_0::TypeName;
|
||||
use items_0::WithLen;
|
||||
use netpod::is_false;
|
||||
@@ -178,6 +179,12 @@ impl<NTY> WithLen for BinsDim0<NTY> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> HasNonemptyFirstBin for BinsDim0<STY> {
|
||||
fn has_nonempty_first_bin(&self) -> bool {
|
||||
self.counts.front().map_or(false, |x| *x > 0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> HasTimestampDeque for BinsDim0<STY> {
|
||||
fn timestamp_min(&self) -> Option<u64> {
|
||||
self.ts1s.front().map(|x| *x)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::binsdim0::BinsDim0;
|
||||
use crate::framable::FrameType;
|
||||
use crate::framable::FrameTypeStatic;
|
||||
use crate::timebin::TimeBinnerCommonV0Func;
|
||||
use crate::timebin::TimeBinnerCommonV0Trait;
|
||||
use crate::IsoDateTime;
|
||||
use crate::RangeOverlapInfo;
|
||||
@@ -28,6 +29,7 @@ use items_0::AsAnyRef;
|
||||
use items_0::Empty;
|
||||
use items_0::Events;
|
||||
use items_0::EventsNonObj;
|
||||
use items_0::HasNonemptyFirstBin;
|
||||
use items_0::MergeError;
|
||||
use items_0::TypeName;
|
||||
use items_0::WithLen;
|
||||
@@ -973,20 +975,19 @@ impl<STY: ScalarOps> TimeBinnerCommonV0Trait for EventsDim0TimeBinner<STY> {
|
||||
self.agg.range()
|
||||
}
|
||||
|
||||
fn common_cycle(&mut self) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn common_has_more_range(&self) -> bool {
|
||||
todo!()
|
||||
self.rng.is_some()
|
||||
}
|
||||
|
||||
fn common_bins_ready_count(&self) -> usize {
|
||||
todo!()
|
||||
match &self.ready {
|
||||
Some(k) => k.len(),
|
||||
None => 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn common_next_bin_range(&mut self) -> Option<SeriesRange> {
|
||||
todo!()
|
||||
self.next_bin_range()
|
||||
}
|
||||
|
||||
fn common_take_or_append_all_from(&mut self, mut item: Self::Output) {
|
||||
@@ -1001,16 +1002,17 @@ impl<STY: ScalarOps> TimeBinnerCommonV0Trait for EventsDim0TimeBinner<STY> {
|
||||
}
|
||||
|
||||
fn common_result_reset(&mut self, range: SeriesRange) -> Self::Output {
|
||||
todo!()
|
||||
self.agg.result_reset(range)
|
||||
}
|
||||
|
||||
fn common_agg_ingest(&mut self, item: &mut Self::Input) {
|
||||
self.agg.ingest(item)
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
|
||||
fn bins_ready_count(&self) -> usize {
|
||||
match &self.ready {
|
||||
Some(k) => k.len(),
|
||||
None => 0,
|
||||
}
|
||||
TimeBinnerCommonV0Trait::common_bins_ready_count(self)
|
||||
}
|
||||
|
||||
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>> {
|
||||
@@ -1021,135 +1023,15 @@ impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
|
||||
}
|
||||
|
||||
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
|
||||
let self_name = Self::type_name();
|
||||
trace_ingest_item!(
|
||||
"TimeBinner for {} ingest agg.range {:?} item {:?}",
|
||||
Self::type_name(),
|
||||
self.agg.range(),
|
||||
item
|
||||
);
|
||||
if item.len() == 0 {
|
||||
// Return already here, RangeOverlapInfo would not give much sense.
|
||||
return;
|
||||
}
|
||||
// TODO optimize by remembering at which event array index we have arrived.
|
||||
// That needs modified interfaces which can take and yield the start and latest index.
|
||||
loop {
|
||||
while item.starts_after(self.agg.range()) {
|
||||
trace_ingest_item!("{self_name} ignore item and cycle starts_after");
|
||||
self.cycle();
|
||||
if self.rng.is_none() {
|
||||
debug!("{self_name} no more bin in edges after starts_after");
|
||||
return;
|
||||
}
|
||||
}
|
||||
if item.ends_before(self.agg.range()) {
|
||||
trace_ingest_item!("{self_name} ignore item ends_before");
|
||||
return;
|
||||
} else {
|
||||
if self.rng.is_none() {
|
||||
trace_ingest_item!("{self_name} no more bin in edges");
|
||||
return;
|
||||
} else {
|
||||
if let Some(item) = item
|
||||
.as_any_ref()
|
||||
// TODO make statically sure that we attempt to cast to the correct type here:
|
||||
.downcast_ref::<<EventsDim0Aggregator<STY> as TimeBinnableTypeAggregator>::Input>()
|
||||
{
|
||||
// TODO collect statistics associated with this request:
|
||||
trace_ingest_item!("{self_name} FEED THE ITEM...");
|
||||
self.agg.ingest(item);
|
||||
if item.ends_after(self.agg.range()) {
|
||||
trace_ingest_item!("{self_name} FED ITEM, ENDS AFTER agg-range {:?}", self.agg.range());
|
||||
self.cycle();
|
||||
if self.rng.is_none() {
|
||||
warn!("{self_name} no more bin in edges after ingest and cycle");
|
||||
return;
|
||||
} else {
|
||||
trace_ingest_item!("{self_name} item fed, cycled, continue");
|
||||
}
|
||||
} else {
|
||||
trace_ingest_item!("{self_name} item fed, break");
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
error!("{self_name}::ingest unexpected item type");
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
TimeBinnerCommonV0Func::ingest(self, item)
|
||||
}
|
||||
|
||||
fn push_in_progress(&mut self, push_empty: bool) {
|
||||
let self_name = any::type_name::<Self>();
|
||||
trace_ingest_item!("{self_name}::push_in_progress push_empty {push_empty}");
|
||||
// TODO expand should be derived from AggKind. Is it still required after all?
|
||||
// TODO here, the expand means that agg will assume that the current value is kept constant during
|
||||
// the rest of the time range.
|
||||
if self.rng.is_none() {
|
||||
} else {
|
||||
let range_next = self.next_bin_range();
|
||||
self.rng = range_next.clone();
|
||||
let mut bins = if let Some(range_next) = range_next {
|
||||
self.agg.result_reset(range_next)
|
||||
} else {
|
||||
// Acts as placeholder
|
||||
// TODO clean up
|
||||
let range_next = NanoRange {
|
||||
beg: u64::MAX - 1,
|
||||
end: u64::MAX,
|
||||
};
|
||||
self.agg.result_reset(range_next.into())
|
||||
};
|
||||
if bins.len() != 1 {
|
||||
error!("{self_name}::push_in_progress bins.len() {}", bins.len());
|
||||
return;
|
||||
} else {
|
||||
if push_empty || bins.counts[0] != 0 {
|
||||
match self.ready.as_mut() {
|
||||
Some(ready) => {
|
||||
ready.append_all_from(&mut bins);
|
||||
}
|
||||
None => {
|
||||
self.ready = Some(bins);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
TimeBinnerCommonV0Func::push_in_progress(self, push_empty)
|
||||
}
|
||||
|
||||
fn cycle(&mut self) {
|
||||
let self_name = any::type_name::<Self>();
|
||||
trace_ingest_item!("{self_name}::cycle");
|
||||
// TODO refactor this logic.
|
||||
let n = self.bins_ready_count();
|
||||
self.push_in_progress(true);
|
||||
if self.bins_ready_count() == n {
|
||||
let range_next = self.next_bin_range();
|
||||
self.rng = range_next.clone();
|
||||
if let Some(range) = range_next {
|
||||
let mut bins = BinsDim0::empty();
|
||||
if range.is_time() {
|
||||
bins.append_empty_bin(range.beg_u64(), range.end_u64());
|
||||
} else {
|
||||
error!("TODO {self_name}::cycle is_pulse");
|
||||
}
|
||||
match self.ready.as_mut() {
|
||||
Some(ready) => {
|
||||
ready.append_all_from(&mut bins);
|
||||
}
|
||||
None => {
|
||||
self.ready = Some(bins);
|
||||
}
|
||||
}
|
||||
if self.bins_ready_count() <= n {
|
||||
error!("failed to push a zero bin");
|
||||
}
|
||||
} else {
|
||||
warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
|
||||
}
|
||||
}
|
||||
TimeBinnerCommonV0Func::cycle(self)
|
||||
}
|
||||
|
||||
fn set_range_complete(&mut self) {
|
||||
|
||||
@@ -5,6 +5,7 @@ use items_0::timebin::TimeBinnable;
|
||||
use items_0::AppendEmptyBin;
|
||||
use items_0::Appendable;
|
||||
use items_0::Empty;
|
||||
use items_0::HasNonemptyFirstBin;
|
||||
use items_0::WithLen;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
@@ -31,15 +32,15 @@ macro_rules! trace2 {
|
||||
|
||||
pub trait TimeBinnerCommonV0Trait {
|
||||
type Input: RangeOverlapInfo + 'static;
|
||||
type Output: WithLen + Empty + AppendEmptyBin + 'static;
|
||||
type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static;
|
||||
fn type_name() -> &'static str;
|
||||
fn common_bins_ready_count(&self) -> usize;
|
||||
fn common_range_current(&self) -> &SeriesRange;
|
||||
fn common_next_bin_range(&mut self) -> Option<SeriesRange>;
|
||||
fn common_cycle(&mut self);
|
||||
fn common_has_more_range(&self) -> bool;
|
||||
fn common_take_or_append_all_from(&mut self, item: Self::Output);
|
||||
fn common_result_reset(&mut self, range: SeriesRange) -> Self::Output;
|
||||
fn common_agg_ingest(&mut self, item: &mut Self::Input);
|
||||
}
|
||||
|
||||
pub struct TimeBinnerCommonV0Func {}
|
||||
@@ -50,7 +51,7 @@ impl TimeBinnerCommonV0Func {
|
||||
B: TimeBinnerCommonV0Trait,
|
||||
{
|
||||
//self.agg.ingest(item);
|
||||
todo!()
|
||||
<B as TimeBinnerCommonV0Trait>::common_agg_ingest(binner, item)
|
||||
}
|
||||
|
||||
pub fn ingest<B>(binner: &mut B, item: &mut dyn TimeBinnable)
|
||||
@@ -120,7 +121,7 @@ impl TimeBinnerCommonV0Func {
|
||||
}
|
||||
}
|
||||
|
||||
fn push_in_progress<B>(binner: &mut B, push_empty: bool)
|
||||
pub fn push_in_progress<B>(binner: &mut B, push_empty: bool)
|
||||
where
|
||||
B: TimeBinnerCommonV0Trait,
|
||||
{
|
||||
@@ -148,15 +149,14 @@ impl TimeBinnerCommonV0Func {
|
||||
error!("{self_name}::push_in_progress bins.len() {}", bins.len());
|
||||
return;
|
||||
} else {
|
||||
//if push_empty || bins.counts[0] != 0 {
|
||||
if push_empty {
|
||||
if push_empty || HasNonemptyFirstBin::has_nonempty_first_bin(&bins) {
|
||||
TimeBinnerCommonV0Trait::common_take_or_append_all_from(binner, bins);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cycle<B>(binner: &mut B)
|
||||
pub fn cycle<B>(binner: &mut B)
|
||||
where
|
||||
B: TimeBinnerCommonV0Trait,
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user