diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index cb143bf..19d7ea1 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -3,8 +3,6 @@ use super::container_events::Container; use super::container_events::EventValueType; use crate::apitypes::ContainerBinsApi; use crate::binning::container::bins::BinAggedContainer; -use crate::offsets::ts_offs_from_abs; -use crate::offsets::ts_offs_from_abs_with_anchor; use core::fmt; use daqbuf_err as err; use err::thiserror; @@ -12,7 +10,6 @@ use err::ThisError; use items_0::apitypes::ToUserFacingApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; -use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::merge::DrainIntoDstResult; use items_0::merge::DrainIntoNewResult; @@ -25,7 +22,6 @@ use items_0::AsAnyRef; use items_0::TypeName; use items_0::WithLen; use netpod::TsNano; -use serde::Serialize; use std::any; use std::collections::VecDeque; use std::mem; @@ -447,42 +443,6 @@ where } } -#[derive(Debug, Serialize)] -struct ContainerBinsCollectorOutputUser -where - EVT: EventValueType, - BVT: BinAggedType, -{ - #[serde(rename = "tsAnchor")] - ts_anchor_sec: u64, - #[serde(rename = "ts1Ms")] - ts1_off_ms: VecDeque, - #[serde(rename = "ts2Ms")] - ts2_off_ms: VecDeque, - #[serde(rename = "ts1Ns")] - ts1_off_ns: VecDeque, - #[serde(rename = "ts2Ns")] - ts2_off_ns: VecDeque, - #[serde(rename = "counts")] - counts: VecDeque, - #[serde(rename = "mins")] - mins: VecDeque, - #[serde(rename = "maxs")] - maxs: VecDeque, - #[serde(rename = "avgs")] - aggs: VecDeque, - // #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] - // range_final: bool, - // #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] - // timed_out: bool, - // #[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")] - // missing_bins: u32, - // #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] - // continue_at: Option, - // #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")] - // finished_at: Option, -} - impl ToUserFacingApiType for ContainerBinsCollectorOutput where EVT: EventValueType, @@ -560,10 +520,11 @@ where fn ingest(&mut self, src: &mut dyn CollectableDyn) { if let Some(src) = src.as_any_mut().downcast_mut::>() { MergeableTy::drain_into(src, &mut self.bins, 0..src.len()); - // src.drain_into(&mut self.bins, 0..src.len()); } else { - let srcn = src.type_name(); - panic!("wrong src type {srcn}"); + // TODO let trait return Result to avoid potential panic + let src_name = src.type_name(); + let self_name = any::type_name::(); + panic!("wrong src type self_name {self_name} src_name {src_name}"); } } @@ -655,6 +616,19 @@ where fn boxed_into_collectable_box(self: Box) -> Box { Box::new(*self) } + + fn fix_numerics(&mut self) { + if let Some(bins) = self.as_any_mut().downcast_mut::>() { + for ((min, max), agg) in bins + .mins + .iter_mut() + .zip(bins.maxs.iter_mut()) + .zip(bins.aggs.iter_mut()) + { + *agg = agg.min(*max).max(*min) + } + } + } } impl MergeableTy for ContainerBins diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index d0ab9d3..4d0b59b 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -67,15 +67,14 @@ pub trait PartialOrdEvtA { fn cmp_a(&self, other: &EVT) -> Option; } -pub trait EventValueType: - fmt::Debug + Clone + PartialOrd + Send + Unpin + 'static + Serialize + for<'a> Deserialize<'a> -{ +pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + Unpin + 'static { type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; const SERDE_ID: u32; const BYTE_ESTIMATE_V00: u32; + fn to_f32_for_binning_v01(&self) -> f32; } impl Container for VecDeque @@ -159,6 +158,9 @@ macro_rules! impl_event_value_type { type IterTy1<'a> = $evt; const SERDE_ID: u32 = <$evt as SubFrId>::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + *self as _ + } } impl PartialOrdEvtA<$evt> for $evt { @@ -211,6 +213,9 @@ impl EventValueType for f32 { type IterTy1<'a> = f32; const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + *self as _ + } } impl EventValueType for f64 { @@ -220,6 +225,9 @@ impl EventValueType for f64 { type IterTy1<'a> = f64; const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + *self as _ + } } impl EventValueType for bool { @@ -229,6 +237,9 @@ impl EventValueType for bool { type IterTy1<'a> = bool; const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + f32::from(*self) + } } impl EventValueType for String { @@ -238,6 +249,9 @@ impl EventValueType for String { type IterTy1<'a> = &'a str; const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = 400; + fn to_f32_for_binning_v01(&self) -> f32 { + self.len() as _ + } } macro_rules! impl_event_value_type_vec { @@ -250,6 +264,9 @@ macro_rules! impl_event_value_type_vec { const SERDE_ID: u32 = as SubFrId>::SUB as _; // TODO must use a more precise number dependent on actual elements const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + self.iter().fold(0., |a, x| a + *x as f32) + } } impl PartialOrdEvtA> for Vec<$evt> { @@ -270,9 +287,65 @@ impl_event_value_type_vec!(i32); impl_event_value_type_vec!(i64); impl_event_value_type_vec!(f32); impl_event_value_type_vec!(f64); -impl_event_value_type_vec!(bool); -impl_event_value_type_vec!(String); -impl_event_value_type_vec!(EnumVariant); +// impl_event_value_type_vec!(String); +// impl_event_value_type_vec!(EnumVariant); + +impl EventValueType for Vec { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorVecNumeric; + type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = Vec; + const SERDE_ID: u32 = as SubFrId>::SUB as _; + // TODO must use a more precise number dependent on actual elements + const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + self.iter().fold(0., |a, x| a + f32::from(*x)) + } +} + +impl PartialOrdEvtA> for Vec { + fn cmp_a(&self, other: &Vec) -> Option { + self.partial_cmp(other) + } +} + +impl EventValueType for Vec { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorVecNumeric; + type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = Vec; + const SERDE_ID: u32 = as SubFrId>::SUB as _; + // TODO must use a more precise number dependent on actual elements + const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + self.iter().fold(0., |a, x| a + x.len() as f32) + } +} + +impl PartialOrdEvtA> for Vec { + fn cmp_a(&self, other: &Vec) -> Option { + self.partial_cmp(other) + } +} + +impl EventValueType for Vec { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorVecNumeric; + type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = Vec; + const SERDE_ID: u32 = as SubFrId>::SUB as _; + // TODO must use a more precise number dependent on actual elements + const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + self.iter().fold(0., |a, x| a + x.ix() as f32) + } +} + +impl PartialOrdEvtA> for Vec { + fn cmp_a(&self, other: &Vec) -> Option { + self.partial_cmp(other) + } +} #[derive(Debug)] pub struct PulsedValIterTy<'a, EVT> @@ -462,6 +535,9 @@ where type IterTy1<'a> = PulsedValIterTy<'a, EVT>; const SERDE_ID: u32 = items_0::subfr::pulsed_subfr(::SUB) as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; + fn to_f32_for_binning_v01(&self) -> f32 { + self.1.to_f32_for_binning_v01() + } } #[derive(Debug, Clone)] @@ -1150,6 +1226,16 @@ where fn as_collectable_dyn_mut(&mut self) -> &mut dyn CollectableDyn { self } + + fn to_f32_for_binning_v01(&self) -> Box { + let mut ret = ContainerEvents::new(); + for r in self.iter_zip() { + // TODO can be expensive + let v: EVT = r.1.into(); + ret.push_back(r.0, v.to_f32_for_binning_v01()); + } + Box::new(ret) + } } #[cfg(test)] diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index e3d25cd..8e2156d 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -15,9 +15,12 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } macro_rules! trace_ingest_bin { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "BinBinsTimeweight")] -pub enum Error {} +autoerr::create_error_v1!( + name(Error, "BinBinsTimeweight"), + enum variants { + Logic, + }, +); #[derive(Debug)] pub struct BinnedBinsTimeweight diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index 99778ac..66c56c7 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -142,6 +142,9 @@ impl EventValueType for EnumVariant { type IterTy1<'a> = EnumVariantRef<'a>; const SERDE_ID: u32 = ::SUB as u32; const BYTE_ESTIMATE_V00: u32 = 40; + fn to_f32_for_binning_v01(&self) -> f32 { + self.ix() as _ + } } impl PartialOrdEvtA for netpod::UnsupEvt { @@ -208,6 +211,9 @@ impl EventValueType for netpod::UnsupEvt { type IterTy1<'a> = netpod::UnsupEvt; const SERDE_ID: u32 = ::SUB as u32; const BYTE_ESTIMATE_V00: u32 = 4; + fn to_f32_for_binning_v01(&self) -> f32 { + todo!() + } } impl EventValueType for Vec { @@ -217,4 +223,7 @@ impl EventValueType for Vec { type IterTy1<'a> = Vec; const SERDE_ID: u32 = ::SUB as u32; const BYTE_ESTIMATE_V00: u32 = 4; + fn to_f32_for_binning_v01(&self) -> f32 { + todo!() + } } diff --git a/src/testgen.rs b/src/testgen.rs index a7b875c..15d0604 100644 --- a/src/testgen.rs +++ b/src/testgen.rs @@ -1,33 +1 @@ pub mod events_gen; - -use crate::binning::container_events::ContainerEvents; -use items_0::Appendable; -use items_0::Empty; -use netpod::TsNano; - -#[allow(unused)] -fn xorshift32(state: u32) -> u32 { - let mut x = state; - x ^= x << 13; - x ^= x >> 17; - x ^= x << 5; - x -} - -pub fn make_some_boxed_d0_f32( - n: usize, - t0: u64, - tstep: u64, - tmask: u64, - seed: u32, -) -> ContainerEvents { - let mut vstate = seed; - let mut events = ContainerEvents::empty(); - for i in 0..n { - vstate = xorshift32(vstate); - let ts = t0 + i as u64 * tstep + (vstate as u64 & tmask); - let value = i as f32 * 100. + vstate as f32 / u32::MAX as f32 / 10.; - events.push(TsNano::from_ns(ts), value); - } - events -}