From 67590d6bd112326207a490d28fc4b6f30195a2b3 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 18 Nov 2024 15:56:46 +0100 Subject: [PATCH] WIP binned bins --- Cargo.toml | 3 + src/binning/container_bins.rs | 8 + src/binning/container_events.rs | 30 +-- src/binning/test.rs | 3 + src/binning/test/bins00.rs | 54 ++++++ src/binning/test/bins_gen.rs | 25 +++ src/binning/test/compare.rs | 144 +++++++++++++++ src/binning/test/events00.rs | 171 +++--------------- src/binning/timeweight.rs | 4 +- src/binning/timeweight/timeweight_bins.rs | 57 +++++- src/binning/timeweight/timeweight_bins_dyn.rs | 26 --- .../timeweight/timeweight_bins_lazy.rs | 52 ++++++ .../timeweight/timeweight_bins_stream.rs | 165 +++++++++++++++++ src/binning/timeweight/timeweight_events.rs | 4 +- .../timeweight/timeweight_events_dyn.rs | 2 +- src/lib.rs | 4 + 16 files changed, 547 insertions(+), 205 deletions(-) create mode 100644 src/binning/test/bins00.rs create mode 100644 src/binning/test/bins_gen.rs create mode 100644 src/binning/test/compare.rs create mode 100644 src/binning/timeweight/timeweight_bins_lazy.rs create mode 100644 src/binning/timeweight/timeweight_bins_stream.rs diff --git a/Cargo.toml b/Cargo.toml index ab9df3c..a559d99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,6 @@ thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = " [features] heavy = [] + +[dev-dependencies] +tokio = { version = "1", features = ["rt"] } diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index 06c3654..ddffa93 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -613,6 +613,14 @@ where } } + fn binned_bins_timeweight_traitobj( + &self, + range: netpod::BinnedRange, + ) -> Box { + let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::::new(range); + Box::new(ret) + } + fn fix_numerics(&mut self) { for ((_min, _max), _avg) in self .mins diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index a37f3cf..af64518 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -81,7 +81,7 @@ impl Container for VecDeque { } fn get_iter_ty_1(&self, pos: usize) -> Option<&str> { - todo!() + self.get(pos).map(|x| x.as_str()) } } @@ -174,14 +174,7 @@ where pub val: EVT::IterTy1<'a>, } -impl<'a, EVT> EventSingleRef<'a, EVT> -where - EVT: EventValueType, -{ - pub fn to_owned(&self) { - todo!() - } -} +impl<'a, EVT> EventSingleRef<'a, EVT> where EVT: EventValueType {} #[derive(Debug, Clone)] pub struct EventSingle { @@ -266,25 +259,6 @@ where Ok(()) } - pub fn ts_first(&self) -> Option { - self.tss.front().map(|&x| x) - } - - pub fn ts_last(&self) -> Option { - self.tss.back().map(|&x| x) - } - - fn _len_before(&self, end: TsNano) -> usize { - let tss = &self.tss; - let pp = tss.partition_point(|&x| x < end); - assert!(pp <= tss.len(), "len_before pp {} len {}", pp, tss.len()); - pp - } - - fn _pop_front(&mut self) -> Option> { - todo!() - } - pub fn push_back(&mut self, ts: TsNano, val: EVT) { self.tss.push_back(ts); self.vals.push_back(val); diff --git a/src/binning/test.rs b/src/binning/test.rs index 8b24a21..cb9d88a 100644 --- a/src/binning/test.rs +++ b/src/binning/test.rs @@ -1,3 +1,6 @@ +mod bins00; +mod bins_gen; +mod compare; mod events00; use super::container_events::ContainerEvents; diff --git a/src/binning/test/bins00.rs b/src/binning/test/bins00.rs new file mode 100644 index 0000000..b49d230 --- /dev/null +++ b/src/binning/test/bins00.rs @@ -0,0 +1,54 @@ +use super::bins_gen::bins_gen_dim0_f32_v00; +use super::compare::exp_avgs; +use super::compare::exp_cnts; +use super::compare::exp_maxs; +use super::compare::exp_mins; +use super::events00::pu; +use crate::binning::container_events::ContainerEvents; +use crate::binning::test::bins_gen::boxed_conts; +use crate::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; +use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight; +use futures_util::StreamExt; +use items_0::timebin::BinningggContainerBinsDyn; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::TsNano; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Error")] +enum Error { + Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error), + AssertMsg(String), + Compare(#[from] super::compare::Error), +} + +#[test] +fn test_bin_events_f32_simple_01() -> Result<(), Error> { + let fut = async { + let beg = TsNano::from_ms(100); + let end = TsNano::from_ms(500); + let bin_len = DtMs::from_ms_u64(10); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, bin_len); + let inp = bins_gen_dim0_f32_v00(); + let inp = boxed_conts(inp); + let mut stream = BinnedBinsTimeweightStream::new(range, inp); + while let Some(bin) = stream.next().await { + eprintln!("bin {:?}", bin); + } + // exp_cnts(&bins, "2 3")?; + // exp_mins(&bins, "2. 1.")?; + // exp_maxs(&bins, "2.4 2.4")?; + // exp_avgs(&bins, "2.30 1.5333")?; + Ok(()) + }; + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(fut) +} diff --git a/src/binning/test/bins_gen.rs b/src/binning/test/bins_gen.rs new file mode 100644 index 0000000..253d357 --- /dev/null +++ b/src/binning/test/bins_gen.rs @@ -0,0 +1,25 @@ +use crate::binning::container_bins::ContainerBins; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::timebin::BinningggContainerBinsDyn; +use std::pin::Pin; + +pub(super) fn boxed_conts(inp: S) -> Pin::Item> + Send>> +where + S: Stream + Send + 'static, +{ + Box::pin(inp) +} + +pub(super) fn bins_gen_dim0_f32_v00( +) -> impl Stream>> { + futures_util::stream::iter((0usize..1000).into_iter()) + .map(|x| { + let c = ContainerBins::::new(); + Box::new(c) as Box + }) + .map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) +} diff --git a/src/binning/test/compare.rs b/src/binning/test/compare.rs new file mode 100644 index 0000000..07b960e --- /dev/null +++ b/src/binning/test/compare.rs @@ -0,0 +1,144 @@ +use crate::binning::container_bins::ContainerBins; +use std::collections::VecDeque; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Compare")] +pub enum Error { + AssertMsg(String), +} + +trait IntoVecDequeU64 { + fn into_vec_deque_u64(self) -> VecDeque; +} + +impl IntoVecDequeU64 for &str { + fn into_vec_deque_u64(self) -> VecDeque { + self.split_ascii_whitespace() + .map(|x| x.parse().unwrap()) + .collect() + } +} +trait IntoVecDequeF32 { + fn into_vec_deque_f32(self) -> VecDeque; +} + +impl IntoVecDequeF32 for &str { + fn into_vec_deque_f32(self) -> VecDeque { + self.split_ascii_whitespace() + .map(|x| x.parse().unwrap()) + .collect() + } +} + +fn exp_u64<'a>( + vals: impl Iterator, + exps: impl Iterator, + tag: &str, +) -> Result<(), Error> { + let mut it_a = vals; + let mut it_b = exps; + let mut i = 0; + loop { + let a = it_a.next(); + let b = it_b.next(); + if a.is_none() && b.is_none() { + break; + } + if let (Some(&val), Some(&exp)) = (a, b) { + if val != exp { + return Err(Error::AssertMsg(format!( + "{tag} val {} exp {} i {}", + val, exp, i + ))); + } + } else { + return Err(Error::AssertMsg(format!("{tag} len mismatch"))); + } + i += 1; + } + Ok(()) +} + +fn exp_f32<'a>( + vals: impl Iterator, + exps: impl Iterator, + tag: &str, +) -> Result<(), Error> { + let mut it_a = vals; + let mut it_b = exps; + let mut i = 0; + loop { + let a = it_a.next(); + let b = it_b.next(); + if a.is_none() && b.is_none() { + break; + } + if let (Some(&val), Some(&exp)) = (a, b) { + if netpod::f32_close(val, exp) == false { + return Err(Error::AssertMsg(format!( + "{tag} val {} exp {} i {}", + val, exp, i + ))); + } + } else { + return Err(Error::AssertMsg(format!("{tag} len mismatch"))); + } + i += 1; + } + Ok(()) +} + +pub(super) fn exp_cnts(bins: &ContainerBins, exps: impl IntoVecDequeU64) -> Result<(), Error> { + exp_u64( + bins.cnts_iter(), + exps.into_vec_deque_u64().iter(), + "exp_cnts", + ) +} + +pub(super) fn exp_mins(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { + exp_f32( + bins.mins_iter(), + exps.into_vec_deque_f32().iter(), + "exp_mins", + ) +} + +pub(super) fn exp_maxs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { + exp_f32( + bins.maxs_iter(), + exps.into_vec_deque_f32().iter(), + "exp_maxs", + ) +} + +pub(super) fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { + let exps = exps.into_vec_deque_f32(); + let mut it_a = bins.iter_debug(); + let mut it_b = exps.iter(); + let mut i = 0; + loop { + let a = it_a.next(); + let b = it_b.next(); + if a.is_none() && b.is_none() { + break; + } + if let (Some(a), Some(&exp)) = (a, b) { + let val = *a.avg as f32; + if netpod::f32_close(val, exp) == false { + return Err(Error::AssertMsg(format!( + "exp_avgs val {} exp {} i {}", + val, exp, i + ))); + } + } else { + return Err(Error::AssertMsg(format!( + "len mismatch {} vs {}", + bins.len(), + exps.len() + ))); + } + i += 1; + } + Ok(()) +} diff --git a/src/binning/test/events00.rs b/src/binning/test/events00.rs index bbd28f0..e111eb4 100644 --- a/src/binning/test/events00.rs +++ b/src/binning/test/events00.rs @@ -1,22 +1,23 @@ +use super::compare::exp_avgs; +use super::compare::exp_cnts; +use super::compare::exp_maxs; +use super::compare::exp_mins; use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight; -use daqbuf_err as err; -use err::thiserror; -use err::ThisError; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRange; use netpod::DtMs; use netpod::EnumVariant; use netpod::TsNano; -use std::collections::VecDeque; -#[derive(Debug, ThisError)] +#[derive(Debug, thiserror::Error)] #[cstm(name = "Error")] enum Error { Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error), AssertMsg(String), + Compare(#[from] super::compare::Error), } // fn prepare_data_with_cuts(beg_ms: u64, cuts: VecDeque) -> VecDeque> { @@ -27,7 +28,7 @@ enum Error { // let ivl = DtMs::from_ms_u64(x) // } -fn pu(c: &mut ContainerEvents, ts_ms: u64, val: f32) +pub(super) fn pu(c: &mut ContainerEvents, ts_ms: u64, val: f32) // where // C: AsMut>, // C: std::borrow::BorrowMut>, @@ -35,145 +36,6 @@ fn pu(c: &mut ContainerEvents, ts_ms: u64, val: f32) c.push_back(TsNano::from_ms(ts_ms), val); } -trait IntoVecDequeU64 { - fn into_vec_deque_u64(self) -> VecDeque; -} - -impl IntoVecDequeU64 for &str { - fn into_vec_deque_u64(self) -> VecDeque { - self.split_ascii_whitespace() - .map(|x| x.parse().unwrap()) - .collect() - } -} -trait IntoVecDequeF32 { - fn into_vec_deque_f32(self) -> VecDeque; -} - -impl IntoVecDequeF32 for &str { - fn into_vec_deque_f32(self) -> VecDeque { - self.split_ascii_whitespace() - .map(|x| x.parse().unwrap()) - .collect() - } -} - -fn exp_u64<'a>( - vals: impl Iterator, - exps: impl Iterator, - tag: &str, -) -> Result<(), Error> { - let mut it_a = vals; - let mut it_b = exps; - let mut i = 0; - loop { - let a = it_a.next(); - let b = it_b.next(); - if a.is_none() && b.is_none() { - break; - } - if let (Some(&val), Some(&exp)) = (a, b) { - if val != exp { - return Err(Error::AssertMsg(format!( - "{tag} val {} exp {} i {}", - val, exp, i - ))); - } - } else { - return Err(Error::AssertMsg(format!("{tag} len mismatch"))); - } - i += 1; - } - Ok(()) -} - -fn exp_f32<'a>( - vals: impl Iterator, - exps: impl Iterator, - tag: &str, -) -> Result<(), Error> { - let mut it_a = vals; - let mut it_b = exps; - let mut i = 0; - loop { - let a = it_a.next(); - let b = it_b.next(); - if a.is_none() && b.is_none() { - break; - } - if let (Some(&val), Some(&exp)) = (a, b) { - if netpod::f32_close(val, exp) == false { - return Err(Error::AssertMsg(format!( - "{tag} val {} exp {} i {}", - val, exp, i - ))); - } - } else { - return Err(Error::AssertMsg(format!("{tag} len mismatch"))); - } - i += 1; - } - Ok(()) -} - -#[cfg(test)] -fn exp_cnts(bins: &ContainerBins, exps: impl IntoVecDequeU64) -> Result<(), Error> { - exp_u64( - bins.cnts_iter(), - exps.into_vec_deque_u64().iter(), - "exp_cnts", - ) -} - -#[cfg(test)] -fn exp_mins(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { - exp_f32( - bins.mins_iter(), - exps.into_vec_deque_f32().iter(), - "exp_mins", - ) -} - -#[cfg(test)] -fn exp_maxs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { - exp_f32( - bins.maxs_iter(), - exps.into_vec_deque_f32().iter(), - "exp_maxs", - ) -} - -fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { - let exps = exps.into_vec_deque_f32(); - let mut it_a = bins.iter_debug(); - let mut it_b = exps.iter(); - let mut i = 0; - loop { - let a = it_a.next(); - let b = it_b.next(); - if a.is_none() && b.is_none() { - break; - } - if let (Some(a), Some(&exp)) = (a, b) { - let val = *a.avg as f32; - if netpod::f32_close(val, exp) == false { - return Err(Error::AssertMsg(format!( - "exp_avgs val {} exp {} i {}", - val, exp, i - ))); - } - } else { - return Err(Error::AssertMsg(format!( - "len mismatch {} vs {}", - bins.len(), - exps.len() - ))); - } - i += 1; - } - Ok(()) -} - #[test] fn test_bin_events_f32_simple_with_before_00() -> Result<(), Error> { let beg = TsNano::from_ms(110); @@ -513,3 +375,22 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { let bins = binner.output(); Ok(()) } + +#[test] +fn test_bin_events_string_simple_range_final() -> Result<(), Error> { + let beg = TsNano::from_ms(100); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one")); + evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); + binner.ingest(&evs)?; + binner.input_done_range_final()?; + let bins = binner.output(); + Ok(()) +} diff --git a/src/binning/timeweight.rs b/src/binning/timeweight.rs index 38245c3..230b8a3 100644 --- a/src/binning/timeweight.rs +++ b/src/binning/timeweight.rs @@ -1,10 +1,10 @@ pub mod timeweight_bins; pub mod timeweight_bins_dyn; +pub mod timeweight_bins_lazy; +pub mod timeweight_bins_stream; pub mod timeweight_events; pub mod timeweight_events_dyn; -use netpod::log::*; - #[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index db16357..ac14181 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -1,4 +1,59 @@ -use netpod::log::*; +use crate::binning::container_bins::ContainerBins; +use crate::binning::container_events::EventValueType; +use crate::log::*; +use items_0::timebin::BinnedBinsTimeweightTrait; +use items_0::timebin::BinningggError; +use items_0::timebin::BinsBoxed; +use netpod::BinnedRange; +use netpod::TsNano; #[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[derive(Debug)] +pub struct BinnedBinsTimeweight +where + EVT: EventValueType, +{ + range: BinnedRange, + out: ContainerBins, + produce_cnt_zero: bool, +} + +impl BinnedBinsTimeweight +where + EVT: EventValueType, +{ + pub fn new(range: BinnedRange) -> Self { + trace_init!("BinnedBinsTimeweight::new {}", range); + let active_beg = range.nano_beg(); + let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano()); + let active_len = active_end.delta(active_beg); + Self { + range, + out: ContainerBins::new(), + produce_cnt_zero: true, + } + } +} + +impl BinnedBinsTimeweightTrait for BinnedBinsTimeweight +where + EVT: EventValueType, +{ + fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> { + todo!() + } + + fn input_done_range_final(&mut self) -> Result<(), BinningggError> { + todo!() + } + + fn input_done_range_open(&mut self) -> Result<(), BinningggError> { + todo!() + } + + fn output(&mut self) -> Result, BinningggError> { + todo!() + } +} diff --git a/src/binning/timeweight/timeweight_bins_dyn.rs b/src/binning/timeweight/timeweight_bins_dyn.rs index 1b65daa..8b13789 100644 --- a/src/binning/timeweight/timeweight_bins_dyn.rs +++ b/src/binning/timeweight/timeweight_bins_dyn.rs @@ -1,27 +1 @@ -use futures_util::Stream; -use items_0::streamitem::Sitemty; -use items_0::timebin::BinningggContainerBinsDyn; -use netpod::BinnedRange; -use netpod::TsNano; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -pub struct BinnedBinsTimeweightStream {} - -impl BinnedBinsTimeweightStream { - pub fn new( - range: BinnedRange, - inp: Pin>> + Send>>, - ) -> Self { - todo!() - } -} - -impl Stream for BinnedBinsTimeweightStream { - type Item = Sitemty>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - todo!() - } -} diff --git a/src/binning/timeweight/timeweight_bins_lazy.rs b/src/binning/timeweight/timeweight_bins_lazy.rs new file mode 100644 index 0000000..3b747fc --- /dev/null +++ b/src/binning/timeweight/timeweight_bins_lazy.rs @@ -0,0 +1,52 @@ +use crate::log::*; +use items_0::timebin::BinnedBinsTimeweightTrait; +use items_0::timebin::BinningggError; +use items_0::timebin::BinsBoxed; +use netpod::BinnedRange; +use netpod::TsNano; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "BinnedBinsLazy")] +pub enum Error {} + +#[derive(Debug)] +pub struct BinnedBinsTimeweightLazy { + range: BinnedRange, + binned: Option>, +} + +impl BinnedBinsTimeweightLazy { + pub fn new(range: BinnedRange) -> Self { + Self { + range, + binned: None, + } + } + + pub fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> { + self.binned + .get_or_insert_with(|| evs.binned_bins_timeweight_traitobj(self.range.clone())) + .ingest(evs) + } + + pub fn input_done_range_final(&mut self) -> Result<(), BinningggError> { + self.binned + .as_mut() + .map(|x| x.input_done_range_final()) + .unwrap_or_else(|| { + debug!("TODO something to do if we miss the binner here?"); + Ok(()) + }) + } + + pub fn input_done_range_open(&mut self) -> Result<(), BinningggError> { + self.binned + .as_mut() + .map(|x| x.input_done_range_open()) + .unwrap_or(Ok(())) + } + + pub fn output(&mut self) -> Result, BinningggError> { + self.binned.as_mut().map(|x| x.output()).unwrap_or(Ok(None)) + } +} diff --git a/src/binning/timeweight/timeweight_bins_stream.rs b/src/binning/timeweight/timeweight_bins_stream.rs new file mode 100644 index 0000000..ec0c0a8 --- /dev/null +++ b/src/binning/timeweight/timeweight_bins_stream.rs @@ -0,0 +1,165 @@ +use super::timeweight_bins_lazy::BinnedBinsTimeweightLazy; +use crate::log::*; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::sitem_err2_from_string; +use items_0::streamitem::LogItem; +use items_0::streamitem::Sitemty; +use items_0::timebin::BinnedBinsTimeweightTrait; +use items_0::timebin::BinningggContainerBinsDyn; +use items_0::timebin::BinsBoxed; +use netpod::BinnedRange; +use netpod::TsNano; +use std::fmt; +use std::ops::ControlFlow; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "BinnedEventsTimeweightDyn")] +pub enum Error { + InnerDynMissing, +} + +type ItemA = Box; +type ItemB = Sitemty; + +enum StreamState { + Reading, + Done, +} + +pub struct BinnedBinsTimeweightStream { + state: StreamState, + inp: Pin + Send>>, + range_complete: bool, + binned: BinnedBinsTimeweightLazy, +} + +impl BinnedBinsTimeweightStream { + pub fn new(range: BinnedRange, inp: Pin + Send>>) -> Self { + Self { + state: StreamState::Reading, + inp, + range_complete: false, + binned: BinnedBinsTimeweightLazy::new(range), + } + } + + fn handle_sitemty( + mut self: Pin<&mut Self>, + item: ItemB, + _cx: &mut Context, + ) -> ControlFlow::Item>>> { + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + use ControlFlow::*; + use Poll::*; + match item { + Ok(x) => match x { + DataItem(x) => match x { + Data(x) => match self.binned.ingest(&x) { + Ok(()) => match self.binned.output() { + Ok(Some(x)) => { + if x.len() == 0 { + Continue(()) + } else { + let ret = Ok(DataItem(Data(x))); + Break(Ready(Some(ret))) + } + } + Ok(None) => Continue(()), + Err(e) => { + let e = sitem_err2_from_string(e); + Break(Ready(Some(Err(e)))) + } + }, + Err(e) => { + let e = sitem_err2_from_string(e); + Break(Ready(Some(Err(e)))) + } + }, + RangeComplete => { + self.range_complete = true; + Continue(()) + } + }, + Log(x) => Break(Ready(Some(Ok(Log(x))))), + Stats(x) => Break(Ready(Some(Ok(Stats(x))))), + }, + Err(e) => { + self.state = StreamState::Done; + Break(Ready(Some(Err(e)))) + } + } + } + + fn handle_eos( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll::Item>> { + trace_input_container!("handle_eos"); + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + use Poll::*; + self.state = StreamState::Done; + if self.range_complete { + self.binned + .input_done_range_final() + .map_err(sitem_err2_from_string)?; + } else { + self.binned + .input_done_range_open() + .map_err(sitem_err2_from_string)?; + } + match self.binned.output().map_err(sitem_err2_from_string)? { + Some(x) => { + trace_emit!("seeing ready bins {:?}", x); + Ready(Some(Ok(DataItem(Data(x))))) + } + None => { + let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos")); + Ready(Some(Ok(Log(item)))) + } + } + } + + fn handle_main( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> ControlFlow::Item>>> { + use ControlFlow::*; + use Poll::*; + let ret = match &self.state { + StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) { + Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx), + Ready(None) => Break(self.as_mut().handle_eos(cx)), + Pending => Break(Pending), + }, + StreamState::Done => Break(Ready(None)), + }; + if let Break(Ready(Some(Err(_)))) = ret { + self.state = StreamState::Done; + } + ret + } +} + +impl Stream for BinnedBinsTimeweightStream { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use ControlFlow::*; + loop { + break match self.as_mut().handle_main(cx) { + Break(x) => x, + Continue(()) => continue, + }; + } + } +} diff --git a/src/binning/timeweight/timeweight_events.rs b/src/binning/timeweight/timeweight_events.rs index 2c28152..a55aef8 100644 --- a/src/binning/timeweight/timeweight_events.rs +++ b/src/binning/timeweight/timeweight_events.rs @@ -6,17 +6,17 @@ use crate::binning::container_events::ContainerEventsTakeUpTo; use crate::binning::container_events::EventSingle; use crate::binning::container_events::EventSingleRef; use crate::binning::container_events::PartialOrdEvtA; +use crate::log::*; use core::fmt; use daqbuf_err as err; use err::thiserror; use err::ThisError; -use netpod::log::*; use netpod::BinnedRange; use netpod::DtNano; use netpod::TsNano; use std::mem; -macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) } +macro_rules! trace_ { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) } macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } diff --git a/src/binning/timeweight/timeweight_events_dyn.rs b/src/binning/timeweight/timeweight_events_dyn.rs index fc62cf2..a2434f6 100644 --- a/src/binning/timeweight/timeweight_events_dyn.rs +++ b/src/binning/timeweight/timeweight_events_dyn.rs @@ -2,6 +2,7 @@ use super::timeweight_events::BinnedEventsTimeweight; use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::EventValueType; use crate::channelevents::ChannelEvents; +use crate::log::*; use daqbuf_err as err; use err::thiserror; use err::ThisError; @@ -14,7 +15,6 @@ use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinningggError; use items_0::timebin::BinsBoxed; use items_0::timebin::EventsBoxed; -use netpod::log::*; use netpod::BinnedRange; use netpod::TsNano; use std::ops::ControlFlow; diff --git a/src/lib.rs b/src/lib.rs index ed03d9c..47a645c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,10 @@ use items_0::isodate::IsoDateTime; use items_0::Events; use std::fmt; +mod log { + pub use netpod::log::*; +} + #[derive(Debug, PartialEq)] pub enum ErrorKind { General,