Bin stream test

This commit is contained in:
Dominik Werder
2025-02-13 11:52:29 +01:00
parent bfa1e71be8
commit 14fb35a1b3
8 changed files with 299 additions and 117 deletions

View File

@@ -63,6 +63,7 @@ where
fn new() -> Self;
fn len(&self) -> usize;
fn push_back(&mut self, val: EVT);
fn clear(&mut self);
fn get_iter_ty_1(&self, pos: usize) -> Option<EVT::IterTy1<'_>>;
fn iter_ty_1(&self) -> impl Iterator<Item = EVT::IterTy1<'_>>;
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>);
@@ -102,6 +103,10 @@ where
self.push_back(val);
}
fn clear(&mut self) {
self.clear();
}
fn get_iter_ty_1(&self, pos: usize) -> Option<EVT::IterTy1<'_>> {
self.get(pos).map(|x| x.clone())
}
@@ -136,6 +141,10 @@ impl Container<String> for VecDeque<String> {
self.push_back(val);
}
fn clear(&mut self) {
self.clear();
}
fn get_iter_ty_1(&self, pos: usize) -> Option<&str> {
self.get(pos).map(|x| x.as_str())
}
@@ -512,6 +521,11 @@ where
self.vals.push_back(val.1);
}
fn clear(&mut self) {
self.pulses.clear();
self.vals.clear();
}
fn get_iter_ty_1(&self, pos: usize) -> Option<<PulsedVal<EVT> as EventValueType>::IterTy1<'_>> {
if let (Some(&pulse), Some(val)) = (self.pulses.get(pos), self.vals.get_iter_ty_1(pos)) {
let x = PulsedValIterTy { pulse, evt: val };
@@ -794,6 +808,12 @@ where
pub fn serde_id() -> u32 {
items_0::streamitem::CONTAINER_EVENTS_TYPE_ID
}
pub fn clear(&mut self) {
self.tss.clear();
self.vals.clear();
self.byte_estimate = 0;
}
}
impl<EVT> fmt::Debug for ContainerEvents<EVT>

View File

@@ -1,11 +1,13 @@
use crate::binning::container_bins::ContainerBins;
use std::collections::VecDeque;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Compare")]
pub enum Error {
AssertMsg(String),
}
autoerr::create_error_v1!(
name(Error, "Compare"),
enum variants {
AssertMsg(String),
BinLenMismatch(usize, usize),
},
);
pub(super) trait IntoVecDequeU64 {
fn into_vec_deque_u64(self) -> VecDeque<u64>;
@@ -93,11 +95,13 @@ pub(super) fn exp_cnts(
bins: &ContainerBins<f32, f32>,
exps: impl IntoVecDequeU64,
) -> Result<(), Error> {
exp_u64(
bins.cnts_iter(),
exps.into_vec_deque_u64().iter(),
"exp_cnts",
)
let exps = exps.into_vec_deque_u64();
if bins.len() != exps.len() {
let e = Error::BinLenMismatch(bins.len(), exps.len());
Err(e)
} else {
exp_u64(bins.cnts_iter(), exps.iter(), "exp_cnts")
}
}
pub(super) fn exp_mins(

View File

@@ -2,23 +2,23 @@ use super::compare::exp_avgs;
use super::compare::exp_cnts;
use super::compare::exp_maxs;
use super::compare::exp_mins;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::ContainerEvents;
use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
use netpod::log::*;
use crate::log::*;
use netpod::range::evrange::NanoRange;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::EnumVariant;
use netpod::TsNano;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Error")]
enum Error {
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
AssertMsg(String),
Compare(#[from] super::compare::Error),
}
autoerr::create_error_v1!(
name(Error, "Error"),
enum variants {
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
AssertMsg(String),
Compare(#[from] super::compare::Error),
},
);
// fn prepare_data_with_cuts(beg_ms: u64, cuts: VecDeque<u64>) -> VecDeque<ContainerEvents<f32>> {
// let beg = TsNano::from_ms(beg_ms);
@@ -69,7 +69,9 @@ fn test_bin_events_f32_simple_with_before_01_range_final() -> Result<(), Error>
end: end.ns(),
};
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
eprintln!("range {:?}", range);
let mut binner = BinnedEventsTimeweight::new(range);
binner.cnt_zero_enable();
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 103, 2.0);
@@ -109,7 +111,7 @@ fn test_bin_events_f32_simple_00() -> Result<(), Error> {
binner.input_done_range_open()?;
let bins = binner.output();
for b in bins.iter_debug() {
trace!("{b:?}");
trace!("{:?}", b);
}
exp_cnts(&bins, "2 3")?;
exp_mins(&bins, "2. 1.")?;
@@ -144,7 +146,7 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> {
binner.input_done_range_open()?;
let bins = binner.output();
for b in bins.iter_debug() {
trace!("{b:?}");
trace!("{:?}", b);
}
assert_eq!(bins.len(), 2);
exp_cnts(&bins, "2 3")?;
@@ -180,7 +182,7 @@ fn test_bin_events_f32_small_range_final() -> Result<(), Error> {
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
trace!("{b:?}");
trace!("{:?}", b);
}
assert_eq!(bins.len(), 2);
exp_cnts(&bins, "2 3")?;
@@ -202,6 +204,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err
};
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
let mut binner = BinnedEventsTimeweight::new(range);
binner.cnt_zero_enable();
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 102, 2.0);
@@ -223,7 +226,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err
binner.input_done_range_open()?;
let bins = binner.output();
for b in bins.iter_debug() {
trace!("{b:?}");
trace!("{:?}", b);
}
assert_eq!(bins.len(), 5);
exp_cnts(&bins, "2 3 0 0 2")?;
@@ -246,6 +249,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er
};
let range = BinnedRange::from_nano_range(nano_range, bin_len);
let mut binner = BinnedEventsTimeweight::new(range);
binner.cnt_zero_enable();
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 102, 2.0);
@@ -267,7 +271,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
trace!("{b:?}");
trace!("{:?}", b);
}
exp_cnts(&bins, "2 3 0 0 2")?;
exp_mins(&bins, "2.0 1.0 1.4 1.4 1.2")?;
@@ -308,7 +312,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
trace!("{b:?}");
trace!("{:?}", b);
}
exp_cnts(&bins, "1")?;
exp_mins(&bins, "40.")?;
@@ -347,7 +351,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_edge_range_final() -> R
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
trace!("{b:?}");
trace!("{:?}", b);
}
exp_cnts(&bins, "1")?;
exp_mins(&bins, "40.")?;
@@ -367,12 +371,14 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> {
};
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
let mut binner = BinnedEventsTimeweight::new(range);
binner.cnt_zero_enable();
let mut evs = ContainerEvents::new();
evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one"));
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
assert_eq!(bins.len(), 2);
Ok(())
}
@@ -386,11 +392,13 @@ fn test_bin_events_string_simple_range_final() -> Result<(), Error> {
};
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
let mut binner = BinnedEventsTimeweight::new(range);
binner.cnt_zero_enable();
let mut evs = ContainerEvents::new();
evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one"));
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
assert_eq!(bins.len(), 2);
Ok(())
}

View File

@@ -5,10 +5,6 @@ use items_0::timebin::BinsBoxed;
use netpod::BinnedRange;
use netpod::TsNano;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "BinnedBinsLazy")]
pub enum Error {}
#[derive(Debug)]
pub struct BinnedBinsTimeweightLazy {
range: BinnedRange<TsNano>,

View File

@@ -1,37 +1,49 @@
use super::super::container_events::EventValueType;
use crate::binning::aggregator::AggregatorTimeWeight;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::ContainerEventsTakeUpTo;
use crate::binning::container_events::EventSingle;
use crate::binning::container_events::EventSingleRef;
use crate::binning::container_events::EventValueType;
use crate::binning::container_events::PartialOrdEvtA;
use crate::log::*;
use core::fmt;
use crate::log;
use netpod::BinnedRange;
use netpod::DtNano;
use netpod::TsNano;
use std::fmt;
use std::mem;
macro_rules! trace_ { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) }
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_init { ($($arg:expr),*) => ( if true { trace_!($($arg),*); } ) }
macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_output { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_cycle { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_event_next { ($fmt:expr, $($arg:expr),*) => ( if true {
trace_!(concat!("\x1b[1mEVENT POP FRONT\x1b[0m ", $fmt), $($arg),*);
}) }
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_init_lst { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_minmax { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_event { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_container { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_container_2 { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_fill_until { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
const COL1: &'static str = "\x1b[1m";
const RST: &'static str = "\x1b[0m";
const VERIFY_INPUT_EVENTS: bool = false;
const OUT_LEN_MAX: usize = 20000;
#[cold]
#[inline]
@@ -53,6 +65,7 @@ autoerr::create_error_v1!(
WithMinMaxButEventBeforeRange,
NoMinMaxAfterInit,
ExpectEventWithinRange,
IngestNoProgress(usize, usize),
},
);
@@ -160,7 +173,7 @@ where
let selfname = "ingest_with_lst_gt_range_beg";
trace_ingest_event!("{} len {}", selfname, evs.len());
while let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
trace_event_next!("{:?} {:30}", ev, selfname);
if ev.ts <= self.active_beg {
panic!("should never get here");
}
@@ -182,7 +195,7 @@ where
let selfname = "ingest_with_lst_ge_range_beg";
trace_ingest_event!("{} len {}", selfname, evs.len());
while let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
trace_event_next!("{:?} {:30}", ev, selfname);
assert!(ev.ts >= self.active_beg);
assert!(ev.ts < self.active_end);
if ev.ts == self.active_beg {
@@ -291,7 +304,7 @@ where
let mut run_ingest_with_lst_minmax = false;
let _ = run_ingest_with_lst_minmax;
if let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
trace_event_next!("{:?} {:30}", ev, selfname);
let beg = b.active_beg;
let end = b.active_end;
if ev.ts < beg {
@@ -363,6 +376,7 @@ where
out: &mut ContainerBins<EVT, EVT::AggTimeWeightOutputAvg>,
) {
let selfname = "push_out_and_reset";
trace_output!("{} range_final {}", selfname, range_final);
// TODO there is not always good enough input to produce a meaningful bin.
// TODO can we always reset, and what exactly does reset mean here?
// TODO what logic can I save here? To output a bin I need to have min, max, lst.
@@ -393,11 +407,11 @@ pub struct BinnedEventsTimeweight<EVT>
where
EVT: EventValueType,
{
lst: Option<EventSingle<EVT>>,
range: BinnedRange<TsNano>,
produce_cnt_zero: bool,
lst: Option<EventSingle<EVT>>,
inner_a: InnerA<EVT>,
out: ContainerBins<EVT, EVT::AggTimeWeightOutputAvg>,
produce_cnt_zero: bool,
}
impl<EVT> fmt::Debug for BinnedEventsTimeweight<EVT>
@@ -406,8 +420,9 @@ where
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("BinnedEventsTimeweight")
.field("lst", &self.lst)
.field("range", &self.range)
.field("produce_cnt_zero", &self.produce_cnt_zero)
.field("lst", &self.lst)
.field("inner_a", &self.inner_a)
.field("out", &self.out)
.finish()
@@ -418,13 +433,19 @@ impl<EVT> BinnedEventsTimeweight<EVT>
where
EVT: EventValueType,
{
pub fn type_name() -> &'static str {
std::any::type_name::<Self>()
}
pub fn new(range: BinnedRange<TsNano>) -> Self {
trace_init!("BinnedEventsTimeweight::new {}", range);
trace_init!("{}::new {}", Self::type_name(), range);
let active_beg = range.nano_beg();
let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano());
let active_len = active_end.delta(active_beg);
Self {
range,
produce_cnt_zero: false,
lst: None,
inner_a: InnerA::<EVT> {
inner_b: InnerB {
cnt: 0,
@@ -439,16 +460,12 @@ where
},
minmax: None,
},
lst: None,
out: ContainerBins::new(),
produce_cnt_zero: true,
}
}
pub fn disable_cnt_zero(self) -> Self {
let mut ret = self;
ret.produce_cnt_zero = false;
ret
pub fn cnt_zero_enable(&mut self) {
self.produce_cnt_zero = true;
}
fn ingest_event_without_lst(&mut self, ev: EventSingleRef<EVT>) -> Result<(), Error> {
@@ -476,7 +493,7 @@ where
let mut run_ingest_with_lst = false;
let _ = run_ingest_with_lst;
if let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
trace_event_next!("{:?} {:30}", ev, selfname);
assert!(ev.ts < self.inner_a.inner_b.active_end);
self.ingest_event_without_lst(ev)?;
run_ingest_with_lst = true;
@@ -527,7 +544,11 @@ where
i += 1;
assert!(i < 100000, "too many iterations");
let b = &self.inner_a.inner_b;
if ts > b.filled_until {
if self.out.len() > OUT_LEN_MAX {
// TODO change api such that we can produce arbitrary bins as stream.
panic!("cycle_01 break out len {}", self.out.len());
break;
} else if ts > b.filled_until {
if ts >= b.active_end {
if b.filled_until < b.active_end {
self.inner_a.inner_b.fill_until(b.active_end, lst.clone());
@@ -557,7 +578,6 @@ where
} else {
// TODO should never hit this case. Count.
}
// TODO jump to next bin
// TODO merge with the other reset
// Below uses the same code
@@ -609,8 +629,9 @@ where
// ALSO: need to keep track of the "lst". Probably best done in this type as well?
// TODO should rely on external stream adapter for verification to not duplicate things.
evs.verify()?;
if VERIFY_INPUT_EVENTS {
evs.verify()?;
}
let mut evs = ContainerEventsTakeUpTo::new(evs);
loop {
@@ -635,6 +656,8 @@ where
self.cycle_01(ts);
}
let n1 = evs.len();
// TODO instead of mutable constrain/expand, use cheap derived subslices.
// But inner must still communicate back how much was consumed.
evs.constrain_up_to_ts(self.inner_a.inner_b.active_end);
{
trace_ingest_container!(
@@ -651,14 +674,18 @@ where
} else {
self.ingest_ordered(&mut evs)?
};
trace_ingest_container_2!("ingest after still left len evs {}", evs.len());
trace_ingest_container_2!("ingest after still left evs len {}", evs.len());
}
evs.extend_to_all();
let n2 = evs.len();
trace_ingest_container_2!("ingest extended again to all evs len {}", evs.len());
if n2 == 0 {
// done
} else if n2 >= n1 {
let e = Error::IngestNoProgress(n1, n2);
debug!("{}", e);
return Err(e);
} else {
assert!(n2 < n1, "no progress");
continue;
}
} else {
@@ -669,13 +696,13 @@ where
}
pub fn input_done_range_final(&mut self) -> Result<(), Error> {
trace_cycle!("input_done_range_final");
trace_cycle!("{}input_done_range_final{}", COL1, RST);
self.cycle_01(self.range.nano_end());
Ok(())
}
pub fn input_done_range_open(&mut self) -> Result<(), Error> {
trace_cycle!("input_done_range_open");
trace_cycle!("{}input_done_range_open{}", COL1, RST);
self.cycle_02();
Ok(())
}

View File

@@ -2,10 +2,8 @@ use super::timeweight_events::BinnedEventsTimeweight;
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::EventValueType;
use crate::channelevents::ChannelEvents;
use crate::log::*;
use crate::log;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::LogItem;
@@ -22,15 +20,23 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! debug_input_container { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) }
#[derive(Debug, ThisError)]
#[cstm(name = "BinnedEventsTimeweightDyn")]
pub enum Error {
InnerDynMissing,
}
macro_rules! trace_input_container { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) }
macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) }
autoerr::create_error_v1!(
name(Error, "BinnedEventsTimeweightDyn"),
enum variants {
InnerDynMissing,
Dummy,
},
);
#[derive(Debug)]
pub struct BinnedEventsTimeweightDynbox<EVT>
@@ -56,23 +62,11 @@ impl<EVT> BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox<EVT>
where
EVT: EventValueType,
{
fn cnt_zero_enable(&mut self) {
self.binner.cnt_zero_enable();
}
fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> {
// let a = (&evs as &dyn any::Any).downcast_ref::<String>();
// evs.downcast::<String>();
// evs.as_anybox().downcast::<ContainerEvents<f64>>();
// match evs.to_anybox().downcast::<ContainerEvents<EVT>>() {
// Ok(evs) => {
// let evs = {
// let a = evs;
// *a
// };
// Ok(self.binner.ingest(evs)?)
// }
// Err(_) => Err(BinningggError::TypeMismatch {
// have: evs.type_name().into(),
// expect: std::any::type_name::<ContainerEvents<EVT>>().into(),
// }),
// }
match evs.as_any_ref().downcast_ref::<ContainerEvents<EVT>>() {
Some(evs) => Ok(self.binner.ingest(evs)?),
None => {
@@ -107,6 +101,7 @@ where
pub struct BinnedEventsTimeweightLazy {
range: BinnedRange<TsNano>,
binned_events: Option<Box<dyn BinnedEventsTimeweightTrait>>,
enable_cnt_zero: bool,
}
impl BinnedEventsTimeweightLazy {
@@ -114,14 +109,30 @@ impl BinnedEventsTimeweightLazy {
Self {
range,
binned_events: None,
enable_cnt_zero: false,
}
}
pub fn with_cnt_zero(mut self) -> Self {
self.enable_cnt_zero = true;
self
}
}
impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
fn cnt_zero_enable(&mut self) {
self.enable_cnt_zero = true;
}
fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> {
self.binned_events
.get_or_insert_with(|| evs.binned_events_timeweight_traitobj(self.range.clone()))
.get_or_insert_with(|| {
let mut v = evs.binned_events_timeweight_traitobj(self.range.clone());
if self.enable_cnt_zero {
v.cnt_zero_enable();
}
v
})
.ingest(evs)
}
@@ -152,6 +163,7 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
enum StreamState {
Reading,
Remains,
Done,
Invalid,
}
@@ -160,7 +172,7 @@ pub struct BinnedEventsTimeweightStream {
state: StreamState,
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
binned_events: BinnedEventsTimeweightLazy,
range_complete: bool,
range_final: bool,
}
impl BinnedEventsTimeweightStream {
@@ -168,11 +180,12 @@ impl BinnedEventsTimeweightStream {
range: BinnedRange<TsNano>,
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
) -> Self {
trace!("stream new");
Self {
state: StreamState::Reading,
inp,
binned_events: BinnedEventsTimeweightLazy::new(range),
range_complete: false,
binned_events: BinnedEventsTimeweightLazy::new(range).with_cnt_zero(),
range_final: false,
}
}
@@ -209,7 +222,7 @@ impl BinnedEventsTimeweightStream {
}
},
RangeComplete => {
self.range_complete = true;
self.range_final = true;
Continue(())
}
},
@@ -223,16 +236,51 @@ impl BinnedEventsTimeweightStream {
}
}
fn test1(
mut self: Pin<&mut Self>,
_cx: &mut Context,
) -> Result<ControlFlow<Poll<Option<<Self as Stream>::Item>>>, Error> {
use ControlFlow::*;
use Poll::*;
if false {
Ok(Break(Pending))
} else if false {
Ok(Continue(()))
} else {
let e = Error::Dummy;
let _ = Err(e)?;
Ok(Break(Pending))
}
}
fn test2(
mut self: Pin<&mut Self>,
_cx: &mut Context,
) -> ControlFlow<Result<Poll<Option<<Self as Stream>::Item>>, Error>> {
use ControlFlow::*;
use Poll::*;
if false {
Break(Ok(Pending))
} else if false {
Continue(())
} else {
let e = Error::Dummy;
// unfortunately can not use the `?` operator here:
// let _ = Err(e)?;
Break(Ok(Pending))
}
}
fn handle_eos(
mut self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
trace_input_container!("handle_eos");
debug_input_container!("handle_eos range final {}", self.range_final);
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use Poll::*;
self.state = StreamState::Done;
if self.range_complete {
self.state = StreamState::Remains;
if true || self.range_final {
self.binned_events
.input_done_range_final()
.map_err(err::Error::from_string)?;
@@ -241,17 +289,33 @@ impl BinnedEventsTimeweightStream {
.input_done_range_open()
.map_err(err::Error::from_string)?;
}
Ready(None)
}
fn handle_remains(
mut self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
debug_input_container!("handle_remains");
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use Poll::*;
debug!("handle_remains binner {:?}", self.binned_events);
match self
.binned_events
.output()
.map_err(err::Error::from_string)?
{
Some(x) => {
trace_emit!("seeing ready bins {:?}", x);
// trace_emit!("seeing ready bins {:?}", x);
debug!("seeing ready bins {:?}", x);
Ready(Some(Ok(DataItem(Data(x)))))
}
None => {
let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos"));
debug!("no bins ready on eos");
self.state = StreamState::Done;
let item =
LogItem::from_node(888, log::Level::INFO, format!("no bins ready on eos"));
Ready(Some(Ok(Log(item))))
}
}
@@ -261,14 +325,19 @@ impl BinnedEventsTimeweightStream {
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
debug_input_container!("handle_main");
use ControlFlow::*;
use Poll::*;
let ret = match &self.state {
StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) {
Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx),
Ready(None) => Break(self.as_mut().handle_eos(cx)),
Ready(None) => {
self.as_mut().handle_eos(cx);
Continue(())
}
Pending => Break(Pending),
},
StreamState::Remains => Break(self.as_mut().handle_remains(cx)),
StreamState::Done => {
self.state = StreamState::Invalid;
Break(Ready(None))
@@ -289,6 +358,7 @@ impl Stream for BinnedEventsTimeweightStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use ControlFlow::*;
trace!("poll");
loop {
break match self.as_mut().handle_main(cx) {
Break(x) => x,

View File

@@ -2,6 +2,7 @@ use super::aggregator::AggregatorTimeWeight;
use super::container_events::Container;
use super::container_events::EventValueType;
use super::container_events::PartialOrdEvtA;
use crate::log;
use core::fmt;
use items_0::subfr::SubFrId;
use items_0::vecpreview::PreviewRange;
@@ -12,6 +13,8 @@ use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
macro_rules! trace_ingest_event { ($($arg:expr),*) => ( if false { log::trace!($($arg),*) } ); }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnumVariantContainer {
ixs: VecDeque<u16>,
@@ -46,6 +49,11 @@ impl Container<EnumVariant> for EnumVariantContainer {
self.names.push_back(name);
}
fn clear(&mut self) {
self.ixs.clear();
self.names.clear();
}
fn get_iter_ty_1(&self, pos: usize) -> Option<<EnumVariant as EventValueType>::IterTy1<'_>> {
if let (Some(&ix), Some(name)) = (self.ixs.get(pos), self.names.get(pos)) {
let ret = EnumVariantRef {
@@ -100,7 +108,7 @@ impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EnumVariant) {
let f = dt.ns() as f32 / bl.ns() as f32;
eprintln!("INGEST ENUM {} {:?}", f, val);
trace_ingest_event!("ingest enum {:.3e} {:?}", f, val);
self.sum += f * val.ix() as f32;
}

View File

@@ -240,7 +240,7 @@ mod serde_channel_events {
use std::cell::RefCell;
use std::fmt;
macro_rules! trace_serde { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
macro_rules! trace_serde { ($($arg:expr),*) => ( if true { trace!($($arg),*); }) }
type C01<T> = ContainerEvents<T>;
type C02<T> = ContainerEvents<Vec<T>>;
@@ -255,12 +255,16 @@ mod serde_channel_events {
T: Serialize + 'static,
S: Serializer,
{
let s = std::any::type_name::<T>();
trace_serde!("try_serialize T {}", s);
if let Some(x) = v.as_any_ref().downcast_ref::<T>() {
ser.serialize_element(x)?;
Ok(())
} else {
let s = std::any::type_name::<T>();
Err(serde::ser::Error::custom(format!("expect a {}", s)))
let e = serde::ser::Error::custom(format!("expect a {}", s));
error!("{}", e);
Err(e)
}
}
@@ -287,15 +291,17 @@ mod serde_channel_events {
_ => {
*$val.1.borrow_mut() = 1;
let msg = format!("serde ser not supported evt id 0x{:x}", nty_id);
// error!("{}", msg);
error!("{}", msg);
Err(serde::ser::Error::custom(msg))
}
}
}};
}
#[derive(Debug)]
struct EvRef<'a>(&'a dyn BinningggContainerEventsDyn, RefCell<u8>);
#[derive(Debug)]
struct EvBox(Box<dyn BinningggContainerEventsDyn>);
impl<'a> Serialize for EvRef<'a> {
@@ -343,10 +349,18 @@ mod serde_channel_events {
T: Deserialize<'de> + BinningggContainerEventsDyn + 'static,
{
let s = std::any::type_name::<T>();
trace_serde!("get_2nd_or_err {}", s);
trace_serde!("get_2nd_or_err T {}", s);
let obj: T = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
.next_element()
.map_err(|e| {
error!("get_2nd_or_err next_element error {}", e);
e
})?
.ok_or_else(|| de::Error::missing_field("[2] obj"))
.map_err(|e| {
error!("get_2nd_or_err error {}", e);
e
})?;
Ok(EvBox(Box::new(obj)))
}
@@ -375,7 +389,11 @@ mod serde_channel_events {
String::SUB => get_2nd_or_err::<$cont1<String>, _>(seq),
EnumVariant::SUB => get_2nd_or_err::<$cont1<EnumVariant>, _>(seq),
netpod::UnsupEvt::SUB => get_2nd_or_err::<$cont1<netpod::UnsupEvt>, _>(seq),
_ => Err(de::Error::custom(&format!("unknown nty 0x{:x}", nty))),
_ => {
let e = de::Error::custom(&format!("unknown nty 0x{:x}", nty));
error!("{}", e);
Err(e)
}
}
}};
}
@@ -400,7 +418,7 @@ mod serde_channel_events {
.ok_or_else(|| de::Error::missing_field("[1] nty"))?;
let seq = &mut seq;
trace_serde!("EvBoxVis::visit_seq cty 0x{:x} nty 0x{:x}", cty, nty);
if is_container_events(cty) {
let ret = if is_container_events(cty) {
if is_pulsed_subfr(nty) {
if is_vec_subfr(nty) {
de_inner_nty!(seq, C04, nty)
@@ -417,7 +435,16 @@ mod serde_channel_events {
} else {
error!("unsupported serde cty 0x{:x} nty 0x{:x}", cty, nty);
Err(de::Error::custom(&format!("unknown cty 0x{:x}", cty)))
}
};
trace_serde!("EvBoxVis::visit_seq ret {:?}", ret);
ret
}
fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
panic!("EvBoxVis visit_map");
}
}
@@ -435,7 +462,7 @@ mod serde_channel_events {
where
S: Serializer,
{
let name = "ChannelEvents";
let name = ChannelEventsVis::name();
let vars = ChannelEventsVis::allowed_variants();
match self {
ChannelEvents::Events(obj) => {
@@ -612,9 +639,9 @@ mod test_channel_events_serde {
evs.push_back(TsNano::from_ns(12), 3.2f32);
let item = ChannelEvents::from(evs);
let s = serde_json::to_string_pretty(&item).unwrap();
eprintln!("{s}");
trace!("{}", s);
let w: ChannelEvents = serde_json::from_str(&s).unwrap();
eprintln!("{w:?}");
trace!("{:?}", w);
}
type OptsTy = WithOtherTrailing<
@@ -632,6 +659,28 @@ mod test_channel_events_serde {
}
#[test]
fn channel_events_postcard() {
let mut evs = ContainerEvents::<f32>::new();
evs.push_back(TsNano::from_ns(8), 3.0);
evs.push_back(TsNano::from_ns(12), 3.2);
let item = ChannelEvents::from(evs);
let out = postcard::to_stdvec(&item).unwrap();
trace!("serialized into {} bytes", out.len());
let item: ChannelEvents = postcard::from_bytes(&out).unwrap();
let item = if let ChannelEvents::Events(x) = item {
x
} else {
panic!()
};
let item: &ContainerEvents<f32> = item.as_any_ref().downcast_ref().unwrap();
use items_0::merge::MergeableTy;
assert_eq!(item.tss_for_testing().len(), 2);
assert_eq!(item.tss_for_testing()[1], TsNano::from_ns(12));
}
// TODO some unresolved issue with bincode
#[allow(unused)]
// #[test]
fn channel_events_bincode() {
let mut evs = ContainerEvents::<f32>::new();
evs.push_back(TsNano::from_ns(8), 3.0);
@@ -641,7 +690,7 @@ mod test_channel_events_serde {
let mut out = Vec::new();
let mut ser = bincode::Serializer::new(&mut out, opts);
item.serialize(&mut ser).unwrap();
eprintln!("serialized into {} bytes", out.len());
trace!("serialized into {} bytes", out.len());
let mut de = bincode::Deserializer::from_slice(&out, opts);
let item = <ChannelEvents as Deserialize>::deserialize(&mut de).unwrap();
let item = if let ChannelEvents::Events(x) = item {
@@ -670,7 +719,7 @@ mod test_channel_events_serde {
let mut out = Vec::new();
let mut ser = bincode::Serializer::new(&mut out, opts);
item.serialize(&mut ser).unwrap();
eprintln!("serialized into {} bytes", out.len());
trace!("serialized into {} bytes", out.len());
let mut de = bincode::Deserializer::from_slice(&out, opts);
let item = <ChannelEvents as Deserialize>::deserialize(&mut de).unwrap();
let item = if let ChannelEvents::Status(x) = item {