Use separate return type

This commit is contained in:
Dominik Werder
2025-02-18 15:58:30 +01:00
parent 14fb35a1b3
commit 0910443255
2 changed files with 16 additions and 10 deletions

View File

@@ -7,15 +7,18 @@ use crate::binning::container_events::EventSingleRef;
use crate::binning::container_events::EventValueType;
use crate::binning::container_events::PartialOrdEvtA;
use crate::log;
use items_0::timebin::IngestReport;
use netpod::BinnedRange;
use netpod::DtNano;
use netpod::TsNano;
use std::fmt;
use std::mem;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) }
macro_rules! trace_ { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_ { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_init { ($($arg:expr),*) => ( if true { trace_!($($arg),*); } ) }
@@ -23,9 +26,11 @@ macro_rules! trace_output { ($($arg:expr),*) => ( if true { trace_!($($arg),*);
macro_rules! trace_cycle { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_event_next { ($fmt:expr, $($arg:expr),*) => ( if true {
trace_!(concat!("\x1b[1mEVENT POP FRONT\x1b[0m ", $fmt), $($arg),*);
}) }
macro_rules! trace_event_next { ($fmt:expr, $($arg:expr),*) => (
if false {
trace_!(concat!("\x1b[1mEVENT POP FRONT\x1b[0m ", $fmt), $($arg),*);
}
) }
macro_rules! trace_ingest_init_lst { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
@@ -546,7 +551,7 @@ where
let b = &self.inner_a.inner_b;
if self.out.len() > OUT_LEN_MAX {
// TODO change api such that we can produce arbitrary bins as stream.
panic!("cycle_01 break out len {}", self.out.len());
info!("produced too many bins out len {}", self.out.len());
break;
} else if ts > b.filled_until {
if ts >= b.active_end {
@@ -618,7 +623,7 @@ where
}
}
pub fn ingest(&mut self, evs: &ContainerEvents<EVT>) -> Result<(), Error> {
pub fn ingest(&mut self, evs: &ContainerEvents<EVT>) -> Result<IngestReport, Error> {
// It is this type's task to find and store the one-before event.
// We then pass it to the aggregation.
// AggregatorTimeWeight needs a function for that.
@@ -692,7 +697,7 @@ where
// done
};
}
Ok(())
Ok(IngestReport::ConsumedAll)
}
pub fn input_done_range_final(&mut self) -> Result<(), Error> {

View File

@@ -13,6 +13,7 @@ use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinningggError;
use items_0::timebin::BinsBoxed;
use items_0::timebin::EventsBoxed;
use items_0::timebin::IngestReport;
use netpod::BinnedRange;
use netpod::TsNano;
use std::ops::ControlFlow;
@@ -66,7 +67,7 @@ where
self.binner.cnt_zero_enable();
}
fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> {
fn ingest(&mut self, evs: &EventsBoxed) -> Result<IngestReport, BinningggError> {
match evs.as_any_ref().downcast_ref::<ContainerEvents<EVT>>() {
Some(evs) => Ok(self.binner.ingest(evs)?),
None => {
@@ -124,7 +125,7 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
self.enable_cnt_zero = true;
}
fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> {
fn ingest(&mut self, evs: &EventsBoxed) -> Result<IngestReport, BinningggError> {
self.binned_events
.get_or_insert_with(|| {
let mut v = evs.binned_events_timeweight_traitobj(self.range.clone());
@@ -203,7 +204,7 @@ impl BinnedEventsTimeweightStream {
DataItem(x) => match x {
Data(x) => match x {
ChannelEvents::Events(evs) => match self.binned_events.ingest(&evs) {
Ok(()) => match self.binned_events.output() {
Ok(report) => match self.binned_events.output() {
Ok(Some(x)) => {
if x.len() == 0 {
Continue(())