From 14fb35a1b3814c33e08978a664104c2964c0bb56 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 13 Feb 2025 11:52:29 +0100 Subject: [PATCH] Bin stream test --- src/binning/container_events.rs | 20 +++ src/binning/test/compare.rs | 24 +-- src/binning/test/events00.rs | 40 +++-- .../timeweight/timeweight_bins_lazy.rs | 4 - src/binning/timeweight/timeweight_events.rs | 97 +++++++----- .../timeweight/timeweight_events_dyn.rs | 144 +++++++++++++----- src/binning/valuetype.rs | 10 +- src/channelevents.rs | 77 ++++++++-- 8 files changed, 299 insertions(+), 117 deletions(-) diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 64cd0b9..ad30dd3 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -63,6 +63,7 @@ where fn new() -> Self; fn len(&self) -> usize; fn push_back(&mut self, val: EVT); + fn clear(&mut self); fn get_iter_ty_1(&self, pos: usize) -> Option>; fn iter_ty_1(&self) -> impl Iterator>; fn drain_into(&mut self, dst: &mut Self, range: Range); @@ -102,6 +103,10 @@ where self.push_back(val); } + fn clear(&mut self) { + self.clear(); + } + fn get_iter_ty_1(&self, pos: usize) -> Option> { self.get(pos).map(|x| x.clone()) } @@ -136,6 +141,10 @@ impl Container for VecDeque { self.push_back(val); } + fn clear(&mut self) { + self.clear(); + } + fn get_iter_ty_1(&self, pos: usize) -> Option<&str> { self.get(pos).map(|x| x.as_str()) } @@ -512,6 +521,11 @@ where self.vals.push_back(val.1); } + fn clear(&mut self) { + self.pulses.clear(); + self.vals.clear(); + } + fn get_iter_ty_1(&self, pos: usize) -> Option< as EventValueType>::IterTy1<'_>> { if let (Some(&pulse), Some(val)) = (self.pulses.get(pos), self.vals.get_iter_ty_1(pos)) { let x = PulsedValIterTy { pulse, evt: val }; @@ -794,6 +808,12 @@ where pub fn serde_id() -> u32 { items_0::streamitem::CONTAINER_EVENTS_TYPE_ID } + + pub fn clear(&mut self) { + self.tss.clear(); + self.vals.clear(); + self.byte_estimate = 0; + } } impl fmt::Debug for ContainerEvents diff --git a/src/binning/test/compare.rs b/src/binning/test/compare.rs index 2ae695a..c86c7f9 100644 --- a/src/binning/test/compare.rs +++ b/src/binning/test/compare.rs @@ -1,11 +1,13 @@ use crate::binning::container_bins::ContainerBins; use std::collections::VecDeque; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "Compare")] -pub enum Error { - AssertMsg(String), -} +autoerr::create_error_v1!( + name(Error, "Compare"), + enum variants { + AssertMsg(String), + BinLenMismatch(usize, usize), + }, +); pub(super) trait IntoVecDequeU64 { fn into_vec_deque_u64(self) -> VecDeque; @@ -93,11 +95,13 @@ pub(super) fn exp_cnts( bins: &ContainerBins, exps: impl IntoVecDequeU64, ) -> Result<(), Error> { - exp_u64( - bins.cnts_iter(), - exps.into_vec_deque_u64().iter(), - "exp_cnts", - ) + let exps = exps.into_vec_deque_u64(); + if bins.len() != exps.len() { + let e = Error::BinLenMismatch(bins.len(), exps.len()); + Err(e) + } else { + exp_u64(bins.cnts_iter(), exps.iter(), "exp_cnts") + } } pub(super) fn exp_mins( diff --git a/src/binning/test/events00.rs b/src/binning/test/events00.rs index e111eb4..ec81587 100644 --- a/src/binning/test/events00.rs +++ b/src/binning/test/events00.rs @@ -2,23 +2,23 @@ use super::compare::exp_avgs; use super::compare::exp_cnts; use super::compare::exp_maxs; use super::compare::exp_mins; -use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight; -use netpod::log::*; +use crate::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRange; use netpod::DtMs; use netpod::EnumVariant; use netpod::TsNano; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "Error")] -enum Error { - Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error), - AssertMsg(String), - Compare(#[from] super::compare::Error), -} +autoerr::create_error_v1!( + name(Error, "Error"), + enum variants { + Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error), + AssertMsg(String), + Compare(#[from] super::compare::Error), + }, +); // fn prepare_data_with_cuts(beg_ms: u64, cuts: VecDeque) -> VecDeque> { // let beg = TsNano::from_ms(beg_ms); @@ -69,7 +69,9 @@ fn test_bin_events_f32_simple_with_before_01_range_final() -> Result<(), Error> end: end.ns(), }; let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + eprintln!("range {:?}", range); let mut binner = BinnedEventsTimeweight::new(range); + binner.cnt_zero_enable(); let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 103, 2.0); @@ -109,7 +111,7 @@ fn test_bin_events_f32_simple_00() -> Result<(), Error> { binner.input_done_range_open()?; let bins = binner.output(); for b in bins.iter_debug() { - trace!("{b:?}"); + trace!("{:?}", b); } exp_cnts(&bins, "2 3")?; exp_mins(&bins, "2. 1.")?; @@ -144,7 +146,7 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> { binner.input_done_range_open()?; let bins = binner.output(); for b in bins.iter_debug() { - trace!("{b:?}"); + trace!("{:?}", b); } assert_eq!(bins.len(), 2); exp_cnts(&bins, "2 3")?; @@ -180,7 +182,7 @@ fn test_bin_events_f32_small_range_final() -> Result<(), Error> { binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { - trace!("{b:?}"); + trace!("{:?}", b); } assert_eq!(bins.len(), 2); exp_cnts(&bins, "2 3")?; @@ -202,6 +204,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err }; let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); let mut binner = BinnedEventsTimeweight::new(range); + binner.cnt_zero_enable(); let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 102, 2.0); @@ -223,7 +226,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err binner.input_done_range_open()?; let bins = binner.output(); for b in bins.iter_debug() { - trace!("{b:?}"); + trace!("{:?}", b); } assert_eq!(bins.len(), 5); exp_cnts(&bins, "2 3 0 0 2")?; @@ -246,6 +249,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er }; let range = BinnedRange::from_nano_range(nano_range, bin_len); let mut binner = BinnedEventsTimeweight::new(range); + binner.cnt_zero_enable(); let mut evs = ContainerEvents::::new(); let em = &mut evs; pu(em, 102, 2.0); @@ -267,7 +271,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { - trace!("{b:?}"); + trace!("{:?}", b); } exp_cnts(&bins, "2 3 0 0 2")?; exp_mins(&bins, "2.0 1.0 1.4 1.4 1.2")?; @@ -308,7 +312,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() - binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { - trace!("{b:?}"); + trace!("{:?}", b); } exp_cnts(&bins, "1")?; exp_mins(&bins, "40.")?; @@ -347,7 +351,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_edge_range_final() -> R binner.input_done_range_final()?; let bins = binner.output(); for b in bins.iter_debug() { - trace!("{b:?}"); + trace!("{:?}", b); } exp_cnts(&bins, "1")?; exp_mins(&bins, "40.")?; @@ -367,12 +371,14 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { }; let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); let mut binner = BinnedEventsTimeweight::new(range); + binner.cnt_zero_enable(); let mut evs = ContainerEvents::new(); evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one")); evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); + assert_eq!(bins.len(), 2); Ok(()) } @@ -386,11 +392,13 @@ fn test_bin_events_string_simple_range_final() -> Result<(), Error> { }; let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); let mut binner = BinnedEventsTimeweight::new(range); + binner.cnt_zero_enable(); let mut evs = ContainerEvents::new(); evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one")); evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); binner.ingest(&evs)?; binner.input_done_range_final()?; let bins = binner.output(); + assert_eq!(bins.len(), 2); Ok(()) } diff --git a/src/binning/timeweight/timeweight_bins_lazy.rs b/src/binning/timeweight/timeweight_bins_lazy.rs index 3b747fc..e4c8a60 100644 --- a/src/binning/timeweight/timeweight_bins_lazy.rs +++ b/src/binning/timeweight/timeweight_bins_lazy.rs @@ -5,10 +5,6 @@ use items_0::timebin::BinsBoxed; use netpod::BinnedRange; use netpod::TsNano; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "BinnedBinsLazy")] -pub enum Error {} - #[derive(Debug)] pub struct BinnedBinsTimeweightLazy { range: BinnedRange, diff --git a/src/binning/timeweight/timeweight_events.rs b/src/binning/timeweight/timeweight_events.rs index 8039306..c5d15d3 100644 --- a/src/binning/timeweight/timeweight_events.rs +++ b/src/binning/timeweight/timeweight_events.rs @@ -1,37 +1,49 @@ -use super::super::container_events::EventValueType; use crate::binning::aggregator::AggregatorTimeWeight; use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::ContainerEventsTakeUpTo; use crate::binning::container_events::EventSingle; use crate::binning::container_events::EventSingleRef; +use crate::binning::container_events::EventValueType; use crate::binning::container_events::PartialOrdEvtA; -use crate::log::*; -use core::fmt; +use crate::log; use netpod::BinnedRange; use netpod::DtNano; use netpod::TsNano; +use std::fmt; use std::mem; -macro_rules! trace_ { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) } -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_ { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } -macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_init { ($($arg:expr),*) => ( if true { trace_!($($arg),*); } ) } -macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_output { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } -macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_cycle { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } -macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_event_next { ($fmt:expr, $($arg:expr),*) => ( if true { + trace_!(concat!("\x1b[1mEVENT POP FRONT\x1b[0m ", $fmt), $($arg),*); +}) } -macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_ingest_init_lst { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } -macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_ingest_minmax { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } -macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_ingest_event { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } -macro_rules! trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_ingest_container { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } + +macro_rules! trace_ingest_container_2 { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } + +macro_rules! trace_fill_until { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } + +const COL1: &'static str = "\x1b[1m"; +const RST: &'static str = "\x1b[0m"; + +const VERIFY_INPUT_EVENTS: bool = false; +const OUT_LEN_MAX: usize = 20000; #[cold] #[inline] @@ -53,6 +65,7 @@ autoerr::create_error_v1!( WithMinMaxButEventBeforeRange, NoMinMaxAfterInit, ExpectEventWithinRange, + IngestNoProgress(usize, usize), }, ); @@ -160,7 +173,7 @@ where let selfname = "ingest_with_lst_gt_range_beg"; trace_ingest_event!("{} len {}", selfname, evs.len()); while let Some(ev) = evs.next() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); + trace_event_next!("{:?} {:30}", ev, selfname); if ev.ts <= self.active_beg { panic!("should never get here"); } @@ -182,7 +195,7 @@ where let selfname = "ingest_with_lst_ge_range_beg"; trace_ingest_event!("{} len {}", selfname, evs.len()); while let Some(ev) = evs.next() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); + trace_event_next!("{:?} {:30}", ev, selfname); assert!(ev.ts >= self.active_beg); assert!(ev.ts < self.active_end); if ev.ts == self.active_beg { @@ -291,7 +304,7 @@ where let mut run_ingest_with_lst_minmax = false; let _ = run_ingest_with_lst_minmax; if let Some(ev) = evs.next() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); + trace_event_next!("{:?} {:30}", ev, selfname); let beg = b.active_beg; let end = b.active_end; if ev.ts < beg { @@ -363,6 +376,7 @@ where out: &mut ContainerBins, ) { let selfname = "push_out_and_reset"; + trace_output!("{} range_final {}", selfname, range_final); // TODO there is not always good enough input to produce a meaningful bin. // TODO can we always reset, and what exactly does reset mean here? // TODO what logic can I save here? To output a bin I need to have min, max, lst. @@ -393,11 +407,11 @@ pub struct BinnedEventsTimeweight where EVT: EventValueType, { - lst: Option>, range: BinnedRange, + produce_cnt_zero: bool, + lst: Option>, inner_a: InnerA, out: ContainerBins, - produce_cnt_zero: bool, } impl fmt::Debug for BinnedEventsTimeweight @@ -406,8 +420,9 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("BinnedEventsTimeweight") - .field("lst", &self.lst) .field("range", &self.range) + .field("produce_cnt_zero", &self.produce_cnt_zero) + .field("lst", &self.lst) .field("inner_a", &self.inner_a) .field("out", &self.out) .finish() @@ -418,13 +433,19 @@ impl BinnedEventsTimeweight where EVT: EventValueType, { + pub fn type_name() -> &'static str { + std::any::type_name::() + } + pub fn new(range: BinnedRange) -> Self { - trace_init!("BinnedEventsTimeweight::new {}", range); + trace_init!("{}::new {}", Self::type_name(), range); let active_beg = range.nano_beg(); let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano()); let active_len = active_end.delta(active_beg); Self { range, + produce_cnt_zero: false, + lst: None, inner_a: InnerA:: { inner_b: InnerB { cnt: 0, @@ -439,16 +460,12 @@ where }, minmax: None, }, - lst: None, out: ContainerBins::new(), - produce_cnt_zero: true, } } - pub fn disable_cnt_zero(self) -> Self { - let mut ret = self; - ret.produce_cnt_zero = false; - ret + pub fn cnt_zero_enable(&mut self) { + self.produce_cnt_zero = true; } fn ingest_event_without_lst(&mut self, ev: EventSingleRef) -> Result<(), Error> { @@ -476,7 +493,7 @@ where let mut run_ingest_with_lst = false; let _ = run_ingest_with_lst; if let Some(ev) = evs.next() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); + trace_event_next!("{:?} {:30}", ev, selfname); assert!(ev.ts < self.inner_a.inner_b.active_end); self.ingest_event_without_lst(ev)?; run_ingest_with_lst = true; @@ -527,7 +544,11 @@ where i += 1; assert!(i < 100000, "too many iterations"); let b = &self.inner_a.inner_b; - if ts > b.filled_until { + if self.out.len() > OUT_LEN_MAX { + // TODO change api such that we can produce arbitrary bins as stream. + panic!("cycle_01 break out len {}", self.out.len()); + break; + } else if ts > b.filled_until { if ts >= b.active_end { if b.filled_until < b.active_end { self.inner_a.inner_b.fill_until(b.active_end, lst.clone()); @@ -557,7 +578,6 @@ where } else { // TODO should never hit this case. Count. } - // TODO jump to next bin // TODO merge with the other reset // Below uses the same code @@ -609,8 +629,9 @@ where // ALSO: need to keep track of the "lst". Probably best done in this type as well? // TODO should rely on external stream adapter for verification to not duplicate things. - evs.verify()?; - + if VERIFY_INPUT_EVENTS { + evs.verify()?; + } let mut evs = ContainerEventsTakeUpTo::new(evs); loop { @@ -635,6 +656,8 @@ where self.cycle_01(ts); } let n1 = evs.len(); + // TODO instead of mutable constrain/expand, use cheap derived subslices. + // But inner must still communicate back how much was consumed. evs.constrain_up_to_ts(self.inner_a.inner_b.active_end); { trace_ingest_container!( @@ -651,14 +674,18 @@ where } else { self.ingest_ordered(&mut evs)? }; - trace_ingest_container_2!("ingest after still left len evs {}", evs.len()); + trace_ingest_container_2!("ingest after still left evs len {}", evs.len()); } evs.extend_to_all(); let n2 = evs.len(); + trace_ingest_container_2!("ingest extended again to all evs len {}", evs.len()); if n2 == 0 { // done + } else if n2 >= n1 { + let e = Error::IngestNoProgress(n1, n2); + debug!("{}", e); + return Err(e); } else { - assert!(n2 < n1, "no progress"); continue; } } else { @@ -669,13 +696,13 @@ where } pub fn input_done_range_final(&mut self) -> Result<(), Error> { - trace_cycle!("input_done_range_final"); + trace_cycle!("{}input_done_range_final{}", COL1, RST); self.cycle_01(self.range.nano_end()); Ok(()) } pub fn input_done_range_open(&mut self) -> Result<(), Error> { - trace_cycle!("input_done_range_open"); + trace_cycle!("{}input_done_range_open{}", COL1, RST); self.cycle_02(); Ok(()) } diff --git a/src/binning/timeweight/timeweight_events_dyn.rs b/src/binning/timeweight/timeweight_events_dyn.rs index 4ca3309..8d32240 100644 --- a/src/binning/timeweight/timeweight_events_dyn.rs +++ b/src/binning/timeweight/timeweight_events_dyn.rs @@ -2,10 +2,8 @@ use super::timeweight_events::BinnedEventsTimeweight; use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::EventValueType; use crate::channelevents::ChannelEvents; -use crate::log::*; +use crate::log; use daqbuf_err as err; -use err::thiserror; -use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::LogItem; @@ -22,15 +20,23 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! debug_input_container { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) } -macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) } -#[derive(Debug, ThisError)] -#[cstm(name = "BinnedEventsTimeweightDyn")] -pub enum Error { - InnerDynMissing, -} +macro_rules! trace_input_container { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) } + +macro_rules! trace_emit { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) } + +macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) } + +autoerr::create_error_v1!( + name(Error, "BinnedEventsTimeweightDyn"), + enum variants { + InnerDynMissing, + Dummy, + }, +); #[derive(Debug)] pub struct BinnedEventsTimeweightDynbox @@ -56,23 +62,11 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox where EVT: EventValueType, { + fn cnt_zero_enable(&mut self) { + self.binner.cnt_zero_enable(); + } + fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> { - // let a = (&evs as &dyn any::Any).downcast_ref::(); - // evs.downcast::(); - // evs.as_anybox().downcast::>(); - // match evs.to_anybox().downcast::>() { - // Ok(evs) => { - // let evs = { - // let a = evs; - // *a - // }; - // Ok(self.binner.ingest(evs)?) - // } - // Err(_) => Err(BinningggError::TypeMismatch { - // have: evs.type_name().into(), - // expect: std::any::type_name::>().into(), - // }), - // } match evs.as_any_ref().downcast_ref::>() { Some(evs) => Ok(self.binner.ingest(evs)?), None => { @@ -107,6 +101,7 @@ where pub struct BinnedEventsTimeweightLazy { range: BinnedRange, binned_events: Option>, + enable_cnt_zero: bool, } impl BinnedEventsTimeweightLazy { @@ -114,14 +109,30 @@ impl BinnedEventsTimeweightLazy { Self { range, binned_events: None, + enable_cnt_zero: false, } } + + pub fn with_cnt_zero(mut self) -> Self { + self.enable_cnt_zero = true; + self + } } impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { + fn cnt_zero_enable(&mut self) { + self.enable_cnt_zero = true; + } + fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> { self.binned_events - .get_or_insert_with(|| evs.binned_events_timeweight_traitobj(self.range.clone())) + .get_or_insert_with(|| { + let mut v = evs.binned_events_timeweight_traitobj(self.range.clone()); + if self.enable_cnt_zero { + v.cnt_zero_enable(); + } + v + }) .ingest(evs) } @@ -152,6 +163,7 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { enum StreamState { Reading, + Remains, Done, Invalid, } @@ -160,7 +172,7 @@ pub struct BinnedEventsTimeweightStream { state: StreamState, inp: Pin> + Send>>, binned_events: BinnedEventsTimeweightLazy, - range_complete: bool, + range_final: bool, } impl BinnedEventsTimeweightStream { @@ -168,11 +180,12 @@ impl BinnedEventsTimeweightStream { range: BinnedRange, inp: Pin> + Send>>, ) -> Self { + trace!("stream new"); Self { state: StreamState::Reading, inp, - binned_events: BinnedEventsTimeweightLazy::new(range), - range_complete: false, + binned_events: BinnedEventsTimeweightLazy::new(range).with_cnt_zero(), + range_final: false, } } @@ -209,7 +222,7 @@ impl BinnedEventsTimeweightStream { } }, RangeComplete => { - self.range_complete = true; + self.range_final = true; Continue(()) } }, @@ -223,16 +236,51 @@ impl BinnedEventsTimeweightStream { } } + fn test1( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Result::Item>>>, Error> { + use ControlFlow::*; + use Poll::*; + if false { + Ok(Break(Pending)) + } else if false { + Ok(Continue(())) + } else { + let e = Error::Dummy; + let _ = Err(e)?; + Ok(Break(Pending)) + } + } + + fn test2( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> ControlFlow::Item>>, Error>> { + use ControlFlow::*; + use Poll::*; + if false { + Break(Ok(Pending)) + } else if false { + Continue(()) + } else { + let e = Error::Dummy; + // unfortunately can not use the `?` operator here: + // let _ = Err(e)?; + Break(Ok(Pending)) + } + } + fn handle_eos( mut self: Pin<&mut Self>, _cx: &mut Context, ) -> Poll::Item>> { - trace_input_container!("handle_eos"); + debug_input_container!("handle_eos range final {}", self.range_final); use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::StreamItem::*; use Poll::*; - self.state = StreamState::Done; - if self.range_complete { + self.state = StreamState::Remains; + if true || self.range_final { self.binned_events .input_done_range_final() .map_err(err::Error::from_string)?; @@ -241,17 +289,33 @@ impl BinnedEventsTimeweightStream { .input_done_range_open() .map_err(err::Error::from_string)?; } + Ready(None) + } + + fn handle_remains( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll::Item>> { + debug_input_container!("handle_remains"); + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + use Poll::*; + debug!("handle_remains binner {:?}", self.binned_events); match self .binned_events .output() .map_err(err::Error::from_string)? { Some(x) => { - trace_emit!("seeing ready bins {:?}", x); + // trace_emit!("seeing ready bins {:?}", x); + debug!("seeing ready bins {:?}", x); Ready(Some(Ok(DataItem(Data(x))))) } None => { - let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos")); + debug!("no bins ready on eos"); + self.state = StreamState::Done; + let item = + LogItem::from_node(888, log::Level::INFO, format!("no bins ready on eos")); Ready(Some(Ok(Log(item)))) } } @@ -261,14 +325,19 @@ impl BinnedEventsTimeweightStream { mut self: Pin<&mut Self>, cx: &mut Context, ) -> ControlFlow::Item>>> { + debug_input_container!("handle_main"); use ControlFlow::*; use Poll::*; let ret = match &self.state { StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) { Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx), - Ready(None) => Break(self.as_mut().handle_eos(cx)), + Ready(None) => { + self.as_mut().handle_eos(cx); + Continue(()) + } Pending => Break(Pending), }, + StreamState::Remains => Break(self.as_mut().handle_remains(cx)), StreamState::Done => { self.state = StreamState::Invalid; Break(Ready(None)) @@ -289,6 +358,7 @@ impl Stream for BinnedEventsTimeweightStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use ControlFlow::*; + trace!("poll"); loop { break match self.as_mut().handle_main(cx) { Break(x) => x, diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index e6c2e77..d210929 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -2,6 +2,7 @@ use super::aggregator::AggregatorTimeWeight; use super::container_events::Container; use super::container_events::EventValueType; use super::container_events::PartialOrdEvtA; +use crate::log; use core::fmt; use items_0::subfr::SubFrId; use items_0::vecpreview::PreviewRange; @@ -12,6 +13,8 @@ use serde::Deserialize; use serde::Serialize; use std::collections::VecDeque; +macro_rules! trace_ingest_event { ($($arg:expr),*) => ( if false { log::trace!($($arg),*) } ); } + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EnumVariantContainer { ixs: VecDeque, @@ -46,6 +49,11 @@ impl Container for EnumVariantContainer { self.names.push_back(name); } + fn clear(&mut self) { + self.ixs.clear(); + self.names.clear(); + } + fn get_iter_ty_1(&self, pos: usize) -> Option<::IterTy1<'_>> { if let (Some(&ix), Some(name)) = (self.ixs.get(pos), self.names.get(pos)) { let ret = EnumVariantRef { @@ -100,7 +108,7 @@ impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EnumVariant) { let f = dt.ns() as f32 / bl.ns() as f32; - eprintln!("INGEST ENUM {} {:?}", f, val); + trace_ingest_event!("ingest enum {:.3e} {:?}", f, val); self.sum += f * val.ix() as f32; } diff --git a/src/channelevents.rs b/src/channelevents.rs index d4b586e..737366d 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -240,7 +240,7 @@ mod serde_channel_events { use std::cell::RefCell; use std::fmt; - macro_rules! trace_serde { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + macro_rules! trace_serde { ($($arg:expr),*) => ( if true { trace!($($arg),*); }) } type C01 = ContainerEvents; type C02 = ContainerEvents>; @@ -255,12 +255,16 @@ mod serde_channel_events { T: Serialize + 'static, S: Serializer, { + let s = std::any::type_name::(); + trace_serde!("try_serialize T {}", s); if let Some(x) = v.as_any_ref().downcast_ref::() { ser.serialize_element(x)?; Ok(()) } else { let s = std::any::type_name::(); - Err(serde::ser::Error::custom(format!("expect a {}", s))) + let e = serde::ser::Error::custom(format!("expect a {}", s)); + error!("{}", e); + Err(e) } } @@ -287,15 +291,17 @@ mod serde_channel_events { _ => { *$val.1.borrow_mut() = 1; let msg = format!("serde ser not supported evt id 0x{:x}", nty_id); - // error!("{}", msg); + error!("{}", msg); Err(serde::ser::Error::custom(msg)) } } }}; } + #[derive(Debug)] struct EvRef<'a>(&'a dyn BinningggContainerEventsDyn, RefCell); + #[derive(Debug)] struct EvBox(Box); impl<'a> Serialize for EvRef<'a> { @@ -343,10 +349,18 @@ mod serde_channel_events { T: Deserialize<'de> + BinningggContainerEventsDyn + 'static, { let s = std::any::type_name::(); - trace_serde!("get_2nd_or_err {}", s); + trace_serde!("get_2nd_or_err T {}", s); let obj: T = seq - .next_element()? - .ok_or_else(|| de::Error::missing_field("[2] obj"))?; + .next_element() + .map_err(|e| { + error!("get_2nd_or_err next_element error {}", e); + e + })? + .ok_or_else(|| de::Error::missing_field("[2] obj")) + .map_err(|e| { + error!("get_2nd_or_err error {}", e); + e + })?; Ok(EvBox(Box::new(obj))) } @@ -375,7 +389,11 @@ mod serde_channel_events { String::SUB => get_2nd_or_err::<$cont1, _>(seq), EnumVariant::SUB => get_2nd_or_err::<$cont1, _>(seq), netpod::UnsupEvt::SUB => get_2nd_or_err::<$cont1, _>(seq), - _ => Err(de::Error::custom(&format!("unknown nty 0x{:x}", nty))), + _ => { + let e = de::Error::custom(&format!("unknown nty 0x{:x}", nty)); + error!("{}", e); + Err(e) + } } }}; } @@ -400,7 +418,7 @@ mod serde_channel_events { .ok_or_else(|| de::Error::missing_field("[1] nty"))?; let seq = &mut seq; trace_serde!("EvBoxVis::visit_seq cty 0x{:x} nty 0x{:x}", cty, nty); - if is_container_events(cty) { + let ret = if is_container_events(cty) { if is_pulsed_subfr(nty) { if is_vec_subfr(nty) { de_inner_nty!(seq, C04, nty) @@ -417,7 +435,16 @@ mod serde_channel_events { } else { error!("unsupported serde cty 0x{:x} nty 0x{:x}", cty, nty); Err(de::Error::custom(&format!("unknown cty 0x{:x}", cty))) - } + }; + trace_serde!("EvBoxVis::visit_seq ret {:?}", ret); + ret + } + + fn visit_map(self, map: A) -> Result + where + A: de::MapAccess<'de>, + { + panic!("EvBoxVis visit_map"); } } @@ -435,7 +462,7 @@ mod serde_channel_events { where S: Serializer, { - let name = "ChannelEvents"; + let name = ChannelEventsVis::name(); let vars = ChannelEventsVis::allowed_variants(); match self { ChannelEvents::Events(obj) => { @@ -612,9 +639,9 @@ mod test_channel_events_serde { evs.push_back(TsNano::from_ns(12), 3.2f32); let item = ChannelEvents::from(evs); let s = serde_json::to_string_pretty(&item).unwrap(); - eprintln!("{s}"); + trace!("{}", s); let w: ChannelEvents = serde_json::from_str(&s).unwrap(); - eprintln!("{w:?}"); + trace!("{:?}", w); } type OptsTy = WithOtherTrailing< @@ -632,6 +659,28 @@ mod test_channel_events_serde { } #[test] + fn channel_events_postcard() { + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ns(8), 3.0); + evs.push_back(TsNano::from_ns(12), 3.2); + let item = ChannelEvents::from(evs); + let out = postcard::to_stdvec(&item).unwrap(); + trace!("serialized into {} bytes", out.len()); + let item: ChannelEvents = postcard::from_bytes(&out).unwrap(); + let item = if let ChannelEvents::Events(x) = item { + x + } else { + panic!() + }; + let item: &ContainerEvents = item.as_any_ref().downcast_ref().unwrap(); + use items_0::merge::MergeableTy; + assert_eq!(item.tss_for_testing().len(), 2); + assert_eq!(item.tss_for_testing()[1], TsNano::from_ns(12)); + } + + // TODO some unresolved issue with bincode + #[allow(unused)] + // #[test] fn channel_events_bincode() { let mut evs = ContainerEvents::::new(); evs.push_back(TsNano::from_ns(8), 3.0); @@ -641,7 +690,7 @@ mod test_channel_events_serde { let mut out = Vec::new(); let mut ser = bincode::Serializer::new(&mut out, opts); item.serialize(&mut ser).unwrap(); - eprintln!("serialized into {} bytes", out.len()); + trace!("serialized into {} bytes", out.len()); let mut de = bincode::Deserializer::from_slice(&out, opts); let item = ::deserialize(&mut de).unwrap(); let item = if let ChannelEvents::Events(x) = item { @@ -670,7 +719,7 @@ mod test_channel_events_serde { let mut out = Vec::new(); let mut ser = bincode::Serializer::new(&mut out, opts); item.serialize(&mut ser).unwrap(); - eprintln!("serialized into {} bytes", out.len()); + trace!("serialized into {} bytes", out.len()); let mut de = bincode::Deserializer::from_slice(&out, opts); let item = ::deserialize(&mut de).unwrap(); let item = if let ChannelEvents::Status(x) = item {