Support vec string, vec enum, clamp rounded for f32 output

This commit is contained in:
Dominik Werder
2024-12-11 16:59:44 +01:00
parent d6b97b00c0
commit ee42744a7a
5 changed files with 124 additions and 84 deletions

View File

@@ -3,8 +3,6 @@ use super::container_events::Container;
use super::container_events::EventValueType;
use crate::apitypes::ContainerBinsApi;
use crate::binning::container::bins::BinAggedContainer;
use crate::offsets::ts_offs_from_abs;
use crate::offsets::ts_offs_from_abs_with_anchor;
use core::fmt;
use daqbuf_err as err;
use err::thiserror;
@@ -12,7 +10,6 @@ use err::ThisError;
use items_0::apitypes::ToUserFacingApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewResult;
@@ -25,7 +22,6 @@ use items_0::AsAnyRef;
use items_0::TypeName;
use items_0::WithLen;
use netpod::TsNano;
use serde::Serialize;
use std::any;
use std::collections::VecDeque;
use std::mem;
@@ -447,42 +443,6 @@ where
}
}
#[derive(Debug, Serialize)]
struct ContainerBinsCollectorOutputUser<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "ts1Ms")]
ts1_off_ms: VecDeque<u64>,
#[serde(rename = "ts2Ms")]
ts2_off_ms: VecDeque<u64>,
#[serde(rename = "ts1Ns")]
ts1_off_ns: VecDeque<u64>,
#[serde(rename = "ts2Ns")]
ts2_off_ns: VecDeque<u64>,
#[serde(rename = "counts")]
counts: VecDeque<u64>,
#[serde(rename = "mins")]
mins: VecDeque<EVT>,
#[serde(rename = "maxs")]
maxs: VecDeque<EVT>,
#[serde(rename = "avgs")]
aggs: VecDeque<BVT>,
// #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
// range_final: bool,
// #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
// timed_out: bool,
// #[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")]
// missing_bins: u32,
// #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
// continue_at: Option<IsoDateTime>,
// #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")]
// finished_at: Option<IsoDateTime>,
}
impl<EVT, BVT> ToUserFacingApiType for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
@@ -560,10 +520,11 @@ where
fn ingest(&mut self, src: &mut dyn CollectableDyn) {
if let Some(src) = src.as_any_mut().downcast_mut::<ContainerBins<EVT, BVT>>() {
MergeableTy::drain_into(src, &mut self.bins, 0..src.len());
// src.drain_into(&mut self.bins, 0..src.len());
} else {
let srcn = src.type_name();
panic!("wrong src type {srcn}");
// TODO let trait return Result to avoid potential panic
let src_name = src.type_name();
let self_name = any::type_name::<Self>();
panic!("wrong src type self_name {self_name} src_name {src_name}");
}
}
@@ -655,6 +616,19 @@ where
fn boxed_into_collectable_box(self: Box<Self>) -> Box<dyn CollectableDyn> {
Box::new(*self)
}
fn fix_numerics(&mut self) {
if let Some(bins) = self.as_any_mut().downcast_mut::<ContainerBins<f32, f32>>() {
for ((min, max), agg) in bins
.mins
.iter_mut()
.zip(bins.maxs.iter_mut())
.zip(bins.aggs.iter_mut())
{
*agg = agg.min(*max).max(*min)
}
}
}
}
impl<EVT, BVT> MergeableTy for ContainerBins<EVT, BVT>

View File

@@ -67,15 +67,14 @@ pub trait PartialOrdEvtA<EVT> {
fn cmp_a(&self, other: &EVT) -> Option<std::cmp::Ordering>;
}
pub trait EventValueType:
fmt::Debug + Clone + PartialOrd + Send + Unpin + 'static + Serialize + for<'a> Deserialize<'a>
{
pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + Unpin + 'static {
type Container: Container<Self>;
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA<Self> + Into<Self>;
const SERDE_ID: u32;
const BYTE_ESTIMATE_V00: u32;
fn to_f32_for_binning_v01(&self) -> f32;
}
impl<EVT> Container<EVT> for VecDeque<EVT>
@@ -159,6 +158,9 @@ macro_rules! impl_event_value_type {
type IterTy1<'a> = $evt;
const SERDE_ID: u32 = <$evt as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
*self as _
}
}
impl PartialOrdEvtA<$evt> for $evt {
@@ -211,6 +213,9 @@ impl EventValueType for f32 {
type IterTy1<'a> = f32;
const SERDE_ID: u32 = <f32 as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
*self as _
}
}
impl EventValueType for f64 {
@@ -220,6 +225,9 @@ impl EventValueType for f64 {
type IterTy1<'a> = f64;
const SERDE_ID: u32 = <f64 as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
*self as _
}
}
impl EventValueType for bool {
@@ -229,6 +237,9 @@ impl EventValueType for bool {
type IterTy1<'a> = bool;
const SERDE_ID: u32 = <bool as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
f32::from(*self)
}
}
impl EventValueType for String {
@@ -238,6 +249,9 @@ impl EventValueType for String {
type IterTy1<'a> = &'a str;
const SERDE_ID: u32 = <String as SubFrId>::SUB as _;
const BYTE_ESTIMATE_V00: u32 = 400;
fn to_f32_for_binning_v01(&self) -> f32 {
self.len() as _
}
}
macro_rules! impl_event_value_type_vec {
@@ -250,6 +264,9 @@ macro_rules! impl_event_value_type_vec {
const SERDE_ID: u32 = <Vec<$evt> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + *x as f32)
}
}
impl PartialOrdEvtA<Vec<$evt>> for Vec<$evt> {
@@ -270,9 +287,65 @@ impl_event_value_type_vec!(i32);
impl_event_value_type_vec!(i64);
impl_event_value_type_vec!(f32);
impl_event_value_type_vec!(f64);
impl_event_value_type_vec!(bool);
impl_event_value_type_vec!(String);
impl_event_value_type_vec!(EnumVariant);
// impl_event_value_type_vec!(String);
// impl_event_value_type_vec!(EnumVariant);
impl EventValueType for Vec<bool> {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorVecNumeric;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<bool>;
const SERDE_ID: u32 = <Vec<bool> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + f32::from(*x))
}
}
impl PartialOrdEvtA<Vec<bool>> for Vec<bool> {
fn cmp_a(&self, other: &Vec<bool>) -> Option<core::cmp::Ordering> {
self.partial_cmp(other)
}
}
impl EventValueType for Vec<String> {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorVecNumeric;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<String>;
const SERDE_ID: u32 = <Vec<String> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + x.len() as f32)
}
}
impl PartialOrdEvtA<Vec<String>> for Vec<String> {
fn cmp_a(&self, other: &Vec<String>) -> Option<core::cmp::Ordering> {
self.partial_cmp(other)
}
}
impl EventValueType for Vec<EnumVariant> {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorVecNumeric;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = Vec<EnumVariant>;
const SERDE_ID: u32 = <Vec<EnumVariant> as SubFrId>::SUB as _;
// TODO must use a more precise number dependent on actual elements
const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::<Self>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.iter().fold(0., |a, x| a + x.ix() as f32)
}
}
impl PartialOrdEvtA<Vec<EnumVariant>> for Vec<EnumVariant> {
fn cmp_a(&self, other: &Vec<EnumVariant>) -> Option<core::cmp::Ordering> {
self.partial_cmp(other)
}
}
#[derive(Debug)]
pub struct PulsedValIterTy<'a, EVT>
@@ -462,6 +535,9 @@ where
type IterTy1<'a> = PulsedValIterTy<'a, EVT>;
const SERDE_ID: u32 = items_0::subfr::pulsed_subfr(<EVT as SubFrId>::SUB) as _;
const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<EVT>() as u32;
fn to_f32_for_binning_v01(&self) -> f32 {
self.1.to_f32_for_binning_v01()
}
}
#[derive(Debug, Clone)]
@@ -1150,6 +1226,16 @@ where
fn as_collectable_dyn_mut(&mut self) -> &mut dyn CollectableDyn {
self
}
fn to_f32_for_binning_v01(&self) -> Box<dyn BinningggContainerEventsDyn> {
let mut ret = ContainerEvents::new();
for r in self.iter_zip() {
// TODO can be expensive
let v: EVT = r.1.into();
ret.push_back(r.0, v.to_f32_for_binning_v01());
}
Box::new(ret)
}
}
#[cfg(test)]

View File

@@ -15,9 +15,12 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
macro_rules! trace_ingest_bin { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "BinBinsTimeweight")]
pub enum Error {}
autoerr::create_error_v1!(
name(Error, "BinBinsTimeweight"),
enum variants {
Logic,
},
);
#[derive(Debug)]
pub struct BinnedBinsTimeweight<EVT, BVT>

View File

@@ -142,6 +142,9 @@ impl EventValueType for EnumVariant {
type IterTy1<'a> = EnumVariantRef<'a>;
const SERDE_ID: u32 = <Self as SubFrId>::SUB as u32;
const BYTE_ESTIMATE_V00: u32 = 40;
fn to_f32_for_binning_v01(&self) -> f32 {
self.ix() as _
}
}
impl PartialOrdEvtA<netpod::UnsupEvt> for netpod::UnsupEvt {
@@ -208,6 +211,9 @@ impl EventValueType for netpod::UnsupEvt {
type IterTy1<'a> = netpod::UnsupEvt;
const SERDE_ID: u32 = <Self as SubFrId>::SUB as u32;
const BYTE_ESTIMATE_V00: u32 = 4;
fn to_f32_for_binning_v01(&self) -> f32 {
todo!()
}
}
impl EventValueType for Vec<netpod::UnsupEvt> {
@@ -217,4 +223,7 @@ impl EventValueType for Vec<netpod::UnsupEvt> {
type IterTy1<'a> = Vec<netpod::UnsupEvt>;
const SERDE_ID: u32 = <Self as SubFrId>::SUB as u32;
const BYTE_ESTIMATE_V00: u32 = 4;
fn to_f32_for_binning_v01(&self) -> f32 {
todo!()
}
}

View File

@@ -1,33 +1 @@
pub mod events_gen;
use crate::binning::container_events::ContainerEvents;
use items_0::Appendable;
use items_0::Empty;
use netpod::TsNano;
#[allow(unused)]
fn xorshift32(state: u32) -> u32 {
let mut x = state;
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
x
}
pub fn make_some_boxed_d0_f32(
n: usize,
t0: u64,
tstep: u64,
tmask: u64,
seed: u32,
) -> ContainerEvents<f32> {
let mut vstate = seed;
let mut events = ContainerEvents::empty();
for i in 0..n {
vstate = xorshift32(vstate);
let ts = t0 + i as u64 * tstep + (vstate as u64 & tmask);
let value = i as f32 * 100. + vstate as f32 / u32::MAX as f32 / 10.;
events.push(TsNano::from_ns(ts), value);
}
events
}