From ed23e352f64ec2362588f26b83a9b960a65bc0cf Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 7 Mar 2025 11:42:14 +0100 Subject: [PATCH] Improve byte estimate --- Cargo.toml | 6 +- src/binning.rs | 1 - src/binning/aggregator.rs | 4 +- src/binning/binnedvaluetype.rs | 8 -- src/binning/container/bins.rs | 4 +- src/binning/container_bins.rs | 60 ++++----- src/binning/container_events.rs | 114 ++++++++++++------ .../timeweight/timeweight_bins_stream.rs | 31 +++-- .../timeweight/timeweight_events_dyn.rs | 41 ++++--- src/binning/valuetype.rs | 34 +++--- src/channelevents.rs | 67 +++++----- src/eventfull.rs | 4 +- src/framable.rs | 15 +-- src/frame.rs | 17 ++- src/inmem.rs | 26 ++-- src/merger.rs | 42 +++---- src/testgen/events_gen.rs | 10 +- 17 files changed, 253 insertions(+), 231 deletions(-) delete mode 100644 src/binning/binnedvaluetype.rs diff --git a/Cargo.toml b/Cargo.toml index 3cd8b9b..ddf0b4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "daqbuf-items-2" version = "0.0.4" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [dependencies] serde = { version = "1", features = ["derive"] } @@ -20,7 +20,6 @@ futures-util = "0.3.24" humantime-serde = "1.1.1" itertools = "0.13.0" autoerr = "0.0.3" -thiserror = "=0.0.1" daqbuf-err = { path = "../daqbuf-err" } items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" } items_proc = { path = "../daqbuf-items-proc", package = "daqbuf-items-proc" } @@ -28,9 +27,6 @@ netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" } parse = { path = "../daqbuf-parse", package = "daqbuf-parse" } bitshuffle = { path = "../daqbuf-bitshuffle", package = "daqbuf-bitshuffle" } -[patch.crates-io] -thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } - [features] heavy = [] diff --git a/src/binning.rs b/src/binning.rs index 1d15a68..726e475 100644 --- a/src/binning.rs +++ b/src/binning.rs @@ -1,5 +1,4 @@ pub mod aggregator; -pub mod binnedvaluetype; pub mod container; pub mod container_bins; pub mod container_events; diff --git a/src/binning/aggregator.rs b/src/binning/aggregator.rs index 3d0911a..d7163d9 100644 --- a/src/binning/aggregator.rs +++ b/src/binning/aggregator.rs @@ -11,9 +11,9 @@ use netpod::EnumVariant; use serde::Deserialize; use serde::Serialize; -macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! trace_event { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) } -macro_rules! trace_result { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_result { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) } pub trait AggTimeWeightOutputAvg: BinAggedType + Serialize + for<'a> Deserialize<'a> {} diff --git a/src/binning/binnedvaluetype.rs b/src/binning/binnedvaluetype.rs deleted file mode 100644 index 13d893e..0000000 --- a/src/binning/binnedvaluetype.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub trait BinnedValueType {} - -pub struct BinnedNumericValue { - avg: f32, - t: Option, -} - -impl BinnedValueType for BinnedNumericValue {} diff --git a/src/binning/container/bins.rs b/src/binning/container/bins.rs index fae3670..cd309fb 100644 --- a/src/binning/container/bins.rs +++ b/src/binning/container/bins.rs @@ -96,7 +96,7 @@ impl AggBinValTw for AggBinValTwF32 { Self { sum: 0. } } - fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f32) { + fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f32) { let f = dt.ns() as f32 / bl.ns() as f32; self.sum += f * val; } @@ -122,7 +122,7 @@ impl AggBinValTw for AggBinValTwF64 { Self { sum: 0. } } - fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f64) { + fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f64) { let f = dt.ns() as f32 / bl.ns() as f32; self.sum += f as f64 * val; } diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index 3e5d5a1..fe85b89 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -6,6 +6,10 @@ use crate::binning::container::bins::BinAggedContainer; use crate::log::*; use core::fmt; use daqbuf_err as err; +use items_0::AsAnyMut; +use items_0::AsAnyRef; +use items_0::TypeName; +use items_0::WithLen; use items_0::apitypes::ToUserFacingApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; @@ -16,18 +20,12 @@ use items_0::merge::MergeableTy; use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinsBoxed; use items_0::vecpreview::VecPreview; -use items_0::AsAnyMut; -use items_0::AsAnyRef; -use items_0::TypeName; -use items_0::WithLen; -use netpod::f32_close; use netpod::TsNano; +use netpod::f32_close; use std::any; use std::collections::VecDeque; use std::mem; -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - autoerr::create_error_v1!( name(ContainerBinsError, "ContainerBins"), enum variants { @@ -121,7 +119,7 @@ mod container_bins_serde { EVT: EventValueType, BVT: BinAggedType, { - fn serialize(&self, ser: S) -> Result + fn serialize(&self, _ser: S) -> Result where S: Serializer, { @@ -134,7 +132,7 @@ mod container_bins_serde { EVT: EventValueType, BVT: BinAggedType, { - fn deserialize(de: D) -> Result + fn deserialize(_de: D) -> Result where D: Deserializer<'de>, { @@ -340,6 +338,17 @@ where } } +impl ByteEstimate for ContainerBins +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn byte_estimate(&self) -> u32 { + // TODO + self.len() as u32 * 800 + } +} + pub fn compare_boxed_f32(lhs: &ContainerBins, rhs: &ContainerBins) -> bool { if let Some(lhs) = lhs.as_any_ref().downcast_ref::>() { if let Some(rhs) = rhs.as_any_ref().downcast_ref::>() { @@ -457,17 +466,6 @@ where } } -impl ByteEstimate for ContainerBins -where - EVT: EventValueType, - BVT: BinAggedType, -{ - fn byte_estimate(&self) -> u64 { - // TODO ByteEstimate for ContainerBins - 128 * self.len() as u64 - } -} - #[derive(Debug)] pub struct ContainerBinsCollectorOutput where @@ -580,9 +578,9 @@ where EVT: EventValueType, BVT: BinAggedType, { - fn byte_estimate(&self) -> u64 { + fn byte_estimate(&self) -> u32 { // TODO need better estimate - self.bins.len() as u64 * 200 + self.bins.len() as u32 * 400 } } @@ -720,29 +718,17 @@ where fn find_lowest_index_gt(&self, ts: TsNano) -> Option { let x = self.ts1s.partition_point(|&x| x <= ts); - if x >= self.ts1s.len() { - None - } else { - Some(x) - } + if x >= self.ts1s.len() { None } else { Some(x) } } fn find_lowest_index_ge(&self, ts: TsNano) -> Option { let x = self.ts1s.partition_point(|&x| x < ts); - if x >= self.ts1s.len() { - None - } else { - Some(x) - } + if x >= self.ts1s.len() { None } else { Some(x) } } fn find_highest_index_lt(&self, ts: TsNano) -> Option { let x = self.ts1s.partition_point(|&x| x < ts); - if x == 0 { - None - } else { - Some(x - 1) - } + if x == 0 { None } else { Some(x - 1) } } fn tss_for_testing(&self) -> VecDeque { diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index c8af133..a53224d 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -37,6 +37,7 @@ use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; +use std::cell::RefCell; use std::collections::VecDeque; macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } @@ -70,6 +71,7 @@ where fn truncate_front(&mut self, len: usize); fn into_user_facing_fields(self) -> Vec<(String, Box)>; fn into_user_facing_fields_json(self) -> Vec<(String, Box)>; + fn byte_estimate(&self) -> u32; } pub trait PartialOrdEvtA { @@ -82,9 +84,9 @@ pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + Unpin + 'stat type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; const SERDE_ID: u32; - const BYTE_ESTIMATE_V00: u32; fn to_f32_for_binning_v01(&self) -> f32; fn scalar_type_name_string() -> String; + fn byte_estimate(&self) -> u32; } impl Container for VecDeque @@ -134,6 +136,10 @@ where fn into_user_facing_fields_json(self) -> Vec<(String, Box)> { vec![("values".into(), Box::new(self))] } + + fn byte_estimate(&self) -> u32 { + self.iter().fold(0, |a, x| a + x.byte_estimate()) + } } impl Container for VecDeque { @@ -179,6 +185,10 @@ impl Container for VecDeque { fn into_user_facing_fields_json(self) -> Vec<(String, Box)> { vec![("values".into(), Box::new(self))] } + + fn byte_estimate(&self) -> u32 { + self.iter().fold(0, |a, x| a + x.bytes().len() as u32) + } } macro_rules! impl_event_value_type { @@ -189,15 +199,16 @@ macro_rules! impl_event_value_type { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = $evt; const SERDE_ID: u32 = <$evt as SubFrId>::SUB as _; - const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32; fn to_f32_for_binning_v01(&self) -> f32 { *self as _ } fn scalar_type_name_string() -> String { $sctname.to_string() } + fn byte_estimate(&self) -> u32 { + std::mem::size_of::<$evt>() as u32 + } } - impl PartialOrdEvtA<$evt> for $evt { fn cmp_a(&self, other: &$evt) -> Option { self.partial_cmp(other) @@ -247,13 +258,15 @@ impl EventValueType for f32 { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = f32; const SERDE_ID: u32 = ::SUB as _; - const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { *self as _ } fn scalar_type_name_string() -> String { "f32".to_string() } + fn byte_estimate(&self) -> u32 { + std::mem::size_of::() as u32 + } } impl EventValueType for f64 { @@ -262,13 +275,15 @@ impl EventValueType for f64 { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = f64; const SERDE_ID: u32 = ::SUB as _; - const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { *self as _ } fn scalar_type_name_string() -> String { "f64".to_string() } + fn byte_estimate(&self) -> u32 { + std::mem::size_of::() as u32 + } } impl EventValueType for bool { @@ -277,13 +292,15 @@ impl EventValueType for bool { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = bool; const SERDE_ID: u32 = ::SUB as _; - const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { f32::from(*self) } fn scalar_type_name_string() -> String { "bool".to_string() } + fn byte_estimate(&self) -> u32 { + std::mem::size_of::() as u32 + } } impl EventValueType for String { @@ -292,13 +309,15 @@ impl EventValueType for String { type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = &'a str; const SERDE_ID: u32 = ::SUB as _; - const BYTE_ESTIMATE_V00: u32 = 400; fn to_f32_for_binning_v01(&self) -> f32 { self.len() as _ } fn scalar_type_name_string() -> String { "string".to_string() } + fn byte_estimate(&self) -> u32 { + self.bytes().len() as u32 + } } macro_rules! impl_event_value_type_vec { @@ -310,13 +329,16 @@ macro_rules! impl_event_value_type_vec { type IterTy1<'a> = Vec<$evt>; const SERDE_ID: u32 = as SubFrId>::SUB as _; // TODO must use a more precise number dependent on actual elements - const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { self.iter().fold(0., |a, x| a + *x as f32) } fn scalar_type_name_string() -> String { $sctname.to_string() } + fn byte_estimate(&self) -> u32 { + self.iter() + .fold(0, |a, x| a + EventValueType::byte_estimate(x)) + } } impl PartialOrdEvtA> for Vec<$evt> { @@ -346,14 +368,15 @@ impl EventValueType for Vec { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = Vec; const SERDE_ID: u32 = as SubFrId>::SUB as _; - // TODO must use a more precise number dependent on actual elements - const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { self.iter().fold(0., |a, x| a + f32::from(*x)) } fn scalar_type_name_string() -> String { "bool".to_string() } + fn byte_estimate(&self) -> u32 { + self.len() as u32 * std::mem::size_of::() as u32 + } } impl PartialOrdEvtA> for Vec { @@ -368,14 +391,16 @@ impl EventValueType for Vec { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = Vec; const SERDE_ID: u32 = as SubFrId>::SUB as _; - // TODO must use a more precise number dependent on actual elements - const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { self.iter().fold(0., |a, x| a + x.len() as f32) } fn scalar_type_name_string() -> String { "string".to_string() } + fn byte_estimate(&self) -> u32 { + self.iter() + .fold(0, |a, x| a + EventValueType::byte_estimate(x)) + } } impl PartialOrdEvtA> for Vec { @@ -390,14 +415,16 @@ impl EventValueType for Vec { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = Vec; const SERDE_ID: u32 = as SubFrId>::SUB as _; - // TODO must use a more precise number dependent on actual elements - const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { self.iter().fold(0., |a, x| a + x.ix() as f32) } fn scalar_type_name_string() -> String { "enum".to_string() } + fn byte_estimate(&self) -> u32 { + self.iter() + .fold(0, |a, x| a + EventValueType::byte_estimate(x)) + } } impl PartialOrdEvtA> for Vec { @@ -484,7 +511,7 @@ mod serde_pulsed_val { where EVT: EventValueType, { - fn deserialize(de: D) -> Result + fn deserialize(_de: D) -> Result where D: Deserializer<'de>, { @@ -586,6 +613,10 @@ where ("values".into(), Box::new(self.vals)), ] } + + fn byte_estimate(&self) -> u32 { + self.len() as u32 * 8 + self.vals.byte_estimate() + } } impl PartialOrdEvtA> for PulsedVal @@ -606,13 +637,15 @@ where type AggTimeWeightOutputAvg = EVT::AggTimeWeightOutputAvg; type IterTy1<'a> = PulsedValIterTy<'a, EVT>; const SERDE_ID: u32 = items_0::subfr::pulsed_subfr(::SUB) as _; - const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; fn to_f32_for_binning_v01(&self) -> f32 { self.1.to_f32_for_binning_v01() } fn scalar_type_name_string() -> String { EVT::scalar_type_name_string() } + fn byte_estimate(&self) -> u32 { + 8 + self.1.byte_estimate() + } } #[derive(Debug, Clone)] @@ -663,7 +696,7 @@ where { tss: VecDeque, vals: ::Container, - byte_estimate: u64, + byte_estimate: RefCell>, } mod container_events_serde { @@ -677,10 +710,11 @@ mod container_events_serde { use serde::Deserializer; use serde::Serialize; use serde::Serializer; + use std::cell::RefCell; use std::fmt; use std::marker::PhantomData; - macro_rules! trace_serde { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) } + macro_rules! trace_serde { ($($arg:expr),*) => ( if false { eprintln!($($arg),*); }) } impl Serialize for ContainerEvents where @@ -726,8 +760,7 @@ mod container_events_serde { let ret = Self::Value { tss, vals, - // TODO make container recompute byte_estimate - byte_estimate: 0, + byte_estimate: RefCell::new(None), }; Ok(ret) } @@ -756,7 +789,7 @@ mod container_events_serde { let ret = Self::Value { tss: tss.unwrap(), vals: vals.unwrap(), - byte_estimate: 0, + byte_estimate: RefCell::new(None), }; Ok(ret) } @@ -787,7 +820,7 @@ where Self { tss, vals, - byte_estimate: 0, + byte_estimate: RefCell::new(None), } } @@ -799,7 +832,7 @@ where Self { tss: VecDeque::new(), vals: Container::new(), - byte_estimate: 0, + byte_estimate: RefCell::new(None), } } @@ -822,6 +855,7 @@ where pub fn push_back(&mut self, ts: TsNano, val: EVT) { self.tss.push_back(ts); self.vals.push_back(val); + self.byte_estimate = RefCell::new(None); } pub fn iter_zip<'a>(&'a self) -> impl Iterator)> { @@ -835,7 +869,7 @@ where pub fn clear(&mut self) { self.tss.clear(); self.vals.clear(); - self.byte_estimate = 0; + *self.byte_estimate.borrow_mut() = Some(0); } pub fn truncate_front(&mut self, len: usize) { @@ -845,6 +879,16 @@ where self.vals.truncate_front(len); } } + + fn byte_estimate_mut(&self) -> u32 { + if let Some(x) = self.byte_estimate.borrow().clone() { + x + } else { + let byte_est = 8 * self.len() as u32 + self.vals.byte_estimate(); + *self.byte_estimate.borrow_mut() = Some(byte_est); + byte_est + } + } } impl fmt::Debug for ContainerEvents @@ -890,15 +934,6 @@ where } } -impl ByteEstimate for ContainerEvents -where - EVT: EventValueType, -{ - fn byte_estimate(&self) -> u64 { - self.byte_estimate - } -} - impl Empty for ContainerEvents where EVT: EventValueType, @@ -917,6 +952,15 @@ where } } +impl ByteEstimate for ContainerEvents +where + EVT: EventValueType, +{ + fn byte_estimate(&self) -> u32 { + self.byte_estimate_mut() + } +} + pub struct ContainerEventsTakeUpTo<'a, EVT> where EVT: EventValueType, @@ -1169,8 +1213,8 @@ impl ByteEstimate for ContainerEventsCollector where EVT: EventValueType, { - fn byte_estimate(&self) -> u64 { - EVT::BYTE_ESTIMATE_V00 as _ + fn byte_estimate(&self) -> u32 { + self.evs.byte_estimate() } } diff --git a/src/binning/timeweight/timeweight_bins_stream.rs b/src/binning/timeweight/timeweight_bins_stream.rs index ec0c0a8..9fc379b 100644 --- a/src/binning/timeweight/timeweight_bins_stream.rs +++ b/src/binning/timeweight/timeweight_bins_stream.rs @@ -1,30 +1,28 @@ use super::timeweight_bins_lazy::BinnedBinsTimeweightLazy; -use crate::log::*; +use crate::log; use futures_util::Stream; use futures_util::StreamExt; -use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::LogItem; use items_0::streamitem::Sitemty; -use items_0::timebin::BinnedBinsTimeweightTrait; +use items_0::streamitem::sitem_err2_from_string; use items_0::timebin::BinningggContainerBinsDyn; -use items_0::timebin::BinsBoxed; use netpod::BinnedRange; use netpod::TsNano; -use std::fmt; use std::ops::ControlFlow; use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! trace_input_container { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) } -macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "BinnedEventsTimeweightDyn")] -pub enum Error { - InnerDynMissing, -} +autoerr::create_error_v1!( + name(Error, "BinnedEventsTimeweightDyn"), + enum variants { + InnerDynMissing, + }, +); type ItemA = Box; type ItemB = Sitemty; @@ -56,10 +54,10 @@ impl BinnedBinsTimeweightStream { item: ItemB, _cx: &mut Context, ) -> ControlFlow::Item>>> { - use items_0::streamitem::RangeCompletableItem::*; - use items_0::streamitem::StreamItem::*; use ControlFlow::*; use Poll::*; + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; match item { Ok(x) => match x { DataItem(x) => match x { @@ -104,9 +102,9 @@ impl BinnedBinsTimeweightStream { _cx: &mut Context, ) -> Poll::Item>> { trace_input_container!("handle_eos"); + use Poll::*; use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::StreamItem::*; - use Poll::*; self.state = StreamState::Done; if self.range_complete { self.binned @@ -123,7 +121,8 @@ impl BinnedBinsTimeweightStream { Ready(Some(Ok(DataItem(Data(x))))) } None => { - let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos")); + let item = + LogItem::from_node(888, log::Level::INFO, format!("no bins ready on eos")); Ready(Some(Ok(Log(item)))) } } diff --git a/src/binning/timeweight/timeweight_events_dyn.rs b/src/binning/timeweight/timeweight_events_dyn.rs index 2e629ec..45778da 100644 --- a/src/binning/timeweight/timeweight_events_dyn.rs +++ b/src/binning/timeweight/timeweight_events_dyn.rs @@ -6,9 +6,11 @@ use crate::log; use daqbuf_err as err; use futures_util::Stream; use futures_util::StreamExt; -use items_0::streamitem::sitem_data; use items_0::streamitem::LogItem; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::streamitem::sitem_data; use items_0::timebin::BinnedEventsTimeweightTrait; use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinningggContainerEventsDyn; @@ -27,7 +29,7 @@ macro_rules! debug_input_container { ($($arg:expr),*) => ( if true { log::debug! macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) } -macro_rules! trace_input_container { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) } +macro_rules! trace_input_container { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) } macro_rules! trace_emit { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); }) } @@ -287,10 +289,10 @@ impl BinnedEventsTimeweightStream { item: Sitemty, _cx: &mut Context, ) -> ControlFlow::Item>>> { - use items_0::streamitem::RangeCompletableItem::*; - use items_0::streamitem::StreamItem::*; use ControlFlow::*; use Poll::*; + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; match item { Ok(x) => match x { DataItem(x) => match x { @@ -322,8 +324,8 @@ impl BinnedEventsTimeweightStream { } } - fn test1( - mut self: Pin<&mut Self>, + fn _test1( + self: Pin<&mut Self>, _cx: &mut Context, ) -> Result::Item>>>, Error> { use ControlFlow::*; @@ -339,8 +341,8 @@ impl BinnedEventsTimeweightStream { } } - fn test2( - mut self: Pin<&mut Self>, + fn _test2( + self: Pin<&mut Self>, _cx: &mut Context, ) -> ControlFlow::Item>>, Error>> { use ControlFlow::*; @@ -350,19 +352,15 @@ impl BinnedEventsTimeweightStream { } else if false { Continue(()) } else { - let e = Error::Dummy; + let _e = Error::Dummy; // unfortunately can not use the `?` operator here: // let _ = Err(e)?; Break(Ok(Pending)) } } - fn handle_eos( - mut self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll::Item>> { + fn handle_eos(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), err::Error> { debug_input_container!("handle_eos range final {}", self.range_final); - use Poll::*; self.state = StreamState::Remains; if true || self.range_final { self.binned_events @@ -373,7 +371,7 @@ impl BinnedEventsTimeweightStream { .input_done_range_open() .map_err(err::Error::from_string)?; } - Ready(None) + Ok(()) } fn handle_remains( @@ -381,9 +379,9 @@ impl BinnedEventsTimeweightStream { _cx: &mut Context, ) -> Poll::Item>> { debug_input_container!("handle_remains"); + use Poll::*; use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::StreamItem::*; - use Poll::*; debug!("handle_remains binner {:?}", self.binned_events); match self .binned_events @@ -409,7 +407,7 @@ impl BinnedEventsTimeweightStream { mut self: Pin<&mut Self>, cx: &mut Context, ) -> ControlFlow::Item>>> { - debug_input_container!("handle_main"); + trace_input_container!("handle_main"); use ControlFlow::*; use Poll::*; let ret = match &self.state { @@ -418,7 +416,10 @@ impl BinnedEventsTimeweightStream { StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) { Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx), Ready(None) => { - self.as_mut().handle_eos(cx); + match self.as_mut().handle_eos(cx) { + Ok(()) => {} + Err(e) => return Break(Ready(Some(Err(e)))), + } Continue(()) } Pending => Break(Pending), @@ -436,6 +437,10 @@ impl BinnedEventsTimeweightStream { if let Break(Ready(Some(Err(_)))) = ret { self.state = StreamState::Done; } + if let Break(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))) = &ret + { + trace_emit!("emit item len {}", item.len()); + } ret } } diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index 706b26d..a11aa6c 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -102,6 +102,10 @@ impl Container for EnumVariantContainer { ("valuestrings".into(), Box::new(self.names)), ] } + + fn byte_estimate(&self) -> u32 { + self.len() as u32 * 24 + } } #[derive(Debug)] @@ -140,11 +144,7 @@ impl<'a> PartialOrdEvtA for EnumVariantRef<'a> { let x = self.ix.partial_cmp(&other.ix()); if let Some(Equal) = x { let x = self.name.partial_cmp(other.name()); - if let Some(Equal) = x { - Some(Equal) - } else { - x - } + if let Some(Equal) = x { Some(Equal) } else { x } } else { x } @@ -157,23 +157,25 @@ impl EventValueType for EnumVariant { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = EnumVariantRef<'a>; const SERDE_ID: u32 = ::SUB as u32; - const BYTE_ESTIMATE_V00: u32 = 40; fn to_f32_for_binning_v01(&self) -> f32 { self.ix() as _ } fn scalar_type_name_string() -> String { "enum".to_string() } + fn byte_estimate(&self) -> u32 { + 60 + } } impl PartialOrdEvtA for netpod::UnsupEvt { - fn cmp_a(&self, other: &netpod::UnsupEvt) -> Option { + fn cmp_a(&self, _other: &netpod::UnsupEvt) -> Option { todo!() } } impl PartialOrdEvtA> for Vec { - fn cmp_a(&self, other: &Vec) -> Option { + fn cmp_a(&self, _other: &Vec) -> Option { todo!() } } @@ -186,7 +188,7 @@ impl AggregatorTimeWeight for UnsupEvtAgg { todo!() } - fn ingest(&mut self, dt: DtNano, bl: DtNano, val: netpod::UnsupEvt) { + fn ingest(&mut self, _dt: DtNano, _bl: DtNano, _val: netpod::UnsupEvt) { todo!() } @@ -196,7 +198,7 @@ impl AggregatorTimeWeight for UnsupEvtAgg { fn result_and_reset_for_new_bin( &mut self, - filled_width_fraction: f32, + _filled_width_fraction: f32, ) -> ::AggTimeWeightOutputAvg { todo!() } @@ -207,7 +209,7 @@ impl AggregatorTimeWeight> for UnsupEvtAgg { todo!() } - fn ingest(&mut self, dt: DtNano, bl: DtNano, val: Vec) { + fn ingest(&mut self, _dt: DtNano, _bl: DtNano, _val: Vec) { todo!() } @@ -217,7 +219,7 @@ impl AggregatorTimeWeight> for UnsupEvtAgg { fn result_and_reset_for_new_bin( &mut self, - filled_width_fraction: f32, + _filled_width_fraction: f32, ) -> as EventValueType>::AggTimeWeightOutputAvg { todo!() } @@ -229,13 +231,15 @@ impl EventValueType for netpod::UnsupEvt { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = netpod::UnsupEvt; const SERDE_ID: u32 = ::SUB as u32; - const BYTE_ESTIMATE_V00: u32 = 4; fn to_f32_for_binning_v01(&self) -> f32 { 0. } fn scalar_type_name_string() -> String { "unsupevt".to_string() } + fn byte_estimate(&self) -> u32 { + 345 + } } impl EventValueType for Vec { @@ -244,11 +248,13 @@ impl EventValueType for Vec { type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = Vec; const SERDE_ID: u32 = ::SUB as u32; - const BYTE_ESTIMATE_V00: u32 = 4; fn to_f32_for_binning_v01(&self) -> f32 { 0. } fn scalar_type_name_string() -> String { "unsupevt".to_string() } + fn byte_estimate(&self) -> u32 { + self.len() as u32 * 345 + } } diff --git a/src/channelevents.rs b/src/channelevents.rs index 737366d..5d8038a 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -4,6 +4,12 @@ use crate::framable::FrameType; use crate::log::*; use core::ops::Range; use daqbuf_err as err; +use items_0::AsAnyMut; +use items_0::AsAnyRef; +use items_0::Empty; +use items_0::Extendable; +use items_0::TypeName; +use items_0::WithLen; use items_0::apitypes::ToUserFacingApiType; use items_0::apitypes::UserApiType; use items_0::collect_s::CollectableDyn; @@ -18,12 +24,6 @@ use items_0::merge::DrainIntoNewResult; use items_0::merge::MergeableTy; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; use items_0::timebin::BinningggContainerEventsDyn; -use items_0::AsAnyMut; -use items_0::AsAnyRef; -use items_0::Empty; -use items_0::Extendable; -use items_0::TypeName; -use items_0::WithLen; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -70,7 +70,7 @@ impl ConnStatusEvent { } impl ByteEstimate for ConnStatusEvent { - fn byte_estimate(&self) -> u64 { + fn byte_estimate(&self) -> u32 { // TODO magic number, but maybe good enough 32 } @@ -147,7 +147,7 @@ impl ChannelStatusEvent { } impl ByteEstimate for ChannelStatusEvent { - fn byte_estimate(&self) -> u64 { + fn byte_estimate(&self) -> u32 { // TODO magic number, but maybe good enough 32 } @@ -221,26 +221,26 @@ mod serde_channel_events { use crate::binning::container_events::PulsedVal; use crate::channelevents::ConnStatusEvent; use crate::log::*; + use items_0::subfr::SubFrId; use items_0::subfr::is_container_events; use items_0::subfr::is_pulsed_subfr; use items_0::subfr::is_vec_subfr; use items_0::subfr::subfr_scalar_type; - use items_0::subfr::SubFrId; use items_0::timebin::BinningggContainerEventsDyn; use netpod::EnumVariant; + use serde::Deserialize; + use serde::Deserializer; + use serde::Serialize; + use serde::Serializer; use serde::de; use serde::de::EnumAccess; use serde::de::VariantAccess; use serde::de::Visitor; use serde::ser::SerializeSeq; - use serde::Deserialize; - use serde::Deserializer; - use serde::Serialize; - use serde::Serializer; use std::cell::RefCell; use std::fmt; - macro_rules! trace_serde { ($($arg:expr),*) => ( if true { trace!($($arg),*); }) } + macro_rules! trace_serde { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) } type C01 = ContainerEvents; type C02 = ContainerEvents>; @@ -440,7 +440,7 @@ mod serde_channel_events { ret } - fn visit_map(self, map: A) -> Result + fn visit_map(self, _map: A) -> Result where A: de::MapAccess<'de>, { @@ -595,19 +595,19 @@ mod test_channel_events_serde { use crate::framable::Framable; use crate::inmem::InMemoryFrame; use crate::log::*; + use bincode::DefaultOptions; use bincode::config::FixintEncoding; use bincode::config::LittleEndian; use bincode::config::RejectTrailing; use bincode::config::WithOtherEndian; use bincode::config::WithOtherIntEncoding; use bincode::config::WithOtherTrailing; - use bincode::DefaultOptions; - use items_0::bincode; - use items_0::streamitem::sitem_data; - use items_0::streamitem::Sitemty; - use items_0::timebin::BinningggContainerEventsDyn; use items_0::Appendable; use items_0::Empty; + use items_0::bincode; + use items_0::streamitem::Sitemty; + use items_0::streamitem::sitem_data; + use items_0::timebin::BinningggContainerEventsDyn; use netpod::TsNano; use netpod::UnsupEvt; use serde::Deserialize; @@ -758,7 +758,7 @@ impl WithLen for ChannelEvents { } impl ByteEstimate for ChannelEvents { - fn byte_estimate(&self) -> u64 { + fn byte_estimate(&self) -> u32 { match self { ChannelEvents::Events(k) => k.byte_estimate(), ChannelEvents::Status(k) => match k { @@ -838,11 +838,7 @@ impl MergeableTy for ChannelEvents { ChannelEvents::Events(k) => k.find_lowest_index_gt(ts), ChannelEvents::Status(k) => { if let Some(k) = k { - if k.ts > ts { - Some(0) - } else { - None - } + if k.ts > ts { Some(0) } else { None } } else { None } @@ -855,11 +851,7 @@ impl MergeableTy for ChannelEvents { ChannelEvents::Events(k) => k.find_lowest_index_ge(ts), ChannelEvents::Status(k) => { if let Some(k) = k { - if k.ts >= ts { - Some(0) - } else { - None - } + if k.ts >= ts { Some(0) } else { None } } else { None } @@ -872,11 +864,7 @@ impl MergeableTy for ChannelEvents { ChannelEvents::Events(k) => k.find_highest_index_lt(ts), ChannelEvents::Status(k) => { if let Some(k) = k { - if k.ts < ts { - Some(0) - } else { - None - } + if k.ts < ts { Some(0) } else { None } } else { None } @@ -981,7 +969,7 @@ impl WithLen for ChannelEventsCollector { } impl ByteEstimate for ChannelEventsCollector { - fn byte_estimate(&self) -> u64 { + fn byte_estimate(&self) -> u32 { self.coll.as_ref().map_or(0, |x| x.byte_estimate()) } } @@ -1044,7 +1032,10 @@ impl ToUserFacingApiType for ChannelEvents { fn into_user_facing_api_type(self) -> Box { match self { ChannelEvents::Events(x) => x.into_user_facing_api_type_box(), - ChannelEvents::Status(x) => Box::new(items_0::apitypes::EmptyStruct::new()), + ChannelEvents::Status(_) => { + // TODO + Box::new(items_0::apitypes::EmptyStruct::new()) + } } } diff --git a/src/eventfull.rs b/src/eventfull.rs index 0de82f3..25b2bb7 100644 --- a/src/eventfull.rs +++ b/src/eventfull.rs @@ -183,8 +183,8 @@ impl WithLen for EventFull { } impl ByteEstimate for EventFull { - fn byte_estimate(&self) -> u64 { - self.len() as u64 * (64 + self.entry_payload_max) + fn byte_estimate(&self) -> u32 { + self.len() as u32 * (64 + self.entry_payload_max as u32) } } diff --git a/src/framable.rs b/src/framable.rs index af24370..e1fdc1a 100644 --- a/src/framable.rs +++ b/src/framable.rs @@ -25,13 +25,14 @@ pub const INMEM_FRAME_HEAD: usize = 20; pub const INMEM_FRAME_FOOT: usize = 4; pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "ItemFramable")] -pub enum Error { - Msg(String), - DummyError, - Frame(#[from] crate::frame::Error), -} +autoerr::create_error_v1!( + name(Error, "ItemFramable"), + enum variants { + Msg(String), + DummyError, + Frame(#[from] crate::frame::Error), + }, +); struct ErrMsg(E) where diff --git a/src/frame.rs b/src/frame.rs index c584db5..0fc7391 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -5,25 +5,26 @@ use crate::framable::INMEM_FRAME_HEAD; use crate::framable::INMEM_FRAME_MAGIC; use crate::inmem::InMemoryFrame; use crate::log::*; +use bincode::DefaultOptions; use bincode::config::FixintEncoding; use bincode::config::LittleEndian; use bincode::config::RejectTrailing; use bincode::config::WithOtherEndian; use bincode::config::WithOtherIntEncoding; use bincode::config::WithOtherTrailing; -use bincode::DefaultOptions; use bytes::BufMut; use bytes::BytesMut; use core::fmt; use daqbuf_err as err; use items_0::bincode; -use items_0::streamitem::LogItem; -use items_0::streamitem::StatsItem; use items_0::streamitem::ERROR_FRAME_TYPE_ID; use items_0::streamitem::LOG_FRAME_TYPE_ID; +use items_0::streamitem::LogItem; use items_0::streamitem::RANGE_COMPLETE_FRAME_TYPE_ID; use items_0::streamitem::STATS_FRAME_TYPE_ID; +use items_0::streamitem::StatsItem; use items_0::streamitem::TERM_FRAME_TYPE_ID; +use netpod::log; use serde::Serialize; use std::any; use std::io; @@ -115,6 +116,7 @@ where rmp_serde::to_vec_named(&item).map_err(Error::from) } +#[allow(unused)] fn msgpack_erased_to_vec(item: T) -> Result, Error> where T: erased_serde::Serialize, @@ -149,6 +151,7 @@ where }) } +#[allow(unused)] fn postcard_erased_to_vec(item: T) -> Result, Error> where T: erased_serde::Serialize + fmt::Debug, @@ -210,6 +213,7 @@ where }) } +#[allow(unused)] fn json_erased_to_vec(item: T) -> Result, Error> where T: erased_serde::Serialize + fmt::Debug, @@ -260,7 +264,9 @@ where } } -pub fn encode_erased_to_vec(item: T) -> Result, Error> +// TODO check if better option +#[allow(unused)] +fn encode_erased_to_vec_old(item: T) -> Result, Error> where T: erased_serde::Serialize + fmt::Debug, { @@ -296,6 +302,9 @@ where if enc.len() > u32::MAX as usize { return Err(Error::TooLongPayload(enc.len())); } + if enc.len() > 1024 * 1024 * 20 { + log::debug!("make_frame_2 over weight {} kB", enc.len() / 1024); + } let mut h = crc32fast::Hasher::new(); h.update(&enc); let payload_crc = h.finalize(); diff --git a/src/inmem.rs b/src/inmem.rs index d48cfe7..09de499 100644 --- a/src/inmem.rs +++ b/src/inmem.rs @@ -1,19 +1,24 @@ use crate::framable::INMEM_FRAME_FOOT; use crate::framable::INMEM_FRAME_HEAD; use crate::framable::INMEM_FRAME_MAGIC; -use crate::log::*; +use crate::log; use bytes::Bytes; use std::fmt; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "InMemoryFrameError")] -pub enum Error { - LessThanHeader, - TryFromSlice(#[from] std::array::TryFromSliceError), - BadMagic(u32), - HugeFrame(u32), - BadCrc, -} +macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); }) } + +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); }) } + +autoerr::create_error_v1!( + name(Error, "InMemoryFrameError"), + enum variants { + LessThanHeader, + TryFromSlice(#[from] std::array::TryFromSliceError), + BadMagic(u32), + HugeFrame(u32), + BadCrc, + }, +); pub enum ParseResult { NotEnoughData(usize), @@ -56,6 +61,7 @@ impl InMemoryFrame { if magic != INMEM_FRAME_MAGIC { return Err(Error::BadMagic(magic)); } + debug!("frame len {:10}", len); if len > 1024 * 1024 * 50 { return Err(Error::HugeFrame(len)); } diff --git a/src/merger.rs b/src/merger.rs index 897f5f7..317d1a8 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -4,18 +4,17 @@ mod test; use crate::log::*; use futures_util::Stream; use futures_util::StreamExt; -use items_0::container::ByteEstimate; use items_0::merge::DrainIntoDstResult; use items_0::merge::DrainIntoNewResult; use items_0::merge::MergeableTy; use items_0::on_sitemty_data; -use items_0::streamitem::sitem_data; -use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::streamitem::sitem_data; +use items_0::streamitem::sitem_err2_from_string; use netpod::TsNano; use std::collections::VecDeque; use std::fmt; @@ -24,26 +23,27 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -const OUT_MAX_BYTES: u64 = 1024 * 200; +const OUT_MAX_BYTES: u32 = 1024 * 1024 * 20; const DO_DETECT_NON_MONO: bool = true; -macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace2 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace4 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "MergerError")] -pub enum Error { - NoPendingButMissing, - Input(SitemErrTy), - ShouldFindTsMin, - ItemShouldHaveTsMax, - PartialPathDrainedAllItems, -} +autoerr::create_error_v1!( + name(Error, "MergerError"), + enum variants { + NoPendingButMissing, + Input(SitemErrTy), + ShouldFindTsMin, + ItemShouldHaveTsMax, + PartialPathDrainedAllItems, + }, +); type MergeInp = Pin> + Send>>; @@ -382,15 +382,11 @@ where || last_emit { if o.len() > 2 * self.out_max_len { - debug!( - "MERGER OVERLENGTH ITEM {} vs {}", - o.len(), - self.out_max_len - ); + debug!("over length item {} vs {}", o.len(), self.out_max_len); } if o.byte_estimate() > 2 * OUT_MAX_BYTES { debug!( - "MERGER OVERWEIGHT ITEM {} vs {}", + "over weight item {} vs {}", o.byte_estimate(), OUT_MAX_BYTES ); diff --git a/src/testgen/events_gen.rs b/src/testgen/events_gen.rs index 710a1c7..0b16947 100644 --- a/src/testgen/events_gen.rs +++ b/src/testgen/events_gen.rs @@ -1,14 +1,6 @@ use crate::binning::container_events::ContainerEvents; -use netpod::range::evrange::NanoRange; use netpod::TsNano; -use std::pin::Pin; - -fn boxed_conts(inp: S) -> Pin::Item> + Send>> -where - S: Iterator + Send + 'static, -{ - Box::pin(inp) -} +use netpod::range::evrange::NanoRange; pub fn new_events_gen_dim0_f32_v00(range: NanoRange) -> impl Iterator> { let dt = 1000 * 1000 * 10;