From 46b3d28db23ec2d1f44e06da0bda79dedf5b7b02 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 24 Nov 2024 22:32:04 +0100 Subject: [PATCH] WIP new container --- src/binning.rs | 2 +- src/binning/aggregator.rs | 129 +++++++ src/binning/aggregator/agg_bins.rs | 1 + src/binning/container_events.rs | 341 +++++++++++++++++- src/binning/test.rs | 3 +- src/binning/test/events01.rs | 44 +++ .../timeweight/timeweight_events_dyn.rs | 26 +- src/binning/valuetype.rs | 12 + src/channelevents.rs | 289 ++++++++------- src/empty.rs | 69 ++-- src/eventfull.rs | 88 +++-- src/eventsdim0.rs | 89 +---- src/eventsdim0enum.rs | 2 +- src/eventsdim1.rs | 10 +- src/frame.rs | 29 +- src/merger.rs | 218 +++++------ src/test.rs | 47 ++- src/test/eventsdim1.rs | 1 + src/testgen.rs | 10 +- src/testgen/events_gen.rs | 69 ++++ 20 files changed, 988 insertions(+), 491 deletions(-) create mode 100644 src/binning/test/events01.rs create mode 100644 src/testgen/events_gen.rs diff --git a/src/binning.rs b/src/binning.rs index 7378fbe..1d15a68 100644 --- a/src/binning.rs +++ b/src/binning.rs @@ -7,4 +7,4 @@ pub mod timeweight; pub mod valuetype; #[cfg(test)] -mod test; +pub mod test; diff --git a/src/binning/aggregator.rs b/src/binning/aggregator.rs index 705bfc3..caf8357 100644 --- a/src/binning/aggregator.rs +++ b/src/binning/aggregator.rs @@ -238,3 +238,132 @@ impl AggregatorTimeWeight for AggregatorNumeric { sum / filled_width_fraction as f64 } } + +#[derive(Debug)] +pub struct AggregatorVecNumeric { + sum: f64, +} + +macro_rules! impl_agg_tw_vec { + ($evt:ty) => { + impl AggregatorTimeWeight> for AggregatorVecNumeric { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: Vec<$evt>) { + let f = dt.ns() as f64 / bl.ns() as f64; + for e in val.iter() { + self.sum += f * (*e) as f64; + } + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } + + fn result_and_reset_for_new_bin( + &mut self, + filled_width_fraction: f32, + ) -> as EventValueType>::AggTimeWeightOutputAvg { + let sum = self.sum.clone() as f32; + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); + self.sum = 0.; + sum / filled_width_fraction + } + } + }; +} + +impl_agg_tw_vec!(u8); +impl_agg_tw_vec!(u16); +impl_agg_tw_vec!(u32); +impl_agg_tw_vec!(u64); +impl_agg_tw_vec!(i8); +impl_agg_tw_vec!(i16); +impl_agg_tw_vec!(i32); +impl_agg_tw_vec!(i64); +impl_agg_tw_vec!(f32); +impl_agg_tw_vec!(f64); + +impl AggregatorTimeWeight> for AggregatorVecNumeric { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: Vec) { + let f = dt.ns() as f64 / bl.ns() as f64; + self.sum += f * val.len() as f64; + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } + + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 { + let sum = self.sum as f32; + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); + self.sum = 0.; + sum / filled_width_fraction + } +} + +impl AggregatorTimeWeight> for AggregatorVecNumeric { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: Vec) { + let f = dt.ns() as f64 / bl.ns() as f64; + self.sum += f * val.len() as f64; + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } + + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 { + let sum = self.sum as f32; + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); + self.sum = 0.; + sum / filled_width_fraction + } +} + +impl AggregatorTimeWeight> for AggregatorVecNumeric { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: Vec) { + let f = dt.ns() as f64 / bl.ns() as f64; + self.sum += f * val.len() as f64; + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } + + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 { + let sum = self.sum as f32; + trace_result!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); + self.sum = 0.; + sum / filled_width_fraction + } +} diff --git a/src/binning/aggregator/agg_bins.rs b/src/binning/aggregator/agg_bins.rs index e69de29..8b13789 100644 --- a/src/binning/aggregator/agg_bins.rs +++ b/src/binning/aggregator/agg_bins.rs @@ -0,0 +1 @@ + diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 9b24664..9b6bf8a 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -1,31 +1,43 @@ use super::aggregator::AggTimeWeightOutputAvg; use super::aggregator::AggregatorNumeric; use super::aggregator::AggregatorTimeWeight; +use super::aggregator::AggregatorVecNumeric; use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox; +use crate::log::*; use core::fmt; +use core::ops::Range; use daqbuf_err as err; use err::thiserror; use err::ThisError; +use items_0::container::ByteEstimate; +use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewResult; +use items_0::merge::MergeableDyn; +use items_0::merge::MergeableTy; +use items_0::subfr::SubFrId; use items_0::timebin::BinningggContainerEventsDyn; use items_0::vecpreview::PreviewRange; use items_0::vecpreview::VecPreview; +use items_0::AsAnyMut; use items_0::AsAnyRef; +use items_0::WithLen; use netpod::BinnedRange; +use netpod::EnumVariant; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; use std::collections::VecDeque; +use std::iter; -#[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[derive(Debug, ThisError)] #[cstm(name = "ValueContainerError")] pub enum ValueContainerError {} -// + Serialize + for<'a> Deserialize<'a> -pub trait Container: fmt::Debug + Send + Clone + PreviewRange +pub trait Container: + fmt::Debug + Send + Unpin + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> where EVT: EventValueType, { @@ -33,6 +45,7 @@ where fn push_back(&mut self, val: EVT); fn pop_front(&mut self) -> Option; fn get_iter_ty_1(&self, pos: usize) -> Option>; + fn iter_ty_1(&self) -> impl Iterator>; } pub trait PartialOrdEvtA { @@ -40,12 +53,13 @@ pub trait PartialOrdEvtA { } pub trait EventValueType: - fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize + for<'a> Deserialize<'a> + fmt::Debug + Clone + PartialOrd + Send + Unpin + 'static + Serialize + for<'a> Deserialize<'a> { type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; + const SERDE_ID: u32; } impl Container for VecDeque @@ -53,6 +67,7 @@ where EVT: for<'a> EventValueType = EVT> + Serialize + for<'a> Deserialize<'a>, { fn new() -> Self { + trace_init!("{} as trait Container ::new", std::any::type_name::()); VecDeque::new() } @@ -67,6 +82,10 @@ where fn get_iter_ty_1(&self, pos: usize) -> Option> { self.get(pos).map(|x| x.clone()) } + + fn iter_ty_1(&self) -> impl Iterator::IterTy1<'_>> { + self.iter().map(|x| x.clone()) + } } impl Container for VecDeque { @@ -85,6 +104,10 @@ impl Container for VecDeque { fn get_iter_ty_1(&self, pos: usize) -> Option<&str> { self.get(pos).map(|x| x.as_str()) } + + fn iter_ty_1(&self) -> impl Iterator::IterTy1<'_>> { + self.iter().map(|x| x.as_str()) + } } macro_rules! impl_event_value_type { @@ -94,6 +117,7 @@ macro_rules! impl_event_value_type { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = $evt; + const SERDE_ID: u32 = <$evt as SubFrId>::SUB; } impl PartialOrdEvtA<$evt> for $evt { @@ -144,6 +168,7 @@ impl EventValueType for f32 { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = f32; + const SERDE_ID: u32 = ::SUB; } impl EventValueType for f64 { @@ -151,6 +176,7 @@ impl EventValueType for f64 { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = f64; + const SERDE_ID: u32 = ::SUB; } impl EventValueType for bool { @@ -158,6 +184,7 @@ impl EventValueType for bool { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = bool; + const SERDE_ID: u32 = ::SUB; } impl EventValueType for String { @@ -165,8 +192,41 @@ impl EventValueType for String { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = &'a str; + const SERDE_ID: u32 = ::SUB; } +macro_rules! impl_event_value_type_vec { + ($evt:ty) => { + impl EventValueType for Vec<$evt> { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorVecNumeric; + type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = Vec<$evt>; + const SERDE_ID: u32 = as SubFrId>::SUB; + } + + impl PartialOrdEvtA> for Vec<$evt> { + fn cmp_a(&self, other: &Vec<$evt>) -> Option { + self.partial_cmp(other) + } + } + }; +} + +impl_event_value_type_vec!(u8); +impl_event_value_type_vec!(u16); +impl_event_value_type_vec!(u32); +impl_event_value_type_vec!(u64); +impl_event_value_type_vec!(i8); +impl_event_value_type_vec!(i16); +impl_event_value_type_vec!(i32); +impl_event_value_type_vec!(i64); +impl_event_value_type_vec!(f32); +impl_event_value_type_vec!(f64); +impl_event_value_type_vec!(bool); +impl_event_value_type_vec!(String); +impl_event_value_type_vec!(EnumVariant); + #[derive(Debug, Clone)] pub struct EventSingleRef<'a, EVT> where @@ -221,15 +281,21 @@ where { tss: VecDeque, vals: ::Container, + byte_estimate: u64, } mod container_events_serde { use super::ContainerEvents; use super::EventValueType; + use serde::de::MapAccess; + use serde::de::Visitor; + use serde::ser::SerializeStruct; use serde::Deserialize; use serde::Deserializer; use serde::Serialize; use serde::Serializer; + use std::fmt; + use std::marker::PhantomData; impl Serialize for ContainerEvents where @@ -239,7 +305,54 @@ mod container_events_serde { where S: Serializer, { - todo!() + let stname = std::any::type_name::(); + let mut st = ser.serialize_struct(stname, 2)?; + st.serialize_field("tss", &self.tss)?; + st.serialize_field("vals", &self.vals)?; + st.end() + } + } + + struct Vis { + _t1: PhantomData, + } + + impl<'de, EVT> Visitor<'de> for Vis + where + EVT: EventValueType, + { + type Value = ContainerEvents; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str("a struct with fields tss and vals") + } + + fn visit_map(self, mut map: M) -> Result + where + M: MapAccess<'de>, + { + let mut tss = None; + let mut vals = None; + while let Some(key) = map.next_key::<&str>()? { + match key { + "tss" => { + tss = Some(map.next_value()?); + } + "vals" => { + vals = Some(map.next_value()?); + } + _ => { + use serde::de::Error; + return Err(Error::unknown_field(key, &["tss", "vals"])); + } + } + } + let ret = Self::Value { + tss: tss.unwrap(), + vals: vals.unwrap(), + byte_estimate: 0, + }; + Ok(ret) } } @@ -251,7 +364,8 @@ mod container_events_serde { where D: Deserializer<'de>, { - todo!() + let stname = std::any::type_name::(); + de.deserialize_struct(stname, &["tss", "vals"], Vis { _t1: PhantomData }) } } } @@ -264,7 +378,11 @@ where tss: VecDeque, vals: ::Container, ) -> Self { - Self { tss, vals } + Self { + tss, + vals, + byte_estimate: 0, + } } pub fn type_name() -> &'static str { @@ -275,6 +393,7 @@ where Self { tss: VecDeque::new(), vals: Container::new(), + byte_estimate: 0, } } @@ -298,6 +417,10 @@ where self.tss.push_back(ts); self.vals.push_back(val); } + + pub fn iter_zip<'a>(&'a self) -> impl Iterator)> { + self.tss.iter().zip(self.vals.iter_ty_1()) + } } impl fmt::Debug for ContainerEvents @@ -325,6 +448,33 @@ where } } +impl AsAnyMut for ContainerEvents +where + EVT: EventValueType, +{ + fn as_any_mut(&mut self) -> &mut dyn any::Any { + self + } +} + +impl WithLen for ContainerEvents +where + EVT: EventValueType, +{ + fn len(&self) -> usize { + self.len() + } +} + +impl ByteEstimate for ContainerEvents +where + EVT: EventValueType, +{ + fn byte_estimate(&self) -> u64 { + self.byte_estimate + } +} + pub struct ContainerEventsTakeUpTo<'a, EVT> where EVT: EventValueType, @@ -391,6 +541,80 @@ where } } +impl MergeableTy for ContainerEvents +where + EVT: EventValueType, +{ + fn ts_min(&self) -> Option { + todo!() + } + + fn ts_max(&self) -> Option { + todo!() + } + + fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult { + todo!() + } + + fn drain_into_new(&mut self, range: Range) -> DrainIntoNewResult { + todo!() + } + + fn find_lowest_index_gt(&self, ts: TsNano) -> Option { + todo!() + } + + fn find_lowest_index_ge(&self, ts: TsNano) -> Option { + todo!() + } + + fn find_highest_index_lt(&self, ts: TsNano) -> Option { + todo!() + } + + fn tss_for_testing(&self) -> Vec { + todo!() + } +} + +impl MergeableDyn for ContainerEvents +where + EVT: EventValueType, +{ + fn ts_min(&self) -> Option { + todo!() + } + + fn ts_max(&self) -> Option { + todo!() + } + + fn find_lowest_index_gt(&self, ts: TsNano) -> Option { + todo!() + } + + fn find_lowest_index_ge(&self, ts: TsNano) -> Option { + todo!() + } + + fn find_highest_index_lt(&self, ts: TsNano) -> Option { + todo!() + } + + fn tss_for_testing(&self) -> Vec { + todo!() + } + + fn drain_into( + &mut self, + dst: &mut dyn MergeableDyn, + range: Range, + ) -> DrainIntoDstResult { + todo!() + } +} + impl BinningggContainerEventsDyn for ContainerEvents where EVT: EventValueType, @@ -410,4 +634,107 @@ where let ret = core::mem::replace(self, Self::new()); Box::new(ret) } + + fn clone_dyn(&self) -> Box { + Box::new(self.clone()) + } + + fn serde_id(&self) -> u32 { + items_0::streamitem::CONTAINER_EVENTS_TYPE_ID + } + + fn nty_id(&self) -> u32 { + EVT::SERDE_ID + } + + fn eq(&self, rhs: &dyn BinningggContainerEventsDyn) -> bool { + if let Some(rhs) = rhs.as_any_ref().downcast_ref::() { + self.eq(rhs) + } else { + false + } + } +} + +#[cfg(test)] +mod test_frame { + use super::*; + use crate::channelevents::ChannelEvents; + use crate::framable::Framable; + use crate::framable::INMEM_FRAME_ENCID; + use crate::frame::decode_frame; + use crate::inmem::InMemoryFrame; + use items_0::streamitem::RangeCompletableItem; + use items_0::streamitem::Sitemty; + use items_0::streamitem::StreamItem; + use netpod::TsMs; + + #[test] + fn events_serialize() { + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ns(123), 55f32); + let item = ChannelEvents::from(evs); + let item: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let mut buf = item.make_frame_dyn().unwrap(); + let s = String::from_utf8_lossy(&buf[20..buf.len() - 4]); + eprintln!("[[{s}]]"); + let buflen = buf.len(); + let frame = InMemoryFrame { + encid: INMEM_FRAME_ENCID, + tyid: 0x2500, + len: (buflen - 24) as _, + buf: buf.split_off(20).split_to(buflen - 20 - 4).freeze(), + }; + let item: Sitemty = decode_frame(&frame).unwrap(); + let item = if let Ok(x) = item { x } else { panic!() }; + let item = if let StreamItem::DataItem(x) = item { + x + } else { + panic!() + }; + let item = if let RangeCompletableItem::Data(x) = item { + x + } else { + panic!() + }; + let item = if let ChannelEvents::Events(x) = item { + x + } else { + panic!() + }; + let item = if let Some(item) = item.as_any_ref().downcast_ref::>() { + item + } else { + panic!() + }; + assert_eq!( + MergeableTy::tss_for_testing(item), + &[TsMs::from_ms_u64(123)] + ); + } +} + +#[cfg(test)] +mod test_serde_opt { + use super::*; + + #[derive(Serialize)] + struct A { + a: Option, + #[serde(default)] + b: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + c: Option, + } + + #[test] + fn test_a() { + let s = serde_json::to_string(&A { + a: None, + b: None, + c: None, + }) + .unwrap(); + assert_eq!(s, r#"{"a":null,"b":null}"#); + } } diff --git a/src/binning/test.rs b/src/binning/test.rs index cb9d88a..d108a5a 100644 --- a/src/binning/test.rs +++ b/src/binning/test.rs @@ -1,7 +1,8 @@ mod bins00; -mod bins_gen; +pub mod bins_gen; mod compare; mod events00; +mod events01; use super::container_events::ContainerEvents; use std::any; diff --git a/src/binning/test/events01.rs b/src/binning/test/events01.rs new file mode 100644 index 0000000..da2b175 --- /dev/null +++ b/src/binning/test/events01.rs @@ -0,0 +1,44 @@ +use super::compare::exp_avgs; +use super::compare::exp_cnts; +use super::compare::exp_maxs; +use super::compare::exp_mins; +use crate::binning::container_bins::ContainerBins; +use crate::binning::container_events::ContainerEvents; +use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight; +use crate::testgen::events_gen::new_events_gen_dim1_f32_v00; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::EnumVariant; +use netpod::TsNano; +use std::task::Context; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "Error")] +enum Error { + Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error), + AssertMsg(String), + Compare(#[from] super::compare::Error), +} + +#[test] +fn test_bin_events_dim1_f32_00() -> Result<(), Error> { + let beg = TsNano::from_ms(110); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut inp = new_events_gen_dim1_f32_v00(range.full_range()); + let mut binner = BinnedEventsTimeweight::new(range); + while let Some(evs) = inp.next() { + binner.ingest(&evs)?; + } + binner.input_done_range_final()?; + let bins = binner.output(); + eprintln!("{:?}", bins); + Ok(()) +} diff --git a/src/binning/timeweight/timeweight_events_dyn.rs b/src/binning/timeweight/timeweight_events_dyn.rs index a2434f6..4ca3309 100644 --- a/src/binning/timeweight/timeweight_events_dyn.rs +++ b/src/binning/timeweight/timeweight_events_dyn.rs @@ -189,24 +189,18 @@ impl BinnedEventsTimeweightStream { Ok(x) => match x { DataItem(x) => match x { Data(x) => match x { - ChannelEvents::Events(evs) => match self - .binned_events - .ingest(&evs.to_container_events()) - { - Ok(()) => { - match self.binned_events.output() { - Ok(Some(x)) => { - if x.len() == 0 { - Continue(()) - } else { - Break(Ready(Some(Ok(DataItem(Data(x)))))) - } + ChannelEvents::Events(evs) => match self.binned_events.ingest(&evs) { + Ok(()) => match self.binned_events.output() { + Ok(Some(x)) => { + if x.len() == 0 { + Continue(()) + } else { + Break(Ready(Some(Ok(DataItem(Data(x)))))) } - Ok(None) => Continue(()), - Err(e) => Break(Ready(Some(Err(err::Error::from_string(e))))), } - // Continue(()) - } + Ok(None) => Continue(()), + Err(e) => Break(Ready(Some(Err(err::Error::from_string(e))))), + }, Err(e) => Break(Ready(Some(Err(err::Error::from_string(e))))), }, ChannelEvents::Status(_) => { diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index 263e90a..a8de496 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -3,6 +3,7 @@ use super::container_events::Container; use super::container_events::EventValueType; use super::container_events::PartialOrdEvtA; use core::fmt; +use items_0::subfr::SubFrId; use items_0::vecpreview::PreviewRange; use netpod::DtNano; use netpod::EnumVariant; @@ -60,6 +61,16 @@ impl Container for EnumVariantContainer { None } } + + fn iter_ty_1(&self) -> impl Iterator::IterTy1<'_>> { + self.ixs + .iter() + .zip(self.names.iter()) + .map(|x| EnumVariantRef { + ix: *x.0, + name: x.1.as_str(), + }) + } } #[derive(Debug)] @@ -114,4 +125,5 @@ impl EventValueType for EnumVariant { type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = EnumVariantRef<'a>; + const SERDE_ID: u32 = Self::SUB; } diff --git a/src/channelevents.rs b/src/channelevents.rs index 7eb6857..80e779c 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -1,6 +1,8 @@ +use crate::binning::container_events::ContainerEvents; +use crate::binning::container_events::EventValueType; use crate::framable::FrameType; -use crate::merger::Mergeable; use crate::Events; +use core::ops::Range; use daqbuf_err as err; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; @@ -8,18 +10,23 @@ use items_0::collect_s::CollectorDyn; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::isodate::IsoDateTime; +use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewResult; +use items_0::merge::MergeableDyn; +use items_0::merge::MergeableTy; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; +use items_0::timebin::BinningggContainerEventsDyn; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::EventsNonObj; use items_0::Extendable; -use items_0::MergeError; use items_0::TypeName; use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; +use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; @@ -46,7 +53,7 @@ impl ConnStatus { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ConnStatusEvent { - pub ts: u64, + pub ts: TsNano, #[serde(with = "humantime_serde")] //pub datetime: chrono::DateTime, pub datetime: SystemTime, @@ -54,8 +61,8 @@ pub struct ConnStatusEvent { } impl ConnStatusEvent { - pub fn new(ts: u64, status: ConnStatus) -> Self { - let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000); + pub fn new(ts: TsNano, status: ConnStatus) -> Self { + let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts.ms()); Self { ts, datetime, @@ -148,11 +155,9 @@ impl ByteEstimate for ChannelStatusEvent { } } -/// Events on a channel consist not only of e.g. timestamped values, but can be also -/// connection status changes. #[derive(Debug)] pub enum ChannelEvents { - Events(Box), + Events(Box), Status(Option), } @@ -165,6 +170,15 @@ impl ChannelEvents { } } +impl From> for ChannelEvents +where + EVT: EventValueType, +{ + fn from(value: ContainerEvents) -> Self { + Self::Events(Box::new(value)) + } +} + impl TypeName for ChannelEvents { fn type_name(&self) -> String { any::type_name::().into() @@ -206,10 +220,12 @@ impl AsAnyMut for ChannelEvents { mod serde_channel_events { use super::ChannelEvents; use super::Events; + use crate::binning::container_events::ContainerEvents; use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; use crate::eventsdim1::EventsDim1; use items_0::subfr::SubFrId; + use items_0::timebin::BinningggContainerEventsDyn; use netpod::log::*; use netpod::EnumVariant; use serde::de; @@ -223,19 +239,64 @@ mod serde_channel_events { use serde::Serializer; use std::fmt; - struct EvRef<'a>(&'a dyn Events); + fn try_serialize( + v: &dyn BinningggContainerEventsDyn, + ser: &mut ::SerializeSeq, + ) -> Result<(), ::Error> + where + T: Serialize + 'static, + S: Serializer, + { + if let Some(x) = v.as_any_ref().downcast_ref::() { + ser.serialize_element(x)?; + Ok(()) + } else { + let s = std::any::type_name::(); + Err(serde::ser::Error::custom(format!("expect a {}", s))) + } + } + + struct EvRef<'a>(&'a dyn BinningggContainerEventsDyn); struct EvBox(Box); impl<'a> Serialize for EvRef<'a> { - fn serialize(&self, serializer: S) -> Result + fn serialize(&self, ser: S) -> Result where S: Serializer, { - let mut ser = serializer.serialize_seq(Some(3))?; - ser.serialize_element(self.0.serde_id())?; + let mut ser = ser.serialize_seq(Some(3))?; + ser.serialize_element(&self.0.serde_id())?; ser.serialize_element(&self.0.nty_id())?; - ser.serialize_element(self.0)?; + use items_0::streamitem::CONTAINER_EVENTS_TYPE_ID; + type C1 = ContainerEvents; + match self.0.serde_id() { + CONTAINER_EVENTS_TYPE_ID => match self.0.nty_id() { + u8::SUB => try_serialize::>(self.0, &mut ser)?, + u16::SUB => try_serialize::>(self.0, &mut ser)?, + u32::SUB => try_serialize::>(self.0, &mut ser)?, + u64::SUB => try_serialize::>(self.0, &mut ser)?, + i8::SUB => try_serialize::>(self.0, &mut ser)?, + i16::SUB => try_serialize::>(self.0, &mut ser)?, + i32::SUB => try_serialize::>(self.0, &mut ser)?, + i64::SUB => try_serialize::>(self.0, &mut ser)?, + f32::SUB => try_serialize::>(self.0, &mut ser)?, + f64::SUB => try_serialize::>(self.0, &mut ser)?, + bool::SUB => try_serialize::>(self.0, &mut ser)?, + String::SUB => try_serialize::>(self.0, &mut ser)?, + EnumVariant::SUB => try_serialize::>(self.0, &mut ser)?, + // + Vec::::SUB => try_serialize::>>(self.0, &mut ser)?, + _ => { + let msg = format!("not supported evt id {}", self.0.nty_id()); + return Err(serde::ser::Error::custom(msg)); + } + }, + _ => { + let msg = format!("not supported obj id {}", self.0.serde_id()); + return Err(serde::ser::Error::custom(msg)); + } + } ser.end() } } @@ -545,7 +606,9 @@ mod serde_channel_events { match id { VarId::Events => { let x: EvBox = var.newtype_variant()?; - Ok(Self::Value::Events(x.0)) + let _ = x; + // Ok(Self::Value::Events(x.0)); + todo!() } VarId::Status => { let x: Option = var.newtype_variant()?; @@ -572,6 +635,7 @@ mod serde_channel_events { #[cfg(test)] mod test_channel_events_serde { use super::ChannelEvents; + use crate::binning::container_events::ContainerEvents; use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; use bincode::config::FixintEncoding; @@ -584,16 +648,17 @@ mod test_channel_events_serde { use items_0::bincode; use items_0::Appendable; use items_0::Empty; + use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::time::SystemTime; #[test] fn channel_events() { - let mut evs = EventsDim0::empty(); - evs.push(8, 2, 3.0f32); - evs.push(12, 3, 3.2f32); - let item = ChannelEvents::Events(Box::new(evs)); + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ns(8), 3.0f32); + evs.push_back(TsNano::from_ns(12), 3.2f32); + let item = ChannelEvents::from(evs); let s = serde_json::to_string_pretty(&item).unwrap(); eprintln!("{s}"); let w: ChannelEvents = serde_json::from_str(&s).unwrap(); @@ -616,10 +681,10 @@ mod test_channel_events_serde { #[test] fn channel_events_bincode() { - let mut evs = EventsDim0::empty(); - evs.push(8, 2, 3.0f32); - evs.push(12, 3, 3.2f32); - let item = ChannelEvents::Events(Box::new(evs)); + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ns(8), 3.0f32); + evs.push_back(TsNano::from_ns(12), 3.2f32); + let item = ChannelEvents::from(evs); let opts = bincode_opts(); let mut out = Vec::new(); let mut ser = bincode::Serializer::new(&mut out, opts); @@ -639,11 +704,11 @@ mod test_channel_events_serde { #[test] fn channel_status_bincode() { - let mut evs = EventsDim0::empty(); - evs.push(8, 2, 3.0f32); - evs.push(12, 3, 3.2f32); + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ns(8), 3.0f32); + evs.push_back(TsNano::from_ns(12), 3.2f32); let status = ConnStatusEvent { - ts: 567, + ts: TsNano::from_ns(567), datetime: SystemTime::UNIX_EPOCH, status: crate::channelevents::ConnStatus::Connect, }; @@ -661,7 +726,7 @@ mod test_channel_events_serde { panic!() }; if let Some(item) = item { - assert_eq!(item.ts, 567); + assert_eq!(item.ts, TsNano::from_ns(567)); } else { panic!() } @@ -671,7 +736,7 @@ mod test_channel_events_serde { impl PartialEq for ChannelEvents { fn eq(&self, other: &Self) -> bool { match (self, other) { - (Self::Events(l0), Self::Events(r0)) => l0 == r0, + (Self::Events(l0), Self::Events(r0)) => l0.eq(r0.as_ref()), (Self::Status(l0), Self::Status(r0)) => l0 == r0, _ => core::mem::discriminant(self) == core::mem::discriminant(other), } @@ -702,10 +767,10 @@ impl ByteEstimate for ChannelEvents { } } -impl Mergeable for ChannelEvents { - fn ts_min(&self) -> Option { +impl MergeableTy for ChannelEvents { + fn ts_min(&self) -> Option { match self { - ChannelEvents::Events(k) => Mergeable::ts_min(k), + ChannelEvents::Events(k) => k.ts_min(), ChannelEvents::Status(k) => match k { Some(k) => Some(k.ts), None => None, @@ -713,9 +778,9 @@ impl Mergeable for ChannelEvents { } } - fn ts_max(&self) -> Option { + fn ts_max(&self) -> Option { match self { - ChannelEvents::Events(k) => Mergeable::ts_max(k), + ChannelEvents::Events(k) => k.ts_max(), ChannelEvents::Status(k) => match k { Some(k) => Some(k.ts), None => None, @@ -723,56 +788,43 @@ impl Mergeable for ChannelEvents { } } - fn new_empty(&self) -> Self { - match self { - ChannelEvents::Events(k) => ChannelEvents::Events(k.new_empty()), - ChannelEvents::Status(_) => ChannelEvents::Status(None), - } - } - - fn clear(&mut self) { - match self { - ChannelEvents::Events(x) => { - Mergeable::clear(x); - } - ChannelEvents::Status(x) => { - *x = None; - } - } - } - - fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult { match self { ChannelEvents::Events(k) => match dst { - ChannelEvents::Events(j) => k.drain_into(j, range), - ChannelEvents::Status(_) => Err(MergeError::NotCompatible), + ChannelEvents::Events(j) => { + // + // k.drain_into(j, range) + todo!() + } + ChannelEvents::Status(_) => DrainIntoDstResult::NotCompatible, }, ChannelEvents::Status(k) => match dst { - ChannelEvents::Events(_) => Err(MergeError::NotCompatible), + ChannelEvents::Events(_) => DrainIntoDstResult::NotCompatible, ChannelEvents::Status(j) => match j { - Some(_) => { - trace!("drain_into merger::MergeError::Full"); - Err(MergeError::Full) - } + Some(_) => DrainIntoDstResult::Partial, None => { - if range.0 > 0 { - trace!("weird range {range:?}"); - } - if range.1 > 1 { - trace!("weird range {range:?}"); - } - if range.0 == range.1 { + if range.len() != 1 { trace!("try to add empty range to status container {range:?}"); } + if range.start != 0 { + trace!("weird range {range:?}"); + } + if range.end > 1 { + trace!("weird range {range:?}"); + } *j = k.take(); - Ok(()) + DrainIntoDstResult::Done } }, }, } } - fn find_lowest_index_gt(&self, ts: u64) -> Option { + fn drain_into_new(&mut self, range: Range) -> DrainIntoNewResult { + todo!() + } + + fn find_lowest_index_gt(&self, ts: TsNano) -> Option { match self { ChannelEvents::Events(k) => k.find_lowest_index_gt(ts), ChannelEvents::Status(k) => { @@ -789,7 +841,7 @@ impl Mergeable for ChannelEvents { } } - fn find_lowest_index_ge(&self, ts: u64) -> Option { + fn find_lowest_index_ge(&self, ts: TsNano) -> Option { match self { ChannelEvents::Events(k) => k.find_lowest_index_ge(ts), ChannelEvents::Status(k) => { @@ -806,7 +858,7 @@ impl Mergeable for ChannelEvents { } } - fn find_highest_index_lt(&self, ts: u64) -> Option { + fn find_highest_index_lt(&self, ts: TsNano) -> Option { match self { ChannelEvents::Events(k) => k.find_highest_index_lt(ts), ChannelEvents::Status(k) => { @@ -823,7 +875,7 @@ impl Mergeable for ChannelEvents { } } - fn tss(&self) -> Vec { + fn tss_for_testing(&self) -> Vec { Events::tss(self) .iter() .map(|x| netpod::TsMs::from_ns_u64(*x)) @@ -833,19 +885,13 @@ impl Mergeable for ChannelEvents { impl EventsNonObj for ChannelEvents { fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - match *self { - ChannelEvents::Events(k) => k.into_tss_pulses(), - ChannelEvents::Status(_) => (VecDeque::new(), VecDeque::new()), - } + todo!() } } impl Events for ChannelEvents { fn verify(&self) -> bool { - match self { - ChannelEvents::Events(x) => Events::verify(x), - ChannelEvents::Status(_) => panic!(), - } + todo!() } fn output_info(&self) -> String { @@ -877,30 +923,15 @@ impl Events for ChannelEvents { } fn new_empty_evs(&self) -> Box { - match self { - ChannelEvents::Events(x) => Events::new_empty_evs(x), - ChannelEvents::Status(_) => panic!(), - } + todo!() } fn drain_into_evs( &mut self, dst: &mut dyn Events, range: (usize, usize), - ) -> Result<(), MergeError> { - let dst2 = if let Some(x) = dst.as_any_mut().downcast_mut::() { - // debug!("unwrapped dst ChannelEvents as well"); - x - } else { - panic!("dst is not ChannelEvents"); - }; - match self { - ChannelEvents::Events(k) => match dst2 { - ChannelEvents::Events(j) => Events::drain_into_evs(k, j, range), - ChannelEvents::Status(_) => panic!("dst is not events"), - }, - ChannelEvents::Status(_) => panic!("self is not events"), - } + ) -> Result<(), err::Error> { + todo!() } fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option { @@ -932,10 +963,7 @@ impl Events for ChannelEvents { } fn tss(&self) -> &VecDeque { - match self { - ChannelEvents::Events(x) => Events::tss(x), - ChannelEvents::Status(_) => panic!(), - } + todo!() } fn pulses(&self) -> &VecDeque { @@ -943,63 +971,31 @@ impl Events for ChannelEvents { } fn frame_type_id(&self) -> u32 { - ::FRAME_TYPE_ID + todo!() } fn to_min_max_avg(&mut self) -> Box { - match self { - ChannelEvents::Events(item) => { - Box::new(ChannelEvents::Events(Events::to_min_max_avg(item))) - } - ChannelEvents::Status(item) => Box::new(ChannelEvents::Status(item.take())), - } + todo!() } fn to_json_string(&self) -> String { - match self { - ChannelEvents::Events(item) => item.to_json_string(), - ChannelEvents::Status(_item) => { - error!("TODO convert status to json"); - String::new() - } - } + todo!() } fn to_json_vec_u8(&self) -> Vec { - match self { - ChannelEvents::Events(item) => item.to_json_vec_u8(), - ChannelEvents::Status(_item) => { - error!("TODO convert status to json"); - Vec::new() - } - } + todo!() } fn to_cbor_vec_u8(&self) -> Vec { - match self { - ChannelEvents::Events(item) => item.to_cbor_vec_u8(), - ChannelEvents::Status(_item) => { - error!("TODO convert status to cbor"); - Vec::new() - } - } + todo!() } fn clear(&mut self) { - match self { - ChannelEvents::Events(x) => Events::clear(x.as_mut()), - ChannelEvents::Status(x) => { - *x = None; - } - } + todo!() } fn to_dim0_f32_for_binning(&self) -> Box { - use ChannelEvents::*; - match self { - Events(x) => x.to_dim0_f32_for_binning(), - Status(_x) => panic!("ChannelEvents::to_dim0_f32_for_binning"), - } + todo!() } fn to_container_events(&self) -> Box { @@ -1094,12 +1090,13 @@ impl CollectorDyn for ChannelEventsCollector { if let Some(item) = item.as_any_mut().downcast_mut::() { match item { ChannelEvents::Events(item) => { - let coll = self.coll.get_or_insert_with(|| { - item.as_ref() - .as_collectable_with_default_ref() - .new_collector() - }); - coll.ingest(item.as_collectable_with_default_mut()); + // let coll = self.coll.get_or_insert_with(|| { + // item.as_ref() + // .as_collectable_with_default_ref() + // .new_collector() + // }); + // coll.ingest(item.as_collectable_with_default_mut()); + todo!() } ChannelEvents::Status(_) => { // TODO decide on output format to collect also the connection status events diff --git a/src/empty.rs b/src/empty.rs index dc2e6b2..af35c27 100644 --- a/src/empty.rs +++ b/src/empty.rs @@ -1,52 +1,53 @@ -use crate::eventsdim0::EventsDim0; -use crate::eventsdim1::EventsDim1; +use crate::binning::container_events::ContainerEvents; use crate::Error; use daqbuf_err as err; -use items_0::Empty; -use items_0::Events; +use items_0::timebin::BinningggContainerEventsDyn; use netpod::log::*; use netpod::EnumVariant; use netpod::ScalarType; use netpod::Shape; -pub fn empty_events_dyn_ev(scalar_type: &ScalarType, shape: &Shape) -> Result, Error> { - let ret: Box = match shape { +pub fn empty_events_dyn_ev( + scalar_type: &ScalarType, + shape: &Shape, +) -> Result, Error> { + let ret: Box = match shape { Shape::Scalar => { use ScalarType::*; - type K = EventsDim0; + type K = ContainerEvents; match scalar_type { - U8 => Box::new(K::::empty()), - U16 => Box::new(K::::empty()), - U32 => Box::new(K::::empty()), - U64 => Box::new(K::::empty()), - I8 => Box::new(K::::empty()), - I16 => Box::new(K::::empty()), - I32 => Box::new(K::::empty()), - I64 => Box::new(K::::empty()), - F32 => Box::new(K::::empty()), - F64 => Box::new(K::::empty()), - BOOL => Box::new(K::::empty()), - STRING => Box::new(K::::empty()), - Enum => Box::new(K::::empty()), + U8 => Box::new(K::::new()), + U16 => Box::new(K::::new()), + U32 => Box::new(K::::new()), + U64 => Box::new(K::::new()), + I8 => Box::new(K::::new()), + I16 => Box::new(K::::new()), + I32 => Box::new(K::::new()), + I64 => Box::new(K::::new()), + F32 => Box::new(K::::new()), + F64 => Box::new(K::::new()), + BOOL => Box::new(K::::new()), + STRING => Box::new(K::::new()), + Enum => Box::new(K::::new()), } } Shape::Wave(..) => { use ScalarType::*; - type K = EventsDim1; + type K = ContainerEvents>; match scalar_type { - U8 => Box::new(K::::empty()), - U16 => Box::new(K::::empty()), - U32 => Box::new(K::::empty()), - U64 => Box::new(K::::empty()), - I8 => Box::new(K::::empty()), - I16 => Box::new(K::::empty()), - I32 => Box::new(K::::empty()), - I64 => Box::new(K::::empty()), - F32 => Box::new(K::::empty()), - F64 => Box::new(K::::empty()), - BOOL => Box::new(K::::empty()), - STRING => Box::new(K::::empty()), - Enum => Box::new(K::::empty()), + U8 => Box::new(K::::new()), + U16 => Box::new(K::::new()), + U32 => Box::new(K::::new()), + U64 => Box::new(K::::new()), + I8 => Box::new(K::::new()), + I16 => Box::new(K::::new()), + I32 => Box::new(K::::new()), + I64 => Box::new(K::::new()), + F32 => Box::new(K::::new()), + F64 => Box::new(K::::new()), + BOOL => Box::new(K::::new()), + STRING => Box::new(K::::new()), + Enum => Box::new(K::::new()), } } Shape::Image(..) => { diff --git a/src/eventfull.rs b/src/eventfull.rs index b686933..a867ada 100644 --- a/src/eventfull.rs +++ b/src/eventfull.rs @@ -1,19 +1,22 @@ use crate::framable::FrameType; -use crate::merger::Mergeable; use bytes::BytesMut; +use core::ops::Range; use daqbuf_err as err; use err::thiserror; use err::ThisError; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; +use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewResult; +use items_0::merge::MergeableTy; use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID; use items_0::Empty; -use items_0::MergeError; use items_0::WithLen; #[allow(unused)] use netpod::log::*; use netpod::ScalarType; use netpod::Shape; +use netpod::TsNano; use parse::channelconfig::CompressionMethod; use serde::Deserialize; use serde::Deserializer; @@ -23,11 +26,7 @@ use std::borrow::Cow; use std::collections::VecDeque; use std::time::Instant; -#[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} +macro_rules! trace2 { ($($arg:tt)*) => ( trace!($($arg)*); ) } #[derive(Debug, Serialize, Deserialize)] pub struct EventFull { @@ -189,33 +188,18 @@ impl ByteEstimate for EventFull { } } -impl Mergeable for EventFull { - fn ts_min(&self) -> Option { - self.tss.front().map(|&x| x) +impl MergeableTy for EventFull { + fn ts_min(&self) -> Option { + self.tss.front().map(|&x| TsNano::from_ns(x)) } - fn ts_max(&self) -> Option { - self.tss.back().map(|&x| x) + fn ts_max(&self) -> Option { + self.tss.back().map(|&x| TsNano::from_ns(x)) } - fn new_empty(&self) -> Self { - Empty::empty() - } - - fn clear(&mut self) { - self.tss.clear(); - self.pulses.clear(); - self.blobs.clear(); - self.scalar_types.clear(); - self.be.clear(); - self.shapes.clear(); - self.comps.clear(); - self.entry_payload_max = 0; - } - - fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult { // TODO make it harder to forget new members when the struct may get modified in the future - let r = range.0..range.1; + let r = range; let mut max = dst.entry_payload_max; for i in r.clone() { max = max.max(self.blobs[i].len() as _); @@ -228,38 +212,50 @@ impl Mergeable for EventFull { dst.be.extend(self.be.drain(r.clone())); dst.shapes.extend(self.shapes.drain(r.clone())); dst.comps.extend(self.comps.drain(r.clone())); - Ok(()) + DrainIntoDstResult::Done } - fn find_lowest_index_gt(&self, ts: u64) -> Option { + fn drain_into_new(&mut self, range: Range) -> DrainIntoNewResult { + let mut dst = Self::empty(); + match self.drain_into(&mut dst, range) { + DrainIntoDstResult::Done => DrainIntoNewResult::Done(dst), + DrainIntoDstResult::Partial => DrainIntoNewResult::Partial(dst), + DrainIntoDstResult::NotCompatible => DrainIntoNewResult::NotCompatible, + } + } + + fn find_lowest_index_gt(&self, ts: TsNano) -> Option { for (i, &m) in self.tss.iter().enumerate() { - if m > ts { + if m > ts.ns() { return Some(i); } } None } - fn find_lowest_index_ge(&self, ts: u64) -> Option { + fn find_lowest_index_ge(&self, ts: TsNano) -> Option { for (i, &m) in self.tss.iter().enumerate() { - if m >= ts { + if m >= ts.ns() { return Some(i); } } None } - fn find_highest_index_lt(&self, ts: u64) -> Option { + fn find_highest_index_lt(&self, ts: TsNano) -> Option { for (i, &m) in self.tss.iter().enumerate().rev() { - if m < ts { + if m < ts.ns() { return Some(i); } } None } - fn tss(&self) -> Vec { - self.tss.iter().map(|x| netpod::TsMs::from_ns_u64(*x)).collect() + fn tss_for_testing(&self) -> Vec { + self.tss + .iter() + .map(|x| netpod::TsMs::from_ns_u64(*x)) + .collect() } } @@ -292,18 +288,18 @@ fn decompress(databuf: &[u8], type_size: u32) -> Result, DecompError> { return Err(DecompError::BadCompresionBlockSize); } let ele_count = value_bytes / type_size as u64; - trace2!( - "ele_count {} ele_count_2 {} ele_count_exp {}", - ele_count, - ele_count_2, - ele_count_exp - ); + trace2!("ele_count {}", ele_count); let mut decomp: Vec = Vec::with_capacity(type_size as usize * ele_count as usize); unsafe { decomp.set_len(decomp.capacity()); } - // #[cfg(DISABLED)] - match bitshuffle::bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as _, type_size as _, 0) { + match bitshuffle::bitshuffle_decompress( + &databuf[12..], + &mut decomp, + ele_count as _, + type_size as _, + 0, + ) { Ok(c1) => { if 12 + c1 != databuf.len() { Err(DecompError::UnusedBytes) diff --git a/src/eventsdim0.rs b/src/eventsdim0.rs index c2bf0d3..f62135b 100644 --- a/src/eventsdim0.rs +++ b/src/eventsdim0.rs @@ -14,7 +14,6 @@ use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; -use items_0::MergeError; use items_0::Resettable; use items_0::TypeName; use items_0::WithLen; @@ -578,11 +577,7 @@ impl Events for EventsDim0 { Box::new(Self::empty()) } - fn drain_into_evs( - &mut self, - dst: &mut dyn Events, - range: (usize, usize), - ) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), Error> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -779,85 +774,3 @@ where self.values.push_back(value); } } - -#[cfg(test)] -mod test_frame { - use super::*; - use crate::channelevents::ChannelEvents; - use crate::framable::Framable; - use crate::framable::INMEM_FRAME_ENCID; - use crate::frame::decode_frame; - use crate::inmem::InMemoryFrame; - use items_0::streamitem::RangeCompletableItem; - use items_0::streamitem::Sitemty; - use items_0::streamitem::StreamItem; - - #[test] - fn events_serialize() { - // taskrun::tracing_init_testing().unwrap(); - let mut events = EventsDim0::empty(); - events.push(123, 234, 55f32); - let events = events; - let events: Box = Box::new(events); - let item = ChannelEvents::Events(events); - let item = Ok::<_, Error>(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let mut buf = item.make_frame_dyn().unwrap(); - let s = String::from_utf8_lossy(&buf[20..buf.len() - 4]); - eprintln!("[[{s}]]"); - let buflen = buf.len(); - let frame = InMemoryFrame { - encid: INMEM_FRAME_ENCID, - tyid: 0x2500, - len: (buflen - 24) as _, - buf: buf.split_off(20).split_to(buflen - 20 - 4).freeze(), - }; - let item: Sitemty = decode_frame(&frame).unwrap(); - let item = if let Ok(x) = item { x } else { panic!() }; - let item = if let StreamItem::DataItem(x) = item { - x - } else { - panic!() - }; - let item = if let RangeCompletableItem::Data(x) = item { - x - } else { - panic!() - }; - let mut item = if let ChannelEvents::Events(x) = item { - x - } else { - panic!() - }; - let item = if let Some(item) = item.as_any_mut().downcast_mut::>() { - item - } else { - panic!() - }; - assert_eq!(item.tss(), &[123]); - } -} - -#[cfg(test)] -mod test_serde_opt { - use super::*; - - #[derive(Serialize)] - struct A { - a: Option, - #[serde(default)] - b: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - c: Option, - } - - #[test] - fn test_a() { - let s = serde_json::to_string(&A { - a: None, - b: None, - c: None, - }) - .unwrap(); - assert_eq!(s, r#"{"a":null,"b":null}"#); - } -} diff --git a/src/eventsdim0enum.rs b/src/eventsdim0enum.rs index da4319e..f1e3307 100644 --- a/src/eventsdim0enum.rs +++ b/src/eventsdim0enum.rs @@ -396,7 +396,7 @@ impl Events for EventsDim0Enum { &mut self, _dst: &mut dyn Events, _range: (usize, usize), - ) -> Result<(), items_0::MergeError> { + ) -> Result<(), err::Error> { todo!() } diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs index 1b3a352..8d4a965 100644 --- a/src/eventsdim1.rs +++ b/src/eventsdim1.rs @@ -15,7 +15,6 @@ use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; -use items_0::MergeError; use items_0::TypeName; use items_0::WithLen; use netpod::is_false; @@ -530,11 +529,7 @@ impl Events for EventsDim1 { Box::new(Self::empty()) } - fn drain_into_evs( - &mut self, - dst: &mut dyn Events, - range: (usize, usize), - ) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), Error> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -545,7 +540,8 @@ impl Events for EventsDim1 { Ok(()) } else { error!("downcast to EventsDim0 FAILED"); - Err(MergeError::NotCompatible) + // Err(Error::NotCompatible) + todo!() } } diff --git a/src/frame.rs b/src/frame.rs index 3e736b2..50d6a47 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -355,14 +355,22 @@ where return Err(Error::UnknownEncoder(frame.encid())); } if frame.len() as usize != frame.buf().len() { - return Err(Error::BufferMismatch(frame.len(), frame.buf().len(), frame.tyid())); + return Err(Error::BufferMismatch( + frame.len(), + frame.buf().len(), + frame.tyid(), + )); } if frame.tyid() == ERROR_FRAME_TYPE_ID { // error frames are always encoded as json let k: err::Error = match json_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { - error!("deserialize len {} ERROR_FRAME_TYPE_ID {}", frame.buf().len(), e); + error!( + "deserialize len {} ERROR_FRAME_TYPE_ID {}", + frame.buf().len(), + e + ); let n = frame.buf().len().min(256); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); @@ -374,7 +382,11 @@ where let k: LogItem = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { - error!("deserialize len {} LOG_FRAME_TYPE_ID {}", frame.buf().len(), e); + error!( + "deserialize len {} LOG_FRAME_TYPE_ID {}", + frame.buf().len(), + e + ); let n = frame.buf().len().min(128); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); @@ -386,7 +398,11 @@ where let k: StatsItem = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { - error!("deserialize len {} STATS_FRAME_TYPE_ID {}", frame.buf().len(), e); + error!( + "deserialize len {} STATS_FRAME_TYPE_ID {}", + frame.buf().len(), + e + ); let n = frame.buf().len().min(128); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!("frame.buf as string: {:?}", s); @@ -413,7 +429,10 @@ where ); let n = frame.buf().len().min(64); let s = String::from_utf8_lossy(&frame.buf()[..n]); - error!("decode_from_slice bad frame.buf as bytes: {:?}", &frame.buf()[..n]); + error!( + "decode_from_slice bad frame.buf as bytes: {:?}", + &frame.buf()[..n] + ); error!("decode_from_slice bad frame.buf as string: {:?}", s); Err(e)? } diff --git a/src/merger.rs b/src/merger.rs index 135d75d..7655b34 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -1,18 +1,23 @@ -use crate::Error; +use crate::log::*; +use core::ops::Range; use futures_util::Stream; use futures_util::StreamExt; use items_0::container::ByteEstimate; +use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewResult; +use items_0::merge::MergeableTy; use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; +use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; -use items_0::MergeError; use items_0::WithLen; -use netpod::log::*; use netpod::TsMs; +use netpod::TsNano; use std::collections::VecDeque; use std::fmt; use std::ops::ControlFlow; @@ -29,59 +34,14 @@ macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; - fn new_empty(&self) -> Self; - fn clear(&mut self); - // TODO when MergeError::Full gets returned, any guarantees about what has been modified or kept unchanged? - fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>; - fn find_lowest_index_gt(&self, ts: u64) -> Option; - fn find_lowest_index_ge(&self, ts: u64) -> Option; - fn find_highest_index_lt(&self, ts: u64) -> Option; - // TODO only for testing: - fn tss(&self) -> Vec; -} - -impl Mergeable for Box { - fn ts_min(&self) -> Option { - self.as_ref().ts_min() - } - - fn ts_max(&self) -> Option { - self.as_ref().ts_max() - } - - fn new_empty(&self) -> Self { - self.as_ref().new_empty_evs() - } - - fn clear(&mut self) { - Events::clear(self.as_mut()) - } - - fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { - self.as_mut().drain_into_evs(dst, range) - } - - fn find_lowest_index_gt(&self, ts: u64) -> Option { - self.as_ref().find_lowest_index_gt_evs(ts) - } - - fn find_lowest_index_ge(&self, ts: u64) -> Option { - self.as_ref().find_lowest_index_ge_evs(ts) - } - - fn find_highest_index_lt(&self, ts: u64) -> Option { - self.as_ref().find_highest_index_lt_evs(ts) - } - - fn tss(&self) -> Vec { - Events::tss(self) - .iter() - .map(|x| netpod::TsMs::from_ns_u64(*x)) - .collect() - } +#[derive(Debug, thiserror::Error)] +#[cstm(name = "MergerError")] +pub enum Error { + NoPendingButMissing, + Input(SitemErrTy), + ShouldFindTsMin, + ItemShouldHaveTsMax, + PartialPathDrainedAllItems, } type MergeInp = Pin> + Send>>; @@ -95,8 +55,7 @@ pub struct Merger { range_complete: Vec, out_of_band_queue: VecDeque>, log_queue: VecDeque, - dim0ix_max: u64, - done_emit_first_empty: bool, + dim0ix_max: TsNano, done_data: bool, done_buffered: bool, done_range_complete: bool, @@ -106,7 +65,7 @@ pub struct Merger { impl fmt::Debug for Merger where - T: Mergeable, + T: MergeableTy, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect(); @@ -125,7 +84,7 @@ where impl Merger where - T: Mergeable, + T: MergeableTy, { pub fn new(inps: Vec>, out_max_len: Option) -> Self { let n = inps.len(); @@ -138,8 +97,7 @@ where range_complete: vec![false; n], out_of_band_queue: VecDeque::new(), log_queue: VecDeque::new(), - dim0ix_max: 0, - done_emit_first_empty: false, + dim0ix_max: TsNano::from_ns(0), done_data: false, done_buffered: false, done_range_complete: false, @@ -148,37 +106,51 @@ where } } - fn drain_into_upto(src: &mut T, dst: &mut T, upto: u64) -> Result<(), MergeError> { + fn drain_into_dst_upto(src: &mut T, dst: &mut T, upto: TsNano) -> DrainIntoDstResult { match src.find_lowest_index_gt(upto) { - Some(ilgt) => { - src.drain_into(dst, (0, ilgt))?; - } + Some(ilgt) => src.drain_into(dst, 0..ilgt), None => { // TODO should not be here. - src.drain_into(dst, (0, src.len()))?; + src.drain_into(dst, 0..src.len()) } } - Ok(()) } - fn take_into_output_all(&mut self, src: &mut T) -> Result<(), MergeError> { - // TODO optimize the case when some large batch should be added to some existing small batch already in out. - // TODO maybe use two output slots? - self.take_into_output_upto(src, u64::MAX) + fn drain_into_new_upto(src: &mut T, upto: TsNano) -> DrainIntoNewResult { + match src.find_lowest_index_gt(upto) { + Some(ilgt) => src.drain_into_new(0..ilgt), + None => { + // TODO should not be here. + src.drain_into_new(0..src.len()) + } + } } - fn take_into_output_upto(&mut self, src: &mut T, upto: u64) -> Result<(), MergeError> { + fn take_into_output_upto(&mut self, src: &mut T, upto: TsNano) -> DrainIntoDstResult { // TODO optimize the case when some large batch should be added to some existing small batch already in out. // TODO maybe use two output slots? if let Some(out) = self.out.as_mut() { - Self::drain_into_upto(src, out, upto)?; + Self::drain_into_dst_upto(src, out, upto) } else { trace2!("move into fresh"); - let mut fresh = src.new_empty(); - Self::drain_into_upto(src, &mut fresh, upto)?; - self.out = Some(fresh); + match Self::drain_into_new_upto(src, upto) { + DrainIntoNewResult::Done(x) => { + self.out = Some(x); + DrainIntoDstResult::Done + } + DrainIntoNewResult::Partial(x) => { + self.out = Some(x); + DrainIntoDstResult::Partial + } + DrainIntoNewResult::NotCompatible => DrainIntoDstResult::NotCompatible, + } } - Ok(()) + } + + fn take_into_output_all(&mut self, src: &mut T) -> DrainIntoDstResult { + // TODO optimize the case when some large batch should be added to some existing small batch already in out. + // TODO maybe use two output slots? + self.take_into_output_upto(src, TsNano::from_ns(u64::MAX)) } fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result, Error> { @@ -227,7 +199,7 @@ where "dim0ix_max {} vs {} diff {}", self.dim0ix_max, t1, - self.dim0ix_max - t1 + self.dim0ix_max.ns() - t1.ns() ), }; log_items.push(item); @@ -246,8 +218,15 @@ where let mut item = self.items[il0].take().unwrap(); trace3!("Take all from item {item:?}"); match self.take_into_output_all(&mut item) { - Ok(()) => Ok(Break(())), - Err(MergeError::Full) | Err(MergeError::NotCompatible) => { + DrainIntoDstResult::Done => Ok(Break(())), + DrainIntoDstResult::Partial => { + // TODO count for stats + trace3!("Put item back"); + self.items[il0] = Some(item); + self.do_clear_out = true; + Ok(Break(())) + } + DrainIntoDstResult::NotCompatible => { // TODO count for stats trace3!("Put item back"); self.items[il0] = Some(item); @@ -259,20 +238,26 @@ where // Take only up to the lowest ts of the second-lowest input let mut item = self.items[il0].take().unwrap(); trace3!("Take up to {tl1} from item {item:?}"); - let res = self.take_into_output_upto(&mut item, tl1); - match res { - Ok(()) => { + match self.take_into_output_upto(&mut item, tl1) { + DrainIntoDstResult::Done => { if item.len() == 0 { // TODO should never be here because we should have taken the whole item - Err(format!("Should have taken the whole item instead").into()) + Err(Error::PartialPathDrainedAllItems) } else { self.items[il0] = Some(item); Ok(Break(())) } } - Err(MergeError::Full) | Err(MergeError::NotCompatible) => { + DrainIntoDstResult::Partial => { // TODO count for stats - info!("Put item back because {res:?}"); + trace3!("Put item back because Partial"); + self.items[il0] = Some(item); + self.do_clear_out = true; + Ok(Break(())) + } + DrainIntoDstResult::NotCompatible => { + // TODO count for stats + trace3!("Put item back because NotCompatible"); self.items[il0] = Some(item); self.do_clear_out = true; Ok(Break(())) @@ -280,16 +265,22 @@ where } } } else { - // TODO should never be here because ts-max should always exist here. - Err(format!("selected input without max ts").into()) + Err(Error::ItemShouldHaveTsMax) } } else { // No other input, take the whole item let mut item = self.items[il0].take().unwrap(); trace3!("Take all from item (no other input) {item:?}"); match self.take_into_output_all(&mut item) { - Ok(()) => Ok(Break(())), - Err(_) => { + DrainIntoDstResult::Done => Ok(Break(())), + DrainIntoDstResult::Partial => { + // TODO count for stats + trace3!("Put item back"); + self.items[il0] = Some(item); + self.do_clear_out = true; + Ok(Break(())) + } + DrainIntoDstResult::NotCompatible => { // TODO count for stats trace3!("Put item back"); self.items[il0] = Some(item); @@ -299,7 +290,7 @@ where } } } else { - Err(format!("after low ts search nothing found").into()) + Err(Error::ShouldFindTsMin) } } @@ -314,13 +305,6 @@ where Ready(Some(Ok(k))) => match k { StreamItem::DataItem(k) => match k { RangeCompletableItem::Data(k) => { - if self.done_emit_first_empty == false { - trace!("emit first empty marker item"); - self.done_emit_first_empty = true; - let item = k.new_empty(); - let item = sitem_data(item); - self.out_of_band_queue.push_back(item); - } self.items[i] = Some(k); trace4!("refilled {}", i); } @@ -344,7 +328,7 @@ where }, Ready(Some(Err(e))) => { self.inps[i] = None; - return Err(e.into()); + return Err(Error::Input(e)); } Ready(None) => { self.inps[i] = None; @@ -364,10 +348,7 @@ where } } - fn poll3( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> ControlFlow>>> { + fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>> { use ControlFlow::*; use Poll::*; trace4!("poll3"); @@ -382,15 +363,15 @@ where .count(); trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}"); if nitemsmissing != 0 { - let e = Error::from(format!("missing but no pending")); - return Break(Ready(Some(Err(e)))); + let e = Error::NoPendingButMissing; + return Break(Ready(Some(e))); } let last_emit = nitems == 0; if nitems != 0 { match Self::process(Pin::new(&mut self), cx) { Ok(Break(())) => {} Ok(Continue(())) => {} - Err(e) => return Break(Ready(Some(Err(e)))), + Err(e) => return Break(Ready(Some(e))), } } if let Some(o) = self.out.as_ref() { @@ -426,23 +407,20 @@ where } } - fn poll2( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> ControlFlow>>> { + fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>> { use ControlFlow::*; use Poll::*; match Self::refill(Pin::new(&mut self), cx) { Ok(Ready(())) => Self::poll3(self, cx), Ok(Pending) => Break(Pending), - Err(e) => Break(Ready(Some(Err(e)))), + Err(e) => Break(Ready(Some(e))), } } } impl Stream for Merger where - T: Mergeable, + T: MergeableTy, { type Item = Sitemty; @@ -492,19 +470,9 @@ where match Self::poll2(self.as_mut(), cx) { ControlFlow::Continue(()) => continue, ControlFlow::Break(k) => match k { - Ready(Some(Ok(out))) => { - if true { - error!("THIS BRANCH SHOULD NO LONGER OCCUR, REFACTOR"); - self.done_data = true; - let e = Error::from(format!("TODO refactor direct emit in merger")); - return Ready(Some(Err(e.into()))); - } - trace!("emit buffered len {}", out.len()); - Ready(Some(sitem_data(out))) - } - Ready(Some(Err(e))) => { + Ready(Some(e)) => { self.done_data = true; - Ready(Some(Err(e.into()))) + Ready(Some(Err(sitem_err2_from_string(e)))) } Ready(None) => { self.done_data = true; diff --git a/src/test.rs b/src/test.rs index 15123b0..fa9202c 100644 --- a/src/test.rs +++ b/src/test.rs @@ -104,11 +104,18 @@ fn items_merge_01() { let v0 = ChannelEvents::Events(evs0); let v1 = ChannelEvents::Events(evs1); let v2 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect))); - let v3 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect))); + let v3 = ChannelEvents::Status(Some(ConnStatusEvent::new( + MS * 2300, + ConnStatus::Disconnect, + ))); let v4 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect))); let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); - let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); + let stream2 = Box::pin(stream::iter(vec![ + sitem_data(v2), + sitem_data(v3), + sitem_data(v4), + ])); let mut merger = Merger::new(vec![stream0, stream1, stream2], Some(8)); let mut total_event_count = 0; while let Some(item) = merger.next().await { @@ -139,11 +146,18 @@ fn items_merge_02() { let v0 = ChannelEvents::Events(evs0); let v1 = ChannelEvents::Events(evs1); let v2 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect))); - let v3 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect))); + let v3 = ChannelEvents::Status(Some(ConnStatusEvent::new( + MS * 2300, + ConnStatus::Disconnect, + ))); let v4 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect))); let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); - let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); + let stream2 = Box::pin(stream::iter(vec![ + sitem_data(v2), + sitem_data(v3), + sitem_data(v4), + ])); let mut merger = Merger::new(vec![stream0, stream1, stream2], Some(8)); let mut total_event_count = 0; while let Some(item) = merger.next().await { @@ -299,9 +313,9 @@ fn merge_02() { datetime: std::time::SystemTime::UNIX_EPOCH, status: ConnStatus::Disconnect, }; - let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(Some(ev)), - ))); + let item: Sitemty = Ok(StreamItem::DataItem( + RangeCompletableItem::Data(ChannelEvents::Status(Some(ev))), + )); vec![item] }; @@ -311,9 +325,9 @@ fn merge_02() { datetime: std::time::SystemTime::UNIX_EPOCH, status: ConnStatus::Disconnect, }; - let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(Some(ev)), - ))); + let item: Sitemty = Ok(StreamItem::DataItem( + RangeCompletableItem::Data(ChannelEvents::Status(Some(ev))), + )); vec![item] }; @@ -366,7 +380,9 @@ fn bin_01() { let cev = ChannelEvents::Events(Box::new(events)); events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); } - events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))); + events_vec1.push(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + ))); let inp1 = events_vec1; let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); @@ -423,7 +439,9 @@ fn binned_timeout_00() { let cev = ChannelEvents::Events(Box::new(events)); events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::Data(cev)))); } - events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))); + events_vec1.push(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + ))); let inp1 = VecStream::new(events_vec1.into_iter().collect()); let inp1 = inp1.enumerate2().then2(|(i, k)| async move { if i == 5 { @@ -431,7 +449,10 @@ fn binned_timeout_00() { } k }); - let edges: Vec<_> = (0..10).into_iter().map(|x| TSBASE + SEC * (1 + x)).collect(); + let edges: Vec<_> = (0..10) + .into_iter() + .map(|x| TSBASE + SEC * (1 + x)) + .collect(); let range = NanoRange { beg: TSBASE + SEC * 1, end: TSBASE + SEC * 10, diff --git a/src/test/eventsdim1.rs b/src/test/eventsdim1.rs index e69de29..8b13789 100644 --- a/src/test/eventsdim1.rs +++ b/src/test/eventsdim1.rs @@ -0,0 +1 @@ + diff --git a/src/testgen.rs b/src/testgen.rs index 07d19f4..c253c3c 100644 --- a/src/testgen.rs +++ b/src/testgen.rs @@ -1,3 +1,5 @@ +pub mod events_gen; + use crate::eventsdim0::EventsDim0; use crate::Events; use items_0::Appendable; @@ -12,7 +14,13 @@ fn xorshift32(state: u32) -> u32 { x } -pub fn make_some_boxed_d0_f32(n: usize, t0: u64, tstep: u64, tmask: u64, seed: u32) -> Box { +pub fn make_some_boxed_d0_f32( + n: usize, + t0: u64, + tstep: u64, + tmask: u64, + seed: u32, +) -> Box { let mut vstate = seed; let mut events = EventsDim0::empty(); for i in 0..n { diff --git a/src/testgen/events_gen.rs b/src/testgen/events_gen.rs new file mode 100644 index 0000000..6cb427e --- /dev/null +++ b/src/testgen/events_gen.rs @@ -0,0 +1,69 @@ +use crate::binning::container_events::ContainerEvents; +use crate::eventsdim0::EventsDim0; +use items_0::Empty; +use items_0::WithLen; +use netpod::range::evrange::NanoRange; +use netpod::TsNano; +use std::pin::Pin; + +fn boxed_conts(inp: S) -> Pin::Item> + Send>> +where + S: Iterator + Send + 'static, +{ + Box::pin(inp) +} + +pub fn old_events_gen_dim0_f32_v00(range: NanoRange) -> impl Iterator> { + let dt = 1000 * 1000 * 10; + let beg = range.beg(); + let end = range.end(); + let mut ts = beg - dt; + std::iter::repeat(0) + .map(move |_| { + type T = f32; + let mut c = EventsDim0::empty(); + loop { + let ts1 = TsNano::from_ns(ts); + ts += dt; + if ts1.ns() >= end { + break; + } + let val = (ts / 1_000_000) as T; + c.push_back(ts1.ns(), 0, val); + if c.len() >= 8 { + break; + } + } + c + }) + .take_while(|c| c.len() != 0) +} + +pub fn new_events_gen_dim1_f32_v00( + range: NanoRange, +) -> impl Iterator>> { + let dt = 1000 * 1000 * 10; + let beg = range.beg(); + let end = range.end(); + let mut ts = beg - dt; + std::iter::repeat(0) + .map(move |_| { + type T = f32; + let mut c = ContainerEvents::new(); + loop { + let ts1 = TsNano::from_ns(ts); + ts += dt; + if ts1.ns() >= end { + break; + } + let val = (ts / 1_000_000) as T; + let val = vec![val; 16]; + c.push_back(ts1, val); + if c.len() >= 8 { + break; + } + } + c + }) + .take_while(|c| c.len() != 0) +}