From 7ed9cf358fa35130604b81bb70235e732d218175 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 Dec 2024 12:14:44 +0100 Subject: [PATCH] upport framing for container and output format --- src/apitypes.rs | 156 ++++++++++--- src/binning/aggregator.rs | 37 +++ src/binning/container_bins.rs | 115 +++------- src/binning/container_events.rs | 260 ++++++++++++++-------- src/binning/timeweight/timeweight_bins.rs | 20 +- src/binning/valuetype.rs | 24 +- src/channelevents.rs | 210 ++++++++--------- src/empty.rs | 52 +++++ src/eventsdim0.rs | 12 +- src/eventsdim0enum.rs | 4 +- src/eventsdim1.rs | 6 +- src/inmem.rs | 70 ++++++ src/jsonbytes.rs | 13 ++ src/merger.rs | 15 +- 14 files changed, 647 insertions(+), 347 deletions(-) diff --git a/src/apitypes.rs b/src/apitypes.rs index ecd316c..f7e72e5 100644 --- a/src/apitypes.rs +++ b/src/apitypes.rs @@ -1,4 +1,5 @@ use crate::binning::container::bins::BinAggedType; +use crate::binning::container_events::Container; use crate::binning::container_events::EventValueType; use crate::offsets::ts_offs_from_abs; use crate::offsets::ts_offs_from_abs_with_anchor; @@ -7,6 +8,7 @@ use items_0::collect_s::ToCborValue; use items_0::collect_s::ToJsonValue; use netpod::TsNano; use serde::Serialize; +use std::collections::BTreeMap; use std::collections::VecDeque; use std::fmt; @@ -15,7 +17,7 @@ pub struct ContainerEventsApi where EVT: EventValueType, { - pub tss: VecDeque, + pub tss: VecDeque, pub values: EVT::Container, #[serde(skip_serializing_if = "netpod::is_false")] pub range_final: bool, @@ -39,9 +41,21 @@ impl ToCborValue for ContainerEventsApi where EVT: EventValueType, { - fn to_cbor_value(&self) -> Result { - let val = ciborium::value::Value::serialized(self).unwrap(); - Ok(val) + fn into_fields(self) -> Vec<(String, Box)> { + let tss: Vec<_> = self.tss.into_iter().map(|x| x.ns()).collect(); + let mut ret = self.values.into_user_facing_fields(); + ret.push(("tss".into(), Box::new(tss))); + if self.range_final { + ret.push(("rangeFinal".into(), Box::new(true))); + } + if self.timed_out { + ret.push(("timedOut".into(), Box::new(true))); + } + ret + } + + fn into_fields_box(self: Box) -> Vec<(String, Box)> { + ToCborValue::into_fields(*self) } } @@ -49,13 +63,46 @@ impl ToJsonValue for ContainerEventsApi where EVT: EventValueType, { - fn to_json_value(&self) -> Result { - let ret = serde_json::to_value(self); + fn into_fields(self) -> Vec<(String, Box)> { + let mut ret = self.values.into_user_facing_fields_json(); + let (ts_anch, ts_ms, ts_ns) = ts_offs_from_abs(&self.tss); + ret.push(("tsAnchor".into(), Box::new(ts_anch))); + ret.push(("tsMs".into(), Box::new(ts_ms))); + ret.push(("tsNs".into(), Box::new(ts_ns))); + if self.range_final { + ret.push(("rangeFinal".into(), Box::new(true))); + } + if self.timed_out { + ret.push(("timedOut".into(), Box::new(true))); + } ret } + + fn into_fields_box(self: Box) -> Vec<(String, Box)> { + ToJsonValue::into_fields(*self) + } } -impl UserApiType for ContainerEventsApi where EVT: EventValueType {} +impl UserApiType for ContainerEventsApi +where + EVT: EventValueType, +{ + fn into_serializable(self: Box) -> Box { + let mut map = BTreeMap::new(); + for (k, v) in ToCborValue::into_fields_box(self) { + map.insert(k, v); + } + Box::new(map) + } + + fn into_serializable_json(self: Box) -> Box { + let mut map = BTreeMap::new(); + for (k, v) in ToJsonValue::into_fields_box(self) { + map.insert(k, v); + } + Box::new(map) + } +} #[derive(Serialize)] pub struct ContainerBinsApi @@ -66,8 +113,8 @@ where pub ts1s: VecDeque, pub ts2s: VecDeque, pub cnts: VecDeque, - pub mins: VecDeque, - pub maxs: VecDeque, + pub mins: ::Container, + pub maxs: ::Container, pub aggs: VecDeque, pub fnls: VecDeque, } @@ -90,11 +137,40 @@ where EVT: EventValueType, BVT: BinAggedType, { - fn to_cbor_value(&self) -> Result { - // let val = ciborium::value::Value::serialized(self).unwrap(); - // Ok(val) - let e = ciborium::value::Error::Custom("binned data as cbor is not yet available".into()); - Err(e) + fn into_fields(self) -> Vec<(String, Box)> { + let mut ret = Vec::<(String, Box)>::new(); + // let mut ret = self.aggs.into_user_facing_fields_json(); + ret.push(("ts1s".into(), Box::new(self.ts1s))); + ret.push(("ts2s".into(), Box::new(self.ts2s))); + ret.push(("counts".into(), Box::new(self.cnts))); + { + let fields = self.mins.into_user_facing_fields(); + for (k, v) in fields { + let k = if k == "values" { + "mins".to_string() + } else { + format!("mins_{}", k) + }; + ret.push((k, v)); + } + } + { + let fields = self.maxs.into_user_facing_fields(); + for (k, v) in fields { + let k = if k == "values" { + "maxs".to_string() + } else { + format!("maxs_{}", k) + }; + ret.push((k, v)); + } + } + ret.push(("avgs".into(), Box::new(self.aggs))); + ret + } + + fn into_fields_box(self: Box) -> Vec<(String, Box)> { + ToCborValue::into_fields(*self) } } @@ -103,26 +179,25 @@ where EVT: EventValueType, BVT: BinAggedType, { - fn to_json_value(&self) -> Result { - use serde_json::json; - use serde_json::Value; - // let ret = serde_json::to_value(self); - // ret - let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&self.ts1s); - let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &self.ts2s); + fn into_fields(self) -> Vec<(String, Box)> { + let mut ret = Vec::<(String, Box)>::new(); + // let mut ret = self.aggs.into_user_facing_fields_json(); + let (ts_anch, ts1_ms, ts1_ns) = ts_offs_from_abs(&self.ts1s); + let (ts2_ms, ts2_ns) = ts_offs_from_abs_with_anchor(ts_anch, &self.ts2s); + ret.push(("tsAnchor".into(), Box::new(ts_anch))); + ret.push(("ts1Ms".into(), Box::new(ts1_ms))); + ret.push(("ts1Ns".into(), Box::new(ts1_ns))); + ret.push(("ts2Ms".into(), Box::new(ts2_ms))); + ret.push(("ts2Ns".into(), Box::new(ts2_ns))); + ret.push(("counts".into(), Box::new(self.cnts))); + ret.push(("mins".into(), Box::new(self.mins))); + ret.push(("maxs".into(), Box::new(self.maxs))); + ret.push(("avgs".into(), Box::new(self.aggs))); + ret + } - let ret = json!({ - "tsAnchor": ts_anch, - "ts1Ms": ts1ms, - "ts2Ms": ts2ms, - "ts1Ns": ts1ns, - "ts2Ns": ts2ns, - "counts": self.cnts, - "mins": self.mins, - "maxs": self.maxs, - "avgs": self.aggs, - }); - Ok(ret) + fn into_fields_box(self: Box) -> Vec<(String, Box)> { + ToJsonValue::into_fields(*self) } } @@ -131,4 +206,19 @@ where EVT: EventValueType, BVT: BinAggedType, { + fn into_serializable(self: Box) -> Box { + let mut map = BTreeMap::new(); + for (k, v) in ToCborValue::into_fields_box(self) { + map.insert(k, v); + } + Box::new(map) + } + + fn into_serializable_json(self: Box) -> Box { + let mut map = BTreeMap::new(); + for (k, v) in ToJsonValue::into_fields_box(self) { + map.insert(k, v); + } + Box::new(map) + } } diff --git a/src/binning/aggregator.rs b/src/binning/aggregator.rs index c619654..1803ab6 100644 --- a/src/binning/aggregator.rs +++ b/src/binning/aggregator.rs @@ -2,7 +2,9 @@ pub mod agg_bins; use super::container::bins::BinAggedType; use super::container_events::EventValueType; +use super::container_events::PulsedVal; use core::fmt; +use items_0::subfr::SubFrId; use netpod::log::*; use netpod::DtNano; use netpod::EnumVariant; @@ -373,3 +375,38 @@ impl AggregatorTimeWeight> for AggregatorVecNumeric { sum / filled_width_fraction } } + +#[derive(Debug)] +pub struct AggregatorPulsedNumeric +where + EVT: EventValueType, +{ + evt_agg: EVT::AggregatorTimeWeight, +} + +impl AggregatorTimeWeight> for AggregatorPulsedNumeric +where + EVT: EventValueType + SubFrId, +{ + fn new() -> Self { + Self { + evt_agg: EVT::AggregatorTimeWeight::new(), + } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: PulsedVal) { + self.evt_agg.ingest(dt, bl, val.1); + } + + fn reset_for_new_bin(&mut self) { + self.evt_agg.reset_for_new_bin(); + } + + fn result_and_reset_for_new_bin( + &mut self, + filled_width_fraction: f32, + ) -> as EventValueType>::AggTimeWeightOutputAvg { + self.evt_agg + .result_and_reset_for_new_bin(filled_width_fraction) + } +} diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index 6fcc9c9..c3d9669 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -1,4 +1,5 @@ use super::container::bins::BinAggedType; +use super::container_events::Container; use super::container_events::EventValueType; use crate::apitypes::ContainerBinsApi; use crate::offsets::ts_offs_from_abs; @@ -42,10 +43,10 @@ where pub ts1: TsNano, pub ts2: TsNano, pub cnt: u64, - pub min: &'a EVT, - pub max: &'a EVT, + pub min: EVT::IterTy1<'a>, + pub max: EVT::IterTy1<'a>, pub agg: &'a BVT, - pub lst: &'a EVT, + pub lst: EVT::IterTy1<'a>, pub fnl: bool, } @@ -67,6 +68,7 @@ where type Item = BinRef<'a, EVT, BVT>; fn next(&mut self) -> Option { + use crate::binning::container_events::Container; if self.ix < self.bins.len() && self.ix < self.len { let b = &self.bins; let i = self.ix; @@ -75,10 +77,10 @@ where ts1: b.ts1s[i], ts2: b.ts2s[i], cnt: b.cnts[i], - min: &b.mins[i], - max: &b.maxs[i], + min: b.mins.get_iter_ty_1(i).unwrap(), + max: b.maxs.get_iter_ty_1(i).unwrap(), agg: &b.aggs[i], - lst: &b.lsts[i], + lst: b.lsts.get_iter_ty_1(i).unwrap(), fnl: b.fnls[i], }; Some(ret) @@ -97,10 +99,10 @@ where ts1s: VecDeque, ts2s: VecDeque, cnts: VecDeque, - mins: VecDeque, - maxs: VecDeque, + mins: ::Container, + maxs: ::Container, aggs: VecDeque, - lsts: VecDeque, + lsts: ::Container, fnls: VecDeque, } @@ -145,28 +147,6 @@ where EVT: EventValueType, BVT: BinAggedType, { - pub fn from_constituents( - ts1s: VecDeque, - ts2s: VecDeque, - cnts: VecDeque, - mins: VecDeque, - maxs: VecDeque, - aggs: VecDeque, - lsts: VecDeque, - fnls: VecDeque, - ) -> Self { - Self { - ts1s, - ts2s, - cnts, - mins, - maxs, - aggs, - lsts, - fnls, - } - } - pub fn type_name() -> &'static str { any::type_name::() } @@ -176,10 +156,10 @@ where ts1s: VecDeque::new(), ts2s: VecDeque::new(), cnts: VecDeque::new(), - mins: VecDeque::new(), - maxs: VecDeque::new(), + mins: <::Container as Container>::new(), + maxs: <::Container as Container>::new(), aggs: VecDeque::new(), - lsts: VecDeque::new(), + lsts: <::Container as Container>::new(), fnls: VecDeque::new(), } } @@ -228,20 +208,20 @@ where self.cnts.iter() } - pub fn mins_iter(&self) -> std::collections::vec_deque::Iter { - self.mins.iter() + pub fn mins_iter(&self) -> impl Iterator> { + self.mins.iter_ty_1() } - pub fn maxs_iter(&self) -> std::collections::vec_deque::Iter { - self.maxs.iter() + pub fn maxs_iter(&self) -> impl Iterator> { + self.maxs.iter_ty_1() } pub fn aggs_iter(&self) -> std::collections::vec_deque::Iter { self.aggs.iter() } - pub fn lsts_iter(&self) -> std::collections::vec_deque::Iter { - self.lsts.iter() + pub fn lsts_iter(&self) -> impl Iterator> { + self.lsts.iter_ty_1() } pub fn fnls_iter(&self) -> std::collections::vec_deque::Iter { @@ -262,13 +242,13 @@ where >, std::collections::vec_deque::Iter, >, - std::collections::vec_deque::Iter, + impl Iterator>, >, - std::collections::vec_deque::Iter, + impl Iterator>, >, std::collections::vec_deque::Iter, >, - std::collections::vec_deque::Iter, + impl Iterator>, >, std::collections::vec_deque::Iter, > { @@ -487,40 +467,12 @@ where // finished_at: Option, } -impl ToJsonValue for ContainerBinsCollectorOutput -where - EVT: EventValueType, - BVT: BinAggedType, -{ - fn to_json_value(&self) -> Result { - let bins = &self.bins; - let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(&bins.ts1s); - let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, &bins.ts2s); - let counts = bins.cnts.clone(); - let mins = bins.mins.clone(); - let maxs = bins.maxs.clone(); - let aggs = bins.aggs.clone(); - let val = ContainerBinsCollectorOutputUser:: { - ts_anchor_sec: ts_anch, - ts1_off_ms: ts1ms, - ts2_off_ms: ts2ms, - ts1_off_ns: ts1ns, - ts2_off_ns: ts2ns, - counts, - mins, - maxs, - aggs, - }; - serde_json::to_value(&val) - } -} - impl ToUserFacingApiType for ContainerBinsCollectorOutput where EVT: EventValueType, BVT: BinAggedType, { - fn to_user_facing_api_type(self) -> Box { + fn into_user_facing_api_type(self) -> Box { let ret = ContainerBinsApi:: { ts1s: self.bins.ts1s, ts2s: self.bins.ts2s, @@ -533,8 +485,8 @@ where Box::new(ret) } - fn to_user_facing_api_type_box(self: Box) -> Box { - (*self).to_user_facing_api_type() + fn into_user_facing_api_type_box(self: Box) -> Box { + (*self).into_user_facing_api_type() } } @@ -661,10 +613,10 @@ where dst.ts1s.extend(self.ts1s.drain(range.clone())); dst.ts2s.extend(self.ts2s.drain(range.clone())); dst.cnts.extend(self.cnts.drain(range.clone())); - dst.mins.extend(self.mins.drain(range.clone())); - dst.maxs.extend(self.maxs.drain(range.clone())); + self.mins.drain_into(&mut dst.mins, range.clone()); + self.maxs.drain_into(&mut dst.maxs, range.clone()); dst.aggs.extend(self.aggs.drain(range.clone())); - dst.lsts.extend(self.lsts.drain(range.clone())); + self.lsts.drain_into(&mut dst.lsts, range.clone()); dst.fnls.extend(self.fnls.drain(range.clone())); } else { let styn = any::type_name::(); @@ -683,14 +635,7 @@ where Box::new(ret) } - fn fix_numerics(&mut self) { - for ((_min, _max), _avg) in self - .mins - .iter_mut() - .zip(self.maxs.iter_mut()) - .zip(self.aggs.iter_mut()) - {} - } + fn fix_numerics(&mut self) {} } pub struct ContainerBinsTakeUpTo<'a, EVT, BVT> diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index ba6b1b9..ab67f53 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -1,10 +1,12 @@ use super::aggregator::AggTimeWeightOutputAvg; use super::aggregator::AggregatorNumeric; +use super::aggregator::AggregatorPulsedNumeric; use super::aggregator::AggregatorTimeWeight; use super::aggregator::AggregatorVecNumeric; use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox; use crate::apitypes::ContainerEventsApi; use crate::log::*; +use crate::offsets::pulse_offs_from_abs; use core::fmt; use core::ops::Range; use daqbuf_err as err; @@ -16,8 +18,6 @@ use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorDyn; use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToCborValue; -use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::merge::DrainIntoDstResult; use items_0::merge::DrainIntoNewDynResult; @@ -55,10 +55,11 @@ where { fn new() -> Self; fn push_back(&mut self, val: EVT); - fn pop_front(&mut self) -> Option; fn get_iter_ty_1(&self, pos: usize) -> Option>; fn iter_ty_1(&self) -> impl Iterator>; fn drain_into(&mut self, dst: &mut Self, range: Range); + fn into_user_facing_fields(self) -> Vec<(String, Box)>; + fn into_user_facing_fields_json(self) -> Vec<(String, Box)>; } pub trait PartialOrdEvtA { @@ -89,10 +90,6 @@ where self.push_back(val); } - fn pop_front(&mut self) -> Option { - self.pop_front() - } - fn get_iter_ty_1(&self, pos: usize) -> Option> { self.get(pos).map(|x| x.clone()) } @@ -104,6 +101,14 @@ where fn drain_into(&mut self, dst: &mut Self, range: Range) { dst.extend(self.drain(range)); } + + fn into_user_facing_fields(self) -> Vec<(String, Box)> { + vec![("values".into(), Box::new(self))] + } + + fn into_user_facing_fields_json(self) -> Vec<(String, Box)> { + vec![("values".into(), Box::new(self))] + } } impl Container for VecDeque { @@ -115,10 +120,6 @@ impl Container for VecDeque { self.push_back(val); } - fn pop_front(&mut self) -> Option { - self.pop_front() - } - fn get_iter_ty_1(&self, pos: usize) -> Option<&str> { self.get(pos).map(|x| x.as_str()) } @@ -130,6 +131,14 @@ impl Container for VecDeque { fn drain_into(&mut self, dst: &mut Self, range: Range) { dst.extend(self.drain(range)) } + + fn into_user_facing_fields(self) -> Vec<(String, Box)> { + vec![("values".into(), Box::new(self))] + } + + fn into_user_facing_fields_json(self) -> Vec<(String, Box)> { + vec![("values".into(), Box::new(self))] + } } macro_rules! impl_event_value_type { @@ -139,7 +148,7 @@ macro_rules! impl_event_value_type { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = $evt; - const SERDE_ID: u32 = <$evt as SubFrId>::SUB; + const SERDE_ID: u32 = <$evt as SubFrId>::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32; } @@ -191,7 +200,7 @@ impl EventValueType for f32 { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = f32; - const SERDE_ID: u32 = ::SUB; + const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; } @@ -200,7 +209,7 @@ impl EventValueType for f64 { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = f64; - const SERDE_ID: u32 = ::SUB; + const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; } @@ -209,7 +218,7 @@ impl EventValueType for bool { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = bool; - const SERDE_ID: u32 = ::SUB; + const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; } @@ -218,7 +227,7 @@ impl EventValueType for String { type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; type IterTy1<'a> = &'a str; - const SERDE_ID: u32 = ::SUB; + const SERDE_ID: u32 = ::SUB as _; const BYTE_ESTIMATE_V00: u32 = 400; } @@ -229,7 +238,7 @@ macro_rules! impl_event_value_type_vec { type AggregatorTimeWeight = AggregatorVecNumeric; type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = Vec<$evt>; - const SERDE_ID: u32 = as SubFrId>::SUB; + const SERDE_ID: u32 = as SubFrId>::SUB as _; // TODO must use a more precise number dependent on actual elements const BYTE_ESTIMATE_V00: u32 = 1200 * core::mem::size_of::() as u32; } @@ -256,8 +265,57 @@ impl_event_value_type_vec!(bool); impl_event_value_type_vec!(String); impl_event_value_type_vec!(EnumVariant); -#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize)] -pub struct PulsedVal(EVT) +#[derive(Debug)] +pub struct PulsedValIterTy<'a, EVT> +where + EVT: EventValueType, +{ + pulse: u64, + evt: EVT::IterTy1<'a>, +} + +impl<'a, EVT> Clone for PulsedValIterTy<'a, EVT> +where + EVT: EventValueType + SubFrId, +{ + fn clone(&self) -> Self { + Self { + pulse: self.pulse, + evt: self.evt.clone(), + } + } +} + +impl<'a, EVT> PartialOrdEvtA> for PulsedValIterTy<'a, EVT> +where + EVT: EventValueType + SubFrId, +{ + fn cmp_a(&self, other: &PulsedVal) -> Option { + use std::cmp::Ordering; + match self.pulse.cmp(&other.0) { + Ordering::Less => Some(Ordering::Less), + Ordering::Greater => Some(Ordering::Greater), + Ordering::Equal => match self.evt.cmp_a(&other.1) { + Some(Ordering::Less) => Some(Ordering::Less), + Some(Ordering::Greater) => Some(Ordering::Greater), + Some(Ordering::Equal) => Some(Ordering::Equal), + None => None, + }, + } + } +} + +impl<'a, EVT> From> for PulsedVal +where + EVT: EventValueType, +{ + fn from(value: PulsedValIterTy<'a, EVT>) -> Self { + Self(value.pulse, value.evt.into()) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize)] +pub struct PulsedVal(pub u64, pub EVT) where EVT: EventValueType; @@ -270,6 +328,15 @@ where } } +impl PartialOrd for PulsedVal +where + EVT: EventValueType, +{ + fn partial_cmp(&self, other: &Self) -> Option { + self.1.partial_cmp(&other.1) + } +} + mod serde_pulsed_val { use super::EventValueType; use super::PulsedVal; @@ -303,14 +370,18 @@ where EVT: EventValueType, { fn preview<'a>(&'a self) -> Box { - todo!() + let ret = items_0::vecpreview::PreviewCell { + a: self.pulses.front(), + b: self.pulses.back(), + }; + Box::new(ret) } } impl Container> for VecDequePulsed where EVT: EventValueType, - PulsedVal: EventValueType, + for<'a> PulsedVal: EventValueType = PulsedValIterTy<'a, EVT>>, { fn new() -> Self { Self { @@ -320,48 +391,69 @@ where } fn push_back(&mut self, val: PulsedVal) { - todo!() - } - - fn pop_front(&mut self) -> Option> { - todo!() + self.pulses.push_back(val.0); + self.vals.push_back(val.1); } fn get_iter_ty_1(&self, pos: usize) -> Option< as EventValueType>::IterTy1<'_>> { - todo!() + if let (Some(&pulse), Some(val)) = (self.pulses.get(pos), self.vals.get_iter_ty_1(pos)) { + let x = PulsedValIterTy { pulse, evt: val }; + Some(x) + } else { + None + } } fn iter_ty_1(&self) -> impl Iterator as EventValueType>::IterTy1<'_>> { - todo!(); - self.vals.iter_ty_1().map(|x| todo!()) + self.pulses + .iter() + .map(|&x| x) + .zip(self.vals.iter_ty_1()) + .map(|(pulse, evt)| PulsedValIterTy { pulse, evt }) } fn drain_into(&mut self, dst: &mut Self, range: Range) { dst.pulses.extend(self.pulses.drain(range.clone())); self.vals.drain_into(&mut dst.vals, range.clone()); } + + fn into_user_facing_fields(self) -> Vec<(String, Box)> { + vec![ + ("pulses".into(), Box::new(self.pulses)), + ("values".into(), Box::new(self.vals)), + ] + } + + fn into_user_facing_fields_json(self) -> Vec<(String, Box)> { + let (pulses_anch, pulses_offs) = pulse_offs_from_abs(&self.pulses); + vec![ + ("pulseAnchor".into(), Box::new(pulses_anch)), + ("pulseOff".into(), Box::new(pulses_offs)), + ("values".into(), Box::new(self.vals)), + ] + } } -macro_rules! impl_pulse_evt { - ($evt:ty) => { - impl EventValueType for PulsedVal<$evt> { - type Container = VecDequePulsed<$evt>; - type AggregatorTimeWeight = AggregatorNumeric; - type AggTimeWeightOutputAvg = f64; - type IterTy1<'a> = PulsedVal<$evt>; - const SERDE_ID: u32 = <$evt as SubFrId>::SUB; - const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::<$evt>() as u32; - } - - impl PartialOrdEvtA> for PulsedVal<$evt> { - fn cmp_a(&self, other: &PulsedVal<$evt>) -> Option { - self.partial_cmp(other) - } - } - }; +impl PartialOrdEvtA> for PulsedVal +where + EVT: EventValueType, +{ + fn cmp_a(&self, other: &PulsedVal) -> Option { + self.partial_cmp(other) + } } -impl_pulse_evt!(u8); +impl EventValueType for PulsedVal +where + EVT: EventValueType + SubFrId, +{ + type Container = VecDequePulsed; + type AggregatorTimeWeight = AggregatorPulsedNumeric; + type AggTimeWeightOutputAvg = EVT::AggTimeWeightOutputAvg; + type IterTy1<'a> = PulsedValIterTy<'a, EVT>; + const SERDE_ID: u32 = items_0::subfr::pulsed_subfr(::SUB) as _; + const BYTE_ESTIMATE_V00: u32 = core::mem::size_of::() as u32; +} #[derive(Debug, Clone)] pub struct EventSingleRef<'a, EVT> @@ -578,8 +670,8 @@ where self.vals.push_back(val); } - pub fn iter_zip<'a>(&'a self) -> impl Iterator)> { - self.tss.iter().zip(self.vals.iter_ty_1()) + pub fn iter_zip<'a>(&'a self) -> impl Iterator)> { + self.tss.iter().map(|&x| x).zip(self.vals.iter_ty_1()) } pub fn serde_id() -> u32 { @@ -664,6 +756,7 @@ where evs: &'a ContainerEvents, end: usize, pos: usize, + // it: Box)>>, } impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT> @@ -671,10 +764,13 @@ where EVT: EventValueType, { pub fn new(evs: &'a ContainerEvents) -> Self { + // let it = unsafe { netpod::extltref(evs) }.iter_zip(); + // let it = Box::new(it); Self { evs, end: evs.len(), pos: 0, + // it, } } @@ -884,9 +980,9 @@ impl ToUserFacingApiType for ContainerEventsCollected where EVT: EventValueType, { - fn to_user_facing_api_type(self: Self) -> Box { + fn into_user_facing_api_type(self: Self) -> Box { let evs = ContainerEventsApi:: { - tss: self.evs.tss.into_iter().map(|x| x.ns()).collect(), + tss: self.evs.tss, values: self.evs.vals, range_final: self.range_final, timed_out: self.timed_out, @@ -894,8 +990,8 @@ where Box::new(evs) } - fn to_user_facing_api_type_box(self: Box) -> Box { - (*self).to_user_facing_api_type() + fn into_user_facing_api_type_box(self: Box) -> Box { + (*self).into_user_facing_api_type() } } @@ -925,6 +1021,7 @@ where EVT: EventValueType, { pub fn new() -> Self { + debug!("ContainerEventsCollector::new"); Self { evs: ContainerEvents::new(), range_final: false, @@ -950,9 +1047,7 @@ where type Output = ContainerEventsCollected; fn ingest(&mut self, src: &mut Self::Input) { - let n = self.len(); - info!("CollectorTy for ContainerEventsCollector n {}", n); - MergeableTy::drain_into(src, &mut self.evs, 0..n); + MergeableTy::drain_into(src, &mut self.evs, 0..src.len()); } fn set_range_complete(&mut self) { @@ -983,31 +1078,13 @@ where } } -impl ToCborValue for ContainerEvents -where - EVT: EventValueType, -{ - fn to_cbor_value(&self) -> Result { - ciborium::value::Value::serialized(self) - } -} - -impl ToJsonValue for ContainerEvents -where - EVT: EventValueType, -{ - fn to_json_value(&self) -> Result { - serde_json::to_value(self) - } -} - impl ToUserFacingApiType for ContainerEvents where EVT: EventValueType, { - fn to_user_facing_api_type(self: Self) -> Box { + fn into_user_facing_api_type(self: Self) -> Box { let ret = ContainerEventsApi:: { - tss: self.tss.into_iter().map(|x| x.ns()).collect(), + tss: self.tss, values: self.vals, range_final: false, timed_out: false, @@ -1015,9 +1092,9 @@ where Box::new(ret) } - fn to_user_facing_api_type_box(self: Box) -> Box { + fn into_user_facing_api_type_box(self: Box) -> Box { let this = *self; - this.to_user_facing_api_type() + this.into_user_facing_api_type() } } @@ -1071,29 +1148,26 @@ mod test_frame { use super::*; use crate::channelevents::ChannelEvents; use crate::framable::Framable; - use crate::framable::INMEM_FRAME_ENCID; use crate::frame::decode_frame; use crate::inmem::InMemoryFrame; + use crate::inmem::ParseResult; + use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; - use netpod::TsMs; #[test] fn events_serialize() { - let mut evs = ContainerEvents::new(); - evs.push_back(TsNano::from_ns(123), 55f32); + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ns(123), 55.); + evs.push_back(TsNano::from_ns(124), 56.); let item = ChannelEvents::from(evs); - let item: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let mut buf = item.make_frame_dyn().unwrap(); - let s = String::from_utf8_lossy(&buf[20..buf.len() - 4]); - eprintln!("[[{s}]]"); - let buflen = buf.len(); - let frame = InMemoryFrame { - encid: INMEM_FRAME_ENCID, - tyid: 0x2500, - len: (buflen - 24) as _, - buf: buf.split_off(20).split_to(buflen - 20 - 4).freeze(), + let item: Sitemty<_> = sitem_data(item); + let buf = item.make_frame_dyn().unwrap(); + let frame = match InMemoryFrame::parse(&buf) { + Ok(ParseResult::Parsed(n, val)) => val, + Ok(ParseResult::NotEnoughData(n)) => panic!(), + Err(e) => panic!("{}", e), }; let item: Sitemty = decode_frame(&frame).unwrap(); let item = if let Ok(x) = item { x } else { panic!() }; @@ -1119,7 +1193,7 @@ mod test_frame { }; assert_eq!( MergeableTy::tss_for_testing(item), - &[TsMs::from_ms_u64(123)] + &[TsNano::from_ns(123), TsNano::from_ns(124)] ); } } diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index c953287..9582dfa 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -2,6 +2,7 @@ use crate::binning::container::bins::AggBinValTw; use crate::binning::container::bins::BinAggedType; use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::EventValueType; +use crate::binning::container_events::PartialOrdEvtA; use crate::log::*; use items_0::timebin::BinnedBinsTimeweightTrait; use items_0::timebin::BinningggError; @@ -86,13 +87,16 @@ where self.non_fnl = false; } - fn bound(a: &mut Option, b: &EVT, f: impl Fn(&EVT, &EVT) -> bool) { + fn bound(a: &mut Option, b: ::IterTy1<'_>, d: std::cmp::Ordering) { if let Some(x) = a.as_mut() { - if f(b, x) { - *x = b.clone(); + match b.cmp_a(x) { + Some(x) if x == d => { + *a = Some(b.into()); + } + Some(_) | None => {} } } else { - *a = Some(b.clone()); + *a = Some(b.into()); } } @@ -101,20 +105,20 @@ where let grid = self.range.bin_len_dt_ns(); trace_ingest_bin!("grid {:?} ts1 {:?} agg {:?}", grid, ts1, agg); if ts1 < self.active_beg { - self.lst = Some(lst.clone()); + self.lst = Some(lst.into()); } else { if ts1 >= self.active_end { self.maybe_emit_active(); self.active_forward(ts1); } self.cnt += cnt; - Self::bound(&mut self.min, min, PartialOrd::lt); - Self::bound(&mut self.max, max, PartialOrd::gt); + Self::bound(&mut self.min, min, std::cmp::Ordering::Less); + Self::bound(&mut self.max, max, std::cmp::Ordering::Greater); let dt = ts2.delta(ts1); let bl = self.range.bin_len_dt_ns(); self.agg.ingest(dt, bl, cnt, agg.clone()); self.non_fnl |= !fnl; - self.lst = Some(lst.clone()); + self.lst = Some(lst.into()); } } Ok(()) diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index 4a4e63b..19e8820 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -42,14 +42,6 @@ impl Container for EnumVariantContainer { self.names.push_back(name); } - fn pop_front(&mut self) -> Option { - if let (Some(a), Some(b)) = (self.ixs.pop_front(), self.names.pop_front()) { - Some(EnumVariant::new(a, b)) - } else { - None - } - } - fn get_iter_ty_1(&self, pos: usize) -> Option<::IterTy1<'_>> { if let (Some(&ix), Some(name)) = (self.ixs.get(pos), self.names.get(pos)) { let ret = EnumVariantRef { @@ -76,6 +68,20 @@ impl Container for EnumVariantContainer { dst.ixs.extend(self.ixs.drain(range.clone())); dst.names.extend(self.names.drain(range)); } + + fn into_user_facing_fields(self) -> Vec<(String, Box)> { + vec![ + ("values".into(), Box::new(self.ixs)), + ("valuestrings".into(), Box::new(self.names)), + ] + } + + fn into_user_facing_fields_json(self) -> Vec<(String, Box)> { + vec![ + ("values".into(), Box::new(self.ixs)), + ("valuestrings".into(), Box::new(self.names)), + ] + } } #[derive(Debug)] @@ -130,6 +136,6 @@ impl EventValueType for EnumVariant { type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = EnumVariantRef<'a>; - const SERDE_ID: u32 = Self::SUB; + const SERDE_ID: u32 = Self::SUB as u32; const BYTE_ESTIMATE_V00: u32 = 40; } diff --git a/src/channelevents.rs b/src/channelevents.rs index 3c0ae44..1b76bf5 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -223,8 +223,13 @@ impl AsAnyMut for ChannelEvents { mod serde_channel_events { use super::ChannelEvents; use crate::binning::container_events::ContainerEvents; + use crate::binning::container_events::PulsedVal; use crate::channelevents::ConnStatusEvent; use crate::log::*; + use items_0::subfr::is_container_events; + use items_0::subfr::is_pulsed_subfr; + use items_0::subfr::is_vec_subfr; + use items_0::subfr::subfr_scalar_type; use items_0::subfr::SubFrId; use items_0::timebin::BinningggContainerEventsDyn; use netpod::EnumVariant; @@ -241,6 +246,11 @@ mod serde_channel_events { macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) } + type C01 = ContainerEvents; + type C02 = ContainerEvents>; + type C03 = ContainerEvents>; + type C04 = ContainerEvents>>; + fn try_serialize( v: &dyn BinningggContainerEventsDyn, ser: &mut ::SerializeSeq, @@ -258,6 +268,34 @@ mod serde_channel_events { } } + macro_rules! ser_inner_nty { + ($ser:expr, $cont1:ident, $nty:expr, $val:expr) => {{ + let ser = $ser; + let nty_id = subfr_scalar_type($nty); + let v = $val; + type C = $cont1; + match nty_id { + u8::SUB => try_serialize::>(v, ser), + // u16::SUB => try_serialize::>(v, ser)?, + // u32::SUB => try_serialize::>(v, ser)?, + // u64::SUB => try_serialize::>(v, ser)?, + // i8::SUB => try_serialize::>(v, ser)?, + // i16::SUB => try_serialize::>(v, ser)?, + // i32::SUB => try_serialize::>(v, ser)?, + // i64::SUB => try_serialize::>(v, ser)?, + f32::SUB => try_serialize::>(v, ser), + // f64::SUB => try_serialize::>(v, ser)?, + // bool::SUB => try_serialize::>(v, ser)?, + // String::SUB => try_serialize::>(v, ser)?, + // EnumVariant::SUB => try_serialize::>(v, ser)?, + _ => { + let msg = format!("serde ser not supported evt id 0x{:x}", nty_id); + return Err(serde::ser::Error::custom(msg)); + } + } + }}; + } + struct EvRef<'a>(&'a dyn BinningggContainerEventsDyn); struct EvBox(Box); @@ -270,35 +308,25 @@ mod serde_channel_events { let mut ser = ser.serialize_seq(Some(3))?; ser.serialize_element(&self.0.serde_id())?; ser.serialize_element(&self.0.nty_id())?; - use items_0::streamitem::CONTAINER_EVENTS_TYPE_ID; - type C1 = ContainerEvents; - match self.0.serde_id() { - CONTAINER_EVENTS_TYPE_ID => match self.0.nty_id() { - u8::SUB => try_serialize::>(self.0, &mut ser)?, - u16::SUB => try_serialize::>(self.0, &mut ser)?, - u32::SUB => try_serialize::>(self.0, &mut ser)?, - u64::SUB => try_serialize::>(self.0, &mut ser)?, - i8::SUB => try_serialize::>(self.0, &mut ser)?, - i16::SUB => try_serialize::>(self.0, &mut ser)?, - i32::SUB => try_serialize::>(self.0, &mut ser)?, - i64::SUB => try_serialize::>(self.0, &mut ser)?, - f32::SUB => try_serialize::>(self.0, &mut ser)?, - f64::SUB => try_serialize::>(self.0, &mut ser)?, - bool::SUB => try_serialize::>(self.0, &mut ser)?, - String::SUB => try_serialize::>(self.0, &mut ser)?, - EnumVariant::SUB => try_serialize::>(self.0, &mut ser)?, - // - Vec::::SUB => try_serialize::>>(self.0, &mut ser)?, - _ => { - let msg = format!("not supported evt id {}", self.0.nty_id()); - return Err(serde::ser::Error::custom(msg)); + let nty_id = self.0.nty_id() as u16; + if is_container_events(self.0.serde_id()) { + if is_pulsed_subfr(nty_id) { + if is_vec_subfr(nty_id) { + ser_inner_nty!(&mut ser, C04, nty_id, self.0) + } else { + ser_inner_nty!(&mut ser, C03, nty_id, self.0) + } + } else { + if is_vec_subfr(nty_id) { + ser_inner_nty!(&mut ser, C02, nty_id, self.0) + } else { + ser_inner_nty!(&mut ser, C01, nty_id, self.0) } - }, - _ => { - let msg = format!("not supported obj id {}", self.0.serde_id()); - return Err(serde::ser::Error::custom(msg)); } - } + } else { + let msg = format!("not supported obj id {}", self.0.serde_id()); + return Err(serde::ser::Error::custom(msg)); + }?; ser.end() } } @@ -316,12 +344,40 @@ mod serde_channel_events { A: de::SeqAccess<'de>, T: Deserialize<'de> + BinningggContainerEventsDyn + 'static, { + let s = std::any::type_name::(); + trace_serde!("get_2nd_or_err {}", s); let obj: T = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } + macro_rules! de_inner_nty { + ($seq:expr, $cont1:ident, $nty:expr) => {{ + let seq = $seq; + let nty = subfr_scalar_type($nty); + match nty { + u8::SUB => get_2nd_or_err::<$cont1, _>(seq), + u16::SUB => get_2nd_or_err::<$cont1, _>(seq), + u32::SUB => get_2nd_or_err::<$cont1, _>(seq), + u64::SUB => get_2nd_or_err::<$cont1, _>(seq), + i8::SUB => get_2nd_or_err::<$cont1, _>(seq), + i16::SUB => get_2nd_or_err::<$cont1, _>(seq), + i32::SUB => get_2nd_or_err::<$cont1, _>(seq), + i64::SUB => get_2nd_or_err::<$cont1, _>(seq), + f32::SUB => get_2nd_or_err::<$cont1, _>(seq), + f64::SUB => get_2nd_or_err::<$cont1, _>(seq), + bool::SUB => get_2nd_or_err::<$cont1, _>(seq), + String::SUB => get_2nd_or_err::<$cont1, _>(seq), + EnumVariant::SUB => get_2nd_or_err::<$cont1, _>(seq), + _ => { + error!("TODO serde::de nty 0x{:x}", nty); + Err(de::Error::custom(&format!("unknown nty 0x{:x}", nty))) + } + } + }}; + } + impl<'de> Visitor<'de> for EvBoxVis { type Value = EvBox; @@ -334,49 +390,31 @@ mod serde_channel_events { A: de::SeqAccess<'de>, { trace_serde!("EvBoxVis::visit_seq"); - type C1 = ContainerEvents; let cty: u32 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[0] cty"))?; - let nty: u32 = seq + let nty: u16 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[1] nty"))?; - if cty == C1::::serde_id() { - match nty { - u8::SUB => get_2nd_or_err::, _>(&mut seq), - u16::SUB => get_2nd_or_err::, _>(&mut seq), - u32::SUB => get_2nd_or_err::, _>(&mut seq), - u64::SUB => get_2nd_or_err::, _>(&mut seq), - i8::SUB => get_2nd_or_err::, _>(&mut seq), - i16::SUB => get_2nd_or_err::, _>(&mut seq), - i32::SUB => get_2nd_or_err::, _>(&mut seq), - i64::SUB => get_2nd_or_err::, _>(&mut seq), - f32::SUB => get_2nd_or_err::, _>(&mut seq), - f64::SUB => get_2nd_or_err::, _>(&mut seq), - bool::SUB => get_2nd_or_err::, _>(&mut seq), - String::SUB => get_2nd_or_err::, _>(&mut seq), - EnumVariant::SUB => get_2nd_or_err::, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - Vec::::SUB => get_2nd_or_err::>, _>(&mut seq), - _ => { - error!("TODO serde cty {cty} nty {nty}"); - Err(de::Error::custom(&format!("unknown nty {nty}"))) + let seq = &mut seq; + trace_serde!("EvBoxVis cty 0x{:x} nty 0x{:X}", cty, nty); + if is_container_events(cty) { + if is_pulsed_subfr(nty) { + if is_vec_subfr(nty) { + de_inner_nty!(seq, C04, nty) + } else { + de_inner_nty!(seq, C03, nty) + } + } else { + if is_vec_subfr(nty) { + de_inner_nty!(seq, C02, nty) + } else { + de_inner_nty!(seq, C01, nty) } } } else { - error!("unsupported serde cty {cty} nty {nty}"); - Err(de::Error::custom(&format!("unknown cty {cty}"))) + error!("unsupported serde cty 0x{:x} nty 0x{:x}", cty, nty); + Err(de::Error::custom(&format!("unknown cty 0x{:x}", cty))) } } } @@ -682,13 +720,13 @@ impl MergeableTy for ChannelEvents { Some(_) => DrainIntoDstResult::Partial, None => { if range.len() != 1 { - trace!("try to add empty range to status container {range:?}"); + trace!("try to add empty range to status container {:?}", range); } if range.start != 0 { - trace!("weird range {range:?}"); + trace!("weird range {:?}", range); } if range.end > 1 { - trace!("weird range {range:?}"); + trace!("weird range {:?}", range); } *j = k.take(); DrainIntoDstResult::Done @@ -824,11 +862,11 @@ impl WithLen for ChannelEventsCollectorOutput { } impl ToUserFacingApiType for ChannelEventsCollectorOutput { - fn to_user_facing_api_type(self: Self) -> Box { + fn into_user_facing_api_type(self: Self) -> Box { todo!() } - fn to_user_facing_api_type_box(self: Box) -> Box { + fn into_user_facing_api_type_box(self: Box) -> Box { todo!() } } @@ -921,51 +959,23 @@ impl CollectorDyn for ChannelEventsCollector { } None => { let e = err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]"); - error!("{e}"); + error!("{}", e); Err(e) } } } } -impl ToJsonValue for ChannelEvents { - fn to_json_value(&self) -> Result { - let ret = match self { - ChannelEvents::Events(x) => x.to_json_value().unwrap(), - ChannelEvents::Status(x) => serde_json::json!({ - "_private_channel_status": x, - }), - }; - Ok(ret) - } -} - -impl ToCborValue for ChannelEvents { - fn to_cbor_value(&self) -> Result { - let ret = match self { - ChannelEvents::Events(x) => x.to_cbor_value()?, - ChannelEvents::Status(x) => { - use ciborium::cbor; - cbor!({ - "_private_channel_status" => x, - }) - .unwrap() - } - }; - Ok(ret) - } -} - impl ToUserFacingApiType for ChannelEvents { - fn to_user_facing_api_type(self) -> Box { + fn into_user_facing_api_type(self) -> Box { match self { - ChannelEvents::Events(x) => x.to_user_facing_api_type_box(), + ChannelEvents::Events(x) => x.into_user_facing_api_type_box(), ChannelEvents::Status(x) => Box::new(items_0::apitypes::EmptyStruct::new()), } } - fn to_user_facing_api_type_box(self: Box) -> Box { + fn into_user_facing_api_type_box(self: Box) -> Box { let this = *self; - this.to_user_facing_api_type() + this.into_user_facing_api_type() } } diff --git a/src/empty.rs b/src/empty.rs index af35c27..3f2a5e1 100644 --- a/src/empty.rs +++ b/src/empty.rs @@ -1,4 +1,5 @@ use crate::binning::container_events::ContainerEvents; +use crate::binning::container_events::PulsedVal; use crate::Error; use daqbuf_err as err; use items_0::timebin::BinningggContainerEventsDyn; @@ -57,3 +58,54 @@ pub fn empty_events_dyn_ev( }; Ok(ret) } + +pub fn empty_events_pulsed_dyn_ev( + scalar_type: &ScalarType, + shape: &Shape, +) -> Result, Error> { + let ret: Box = match shape { + Shape::Scalar => { + use ScalarType::*; + type K = ContainerEvents>; + match scalar_type { + U8 => Box::new(K::::new()), + U16 => Box::new(K::::new()), + U32 => Box::new(K::::new()), + U64 => Box::new(K::::new()), + I8 => Box::new(K::::new()), + I16 => Box::new(K::::new()), + I32 => Box::new(K::::new()), + I64 => Box::new(K::::new()), + F32 => Box::new(K::::new()), + F64 => Box::new(K::::new()), + BOOL => Box::new(K::::new()), + STRING => Box::new(K::::new()), + Enum => Box::new(K::::new()), + } + } + Shape::Wave(..) => { + use ScalarType::*; + type K = ContainerEvents>; + match scalar_type { + U8 => Box::new(K::::new()), + U16 => Box::new(K::::new()), + U32 => Box::new(K::::new()), + U64 => Box::new(K::::new()), + I8 => Box::new(K::::new()), + I16 => Box::new(K::::new()), + I32 => Box::new(K::::new()), + I64 => Box::new(K::::new()), + F32 => Box::new(K::::new()), + F64 => Box::new(K::::new()), + BOOL => Box::new(K::::new()), + STRING => Box::new(K::::new()), + Enum => Box::new(K::::new()), + } + } + Shape::Image(..) => { + error!("TODO empty_events_dyn_ev {scalar_type:?} {shape:?}"); + err::todoval() + } + }; + Ok(ret) +} diff --git a/src/eventsdim0.rs b/src/eventsdim0.rs index baa77b4..4f7242a 100644 --- a/src/eventsdim0.rs +++ b/src/eventsdim0.rs @@ -375,18 +375,12 @@ impl WithLen for EventsDim0CollectorOutput { } } -impl ToJsonValue for EventsDim0CollectorOutput { - fn to_json_value(&self) -> Result { - serde_json::to_value(self) - } -} - impl ToUserFacingApiType for EventsDim0CollectorOutput { - fn to_user_facing_api_type(self: Self) -> Box { + fn into_user_facing_api_type(self: Self) -> Box { todo!() } - fn to_user_facing_api_type_box(self: Box) -> Box { + fn into_user_facing_api_type_box(self: Box) -> Box { todo!() } } @@ -647,7 +641,7 @@ impl Events for EventsDim0 { } fn nty_id(&self) -> u32 { - STY::SUB + STY::SUB as u32 } fn clone_dyn(&self) -> Box { diff --git a/src/eventsdim0enum.rs b/src/eventsdim0enum.rs index 3ef5953..ba72f69 100644 --- a/src/eventsdim0enum.rs +++ b/src/eventsdim0enum.rs @@ -129,11 +129,11 @@ impl TypeName for EventsDim0EnumCollectorOutput { } impl ToUserFacingApiType for EventsDim0EnumCollectorOutput { - fn to_user_facing_api_type(self: Self) -> Box { + fn into_user_facing_api_type(self: Self) -> Box { todo!() } - fn to_user_facing_api_type_box(self: Box) -> Box { + fn into_user_facing_api_type_box(self: Box) -> Box { todo!() } } diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs index 8fb27e3..0ed8cfd 100644 --- a/src/eventsdim1.rs +++ b/src/eventsdim1.rs @@ -337,11 +337,11 @@ impl WithLen for EventsDim1CollectorOutput { } impl ToUserFacingApiType for EventsDim1CollectorOutput { - fn to_user_facing_api_type(self: Self) -> Box { + fn into_user_facing_api_type(self: Self) -> Box { todo!() } - fn to_user_facing_api_type_box(self: Box) -> Box { + fn into_user_facing_api_type_box(self: Box) -> Box { todo!() } } @@ -589,7 +589,7 @@ impl Events for EventsDim1 { } fn nty_id(&self) -> u32 { - STY::SUB + STY::SUB as u32 } fn clone_dyn(&self) -> Box { diff --git a/src/inmem.rs b/src/inmem.rs index ce3dd7a..d48cfe7 100644 --- a/src/inmem.rs +++ b/src/inmem.rs @@ -1,6 +1,25 @@ +use crate::framable::INMEM_FRAME_FOOT; +use crate::framable::INMEM_FRAME_HEAD; +use crate::framable::INMEM_FRAME_MAGIC; +use crate::log::*; use bytes::Bytes; use std::fmt; +#[derive(Debug, thiserror::Error)] +#[cstm(name = "InMemoryFrameError")] +pub enum Error { + LessThanHeader, + TryFromSlice(#[from] std::array::TryFromSliceError), + BadMagic(u32), + HugeFrame(u32), + BadCrc, +} + +pub enum ParseResult { + NotEnoughData(usize), + Parsed(usize, T), +} + pub struct InMemoryFrame { pub encid: u32, pub tyid: u32, @@ -12,15 +31,66 @@ impl InMemoryFrame { pub fn encid(&self) -> u32 { self.encid } + pub fn tyid(&self) -> u32 { self.tyid } + pub fn len(&self) -> u32 { self.len } + pub fn buf(&self) -> &Bytes { &self.buf } + + pub fn parse(buf: &[u8]) -> Result, Error> { + if buf.len() < INMEM_FRAME_HEAD { + return Err(Error::LessThanHeader); + } + let magic = u32::from_le_bytes(buf[0..4].try_into()?); + let encid = u32::from_le_bytes(buf[4..8].try_into()?); + let tyid = u32::from_le_bytes(buf[8..12].try_into()?); + let len = u32::from_le_bytes(buf[12..16].try_into()?); + let payload_crc_exp = u32::from_le_bytes(buf[16..20].try_into()?); + if magic != INMEM_FRAME_MAGIC { + return Err(Error::BadMagic(magic)); + } + if len > 1024 * 1024 * 50 { + return Err(Error::HugeFrame(len)); + } + let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize; + if buf.len() < lentot { + return Ok(ParseResult::NotEnoughData(lentot)); + } + let p1 = INMEM_FRAME_HEAD + len as usize; + let mut h = crc32fast::Hasher::new(); + h.update(&buf[..p1]); + let frame_crc = h.finalize(); + let mut h = crc32fast::Hasher::new(); + h.update(&buf[INMEM_FRAME_HEAD..p1]); + let payload_crc = h.finalize(); + let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?); + let payload_crc_match = payload_crc_exp == payload_crc; + let frame_crc_match = frame_crc_ind == frame_crc; + if !frame_crc_match || !payload_crc_match { + let _ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]); + let msg = format!( + "InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}", + payload_crc_match, frame_crc_match, + ); + error!("{}", msg); + let e = Error::BadCrc; + return Err(e); + } + let ret = InMemoryFrame { + len, + tyid, + encid, + buf: Bytes::from(buf[INMEM_FRAME_HEAD..p1].to_vec()), + }; + Ok(ParseResult::Parsed(lentot, ret)) + } } impl fmt::Debug for InMemoryFrame { diff --git a/src/jsonbytes.rs b/src/jsonbytes.rs index 9be908d..37bdcf8 100644 --- a/src/jsonbytes.rs +++ b/src/jsonbytes.rs @@ -1,4 +1,5 @@ use bytes::Bytes; +use core::fmt; use items_0::WithLen; pub struct JsonBytes(String); @@ -15,6 +16,18 @@ impl JsonBytes { pub fn len(&self) -> u32 { self.0.len() as _ } + + pub fn into_bytes(self) -> Vec { + self.0.as_bytes().to_vec() + } +} + +impl fmt::Debug for JsonBytes { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("JsonBytes") + .field(&self.0.chars().take(40).collect::()) + .finish() + } } impl WithLen for JsonBytes { diff --git a/src/merger.rs b/src/merger.rs index 581a0c9..1aa0c13 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -208,7 +208,7 @@ where } } } - trace4!("tslows {tslows:?}"); + trace4!("tslows {:?}", tslows); if let Some((il0, _tl0)) = tslows[0] { if let Some((_il1, tl1)) = tslows[1] { // There is a second input, take only up to the second highest timestamp @@ -218,7 +218,7 @@ where // Can take the whole item // TODO gather stats about this case. Should be never for databuffer, and often for scylla. let mut item = self.items[il0].take().unwrap(); - trace3!("Take all from item {item:?}"); + trace3!("Take all from item {:?}", item); match self.take_into_output_all(&mut item) { DrainIntoDstResult::Done => Ok(Break(())), DrainIntoDstResult::Partial => { @@ -239,7 +239,7 @@ where } else { // Take only up to the lowest ts of the second-lowest input let mut item = self.items[il0].take().unwrap(); - trace3!("Take up to {tl1} from item {item:?}"); + trace3!("Take up to {} from item {:?}", tl1, item); match self.take_into_output_upto(&mut item, tl1) { DrainIntoDstResult::Done => { if item.len() == 0 { @@ -272,7 +272,7 @@ where } else { // No other input, take the whole item let mut item = self.items[il0].take().unwrap(); - trace3!("Take all from item (no other input) {item:?}"); + trace3!("Take all from item (no other input) {:?}", item); match self.take_into_output_all(&mut item) { DrainIntoDstResult::Done => Ok(Break(())), DrainIntoDstResult::Partial => { @@ -363,7 +363,12 @@ where .zip(self.items.iter()) .filter(|(a, b)| a.is_some() && b.is_none()) .count(); - trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}"); + trace3!( + "ninps {} nitems {} nitemsmissing {}", + ninps, + nitems, + nitemsmissing + ); if nitemsmissing != 0 { let e = Error::NoPendingButMissing; return Break(Ready(Some(e)));