From ca787cee786d5ffce26c42746a50acd5bb5fbb23 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 9 Dec 2024 19:21:47 +0100 Subject: [PATCH] Test with dummy type --- Cargo.toml | 1 + src/binning/container/bins.rs | 2 +- src/binning/container_bins.rs | 98 ++++++++++++++++++++++++++++++++- src/binning/container_events.rs | 13 +++++ src/binning/test/compare.rs | 12 ++-- src/binning/valuetype.rs | 81 ++++++++++++++++++++++++++- src/channelevents.rs | 94 +++++++++++++++++++++++-------- src/frame.rs | 49 +++++++++-------- 8 files changed, 295 insertions(+), 55 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a559d99..f5ce4dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.3.2" futures-util = "0.3.24" humantime-serde = "1.1.1" +autoerr = "0.0.3" thiserror = "=0.0.1" daqbuf-err = { path = "../daqbuf-err" } items_0 = { path = "../daqbuf-items-0", package = "daqbuf-items-0" } diff --git a/src/binning/container/bins.rs b/src/binning/container/bins.rs index 429ae85..fae3670 100644 --- a/src/binning/container/bins.rs +++ b/src/binning/container/bins.rs @@ -18,7 +18,7 @@ where } pub trait BinAggedContainer: - fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> + fmt::Debug + Send + Clone + Unpin + PreviewRange + Serialize + for<'a> Deserialize<'a> where BVT: BinAggedType, { diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index b0d75a3..cb143bf 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -13,6 +13,10 @@ use items_0::apitypes::ToUserFacingApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::ToJsonValue; +use items_0::container::ByteEstimate; +use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewResult; +use items_0::merge::MergeableTy; use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinsBoxed; use items_0::vecpreview::VecPreview; @@ -383,6 +387,17 @@ where } } +impl ByteEstimate for ContainerBins +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn byte_estimate(&self) -> u64 { + // TODO ByteEstimate for ContainerBins + 128 * self.len() as u64 + } +} + #[derive(Debug)] pub struct ContainerBinsCollectorOutput where @@ -544,7 +559,8 @@ where { fn ingest(&mut self, src: &mut dyn CollectableDyn) { if let Some(src) = src.as_any_mut().downcast_mut::>() { - src.drain_into(&mut self.bins, 0..src.len()); + MergeableTy::drain_into(src, &mut self.bins, 0..src.len()); + // src.drain_into(&mut self.bins, 0..src.len()); } else { let srcn = src.type_name(); panic!("wrong src type {srcn}"); @@ -641,6 +657,86 @@ where } } +impl MergeableTy for ContainerBins +where + EVT: EventValueType, + BVT: BinAggedType, +{ + fn ts_min(&self) -> Option { + self.ts1s.front().copied() + } + + fn ts_max(&self) -> Option { + self.ts1s.back().copied() + } + + fn find_lowest_index_gt(&self, ts: TsNano) -> Option { + let x = self.ts1s.partition_point(|&x| x <= ts); + if x >= self.ts1s.len() { + None + } else { + Some(x) + } + } + + fn find_lowest_index_ge(&self, ts: TsNano) -> Option { + let x = self.ts1s.partition_point(|&x| x < ts); + if x >= self.ts1s.len() { + None + } else { + Some(x) + } + } + + fn find_highest_index_lt(&self, ts: TsNano) -> Option { + let x = self.ts1s.partition_point(|&x| x < ts); + if x == 0 { + None + } else { + Some(x - 1) + } + } + + fn tss_for_testing(&self) -> VecDeque { + self.ts1s.clone() + } + + fn drain_into( + &mut self, + dst: &mut Self, + range: std::ops::Range, + ) -> items_0::merge::DrainIntoDstResult { + dst.ts1s.extend(self.ts1s.drain(range.clone())); + dst.ts2s.extend(self.ts2s.drain(range.clone())); + dst.cnts.extend(self.cnts.drain(range.clone())); + self.mins.drain_into(&mut dst.mins, range.clone()); + self.maxs.drain_into(&mut dst.maxs, range.clone()); + self.aggs.drain_into(&mut dst.aggs, range.clone()); + self.lsts.drain_into(&mut dst.lsts, range.clone()); + dst.fnls.extend(self.fnls.drain(range.clone())); + DrainIntoDstResult::Done + } + + fn drain_into_new( + &mut self, + range: std::ops::Range, + ) -> items_0::merge::DrainIntoNewResult { + let mut dst = Self::new(); + MergeableTy::drain_into(self, &mut dst, range); + DrainIntoNewResult::Done(dst) + } + + fn is_consistent(&self) -> bool { + let n = self.ts1s.len(); + let mut same_len = true; + same_len &= n == self.ts2s.len(); + same_len &= n == self.cnts.len(); + same_len &= n == self.mins.len(); + same_len &= n == self.ts2s.len(); + same_len + } +} + pub struct ContainerBinsTakeUpTo<'a, EVT, BVT> where EVT: EventValueType, diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 087bae4..d0ab9d3 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -54,6 +54,7 @@ where EVT: EventValueType, { fn new() -> Self; + fn len(&self) -> usize; fn push_back(&mut self, val: EVT); fn get_iter_ty_1(&self, pos: usize) -> Option>; fn iter_ty_1(&self) -> impl Iterator>; @@ -86,6 +87,10 @@ where VecDeque::new() } + fn len(&self) -> usize { + self.len() + } + fn push_back(&mut self, val: EVT) { self.push_back(val); } @@ -116,6 +121,10 @@ impl Container for VecDeque { VecDeque::new() } + fn len(&self) -> usize { + self.len() + } + fn push_back(&mut self, val: String) { self.push_back(val); } @@ -386,6 +395,10 @@ where } } + fn len(&self) -> usize { + self.vals.len() + } + fn push_back(&mut self, val: PulsedVal) { self.pulses.push_back(val.0); self.vals.push_back(val.1); diff --git a/src/binning/test/compare.rs b/src/binning/test/compare.rs index 8c4227d..2ae695a 100644 --- a/src/binning/test/compare.rs +++ b/src/binning/test/compare.rs @@ -61,8 +61,8 @@ fn exp_u64<'a>( } fn exp_f32<'a>( - vals: impl Iterator, - exps: impl Iterator, + vals: impl Iterator, + exps: impl Iterator, tag: &str, ) -> Result<(), Error> { let mut it_a = vals; @@ -74,7 +74,7 @@ fn exp_f32<'a>( if a.is_none() && b.is_none() { break; } - if let (Some(&val), Some(&exp)) = (a, b) { + if let (Some(val), Some(exp)) = (a, b) { if netpod::f32_close(val, exp) == false { return Err(Error::AssertMsg(format!( "{tag} val {} exp {} i {}", @@ -106,7 +106,7 @@ pub(super) fn exp_mins( ) -> Result<(), Error> { exp_f32( bins.mins_iter(), - exps.into_vec_deque_f32().iter(), + exps.into_vec_deque_f32().into_iter(), "exp_mins", ) } @@ -117,7 +117,7 @@ pub(super) fn exp_maxs( ) -> Result<(), Error> { exp_f32( bins.maxs_iter(), - exps.into_vec_deque_f32().iter(), + exps.into_vec_deque_f32().into_iter(), "exp_maxs", ) } @@ -137,7 +137,7 @@ pub(super) fn exp_avgs( break; } if let (Some(a), Some(&exp)) = (a, b) { - let val = *a.agg as f32; + let val = a.agg as f32; if netpod::f32_close(val, exp) == false { return Err(Error::AssertMsg(format!( "exp_avgs val {} exp {} i {}", diff --git a/src/binning/valuetype.rs b/src/binning/valuetype.rs index 19e8820..99778ac 100644 --- a/src/binning/valuetype.rs +++ b/src/binning/valuetype.rs @@ -36,6 +36,10 @@ impl Container for EnumVariantContainer { } } + fn len(&self) -> usize { + self.ixs.len() + } + fn push_back(&mut self, val: EnumVariant) { let (ix, name) = val.into_parts(); self.ixs.push_back(ix); @@ -136,6 +140,81 @@ impl EventValueType for EnumVariant { type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; type AggTimeWeightOutputAvg = f32; type IterTy1<'a> = EnumVariantRef<'a>; - const SERDE_ID: u32 = Self::SUB as u32; + const SERDE_ID: u32 = ::SUB as u32; const BYTE_ESTIMATE_V00: u32 = 40; } + +impl PartialOrdEvtA for netpod::UnsupEvt { + fn cmp_a(&self, other: &netpod::UnsupEvt) -> Option { + todo!() + } +} + +impl PartialOrdEvtA> for Vec { + fn cmp_a(&self, other: &Vec) -> Option { + todo!() + } +} + +#[derive(Debug)] +pub struct UnsupEvtAgg; + +impl AggregatorTimeWeight for UnsupEvtAgg { + fn new() -> Self { + todo!() + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: netpod::UnsupEvt) { + todo!() + } + + fn reset_for_new_bin(&mut self) { + todo!() + } + + fn result_and_reset_for_new_bin( + &mut self, + filled_width_fraction: f32, + ) -> ::AggTimeWeightOutputAvg { + todo!() + } +} + +impl AggregatorTimeWeight> for UnsupEvtAgg { + fn new() -> Self { + todo!() + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: Vec) { + todo!() + } + + fn reset_for_new_bin(&mut self) { + todo!() + } + + fn result_and_reset_for_new_bin( + &mut self, + filled_width_fraction: f32, + ) -> as EventValueType>::AggTimeWeightOutputAvg { + todo!() + } +} + +impl EventValueType for netpod::UnsupEvt { + type Container = std::collections::VecDeque; + type AggregatorTimeWeight = UnsupEvtAgg; + type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = netpod::UnsupEvt; + const SERDE_ID: u32 = ::SUB as u32; + const BYTE_ESTIMATE_V00: u32 = 4; +} + +impl EventValueType for Vec { + type Container = std::collections::VecDeque>; + type AggregatorTimeWeight = UnsupEvtAgg; + type AggTimeWeightOutputAvg = f32; + type IterTy1<'a> = Vec; + const SERDE_ID: u32 = ::SUB as u32; + const BYTE_ESTIMATE_V00: u32 = 4; +} diff --git a/src/channelevents.rs b/src/channelevents.rs index e815646..c3ba38e 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -239,7 +239,7 @@ mod serde_channel_events { use serde::Serializer; use std::fmt; - macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) } + macro_rules! trace_serde { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } type C01 = ContainerEvents; type C02 = ContainerEvents>; @@ -271,21 +271,22 @@ mod serde_channel_events { type C = $cont1; match nty_id { u8::SUB => try_serialize::>(v, ser), - // u16::SUB => try_serialize::>(v, ser)?, - // u32::SUB => try_serialize::>(v, ser)?, - // u64::SUB => try_serialize::>(v, ser)?, - // i8::SUB => try_serialize::>(v, ser)?, - // i16::SUB => try_serialize::>(v, ser)?, - // i32::SUB => try_serialize::>(v, ser)?, - // i64::SUB => try_serialize::>(v, ser)?, + u16::SUB => try_serialize::>(v, ser), + u32::SUB => try_serialize::>(v, ser), + u64::SUB => try_serialize::>(v, ser), + i8::SUB => try_serialize::>(v, ser), + i16::SUB => try_serialize::>(v, ser), + i32::SUB => try_serialize::>(v, ser), + i64::SUB => try_serialize::>(v, ser), f32::SUB => try_serialize::>(v, ser), - // f64::SUB => try_serialize::>(v, ser)?, - // bool::SUB => try_serialize::>(v, ser)?, - // String::SUB => try_serialize::>(v, ser)?, - // EnumVariant::SUB => try_serialize::>(v, ser)?, + f64::SUB => try_serialize::>(v, ser), + bool::SUB => try_serialize::>(v, ser), + String::SUB => try_serialize::>(v, ser), + EnumVariant::SUB => try_serialize::>(v, ser), _ => { let msg = format!("serde ser not supported evt id 0x{:x}", nty_id); - return Err(serde::ser::Error::custom(msg)); + error!("{}", msg); + Err(serde::ser::Error::custom(msg)) } } }}; @@ -304,7 +305,7 @@ mod serde_channel_events { ser.serialize_element(&self.0.serde_id())?; ser.serialize_element(&self.0.nty_id())?; let nty_id = self.0.nty_id() as u16; - if is_container_events(self.0.serde_id()) { + let x = if is_container_events(self.0.serde_id()) { if is_pulsed_subfr(nty_id) { if is_vec_subfr(nty_id) { ser_inner_nty!(&mut ser, C04, nty_id, self.0) @@ -320,8 +321,11 @@ mod serde_channel_events { } } else { let msg = format!("not supported obj id {}", self.0.serde_id()); - return Err(serde::ser::Error::custom(msg)); - }?; + Err(serde::ser::Error::custom(msg)) + }; + // warn!("Serialize for EvRef is_ok {}", x.is_ok()); + let _: () = x?; + // warn!("Serialize for EvRef ending"); ser.end() } } @@ -351,6 +355,12 @@ mod serde_channel_events { ($seq:expr, $cont1:ident, $nty:expr) => {{ let seq = $seq; let nty = subfr_scalar_type($nty); + let cc = std::any::type_name::<$cont1>(); + trace_serde!( + "EvBoxVis::visit_seq de_inner_nty nty 0x{:X} cont1 {}", + nty, + cc + ); match nty { u8::SUB => get_2nd_or_err::<$cont1, _>(seq), u16::SUB => get_2nd_or_err::<$cont1, _>(seq), @@ -365,8 +375,12 @@ mod serde_channel_events { bool::SUB => get_2nd_or_err::<$cont1, _>(seq), String::SUB => get_2nd_or_err::<$cont1, _>(seq), EnumVariant::SUB => get_2nd_or_err::<$cont1, _>(seq), + netpod::UnsupEvt::SUB => get_2nd_or_err::<$cont1, _>(seq), _ => { error!("TODO serde::de nty 0x{:x}", nty); + if true { + panic!("TODO serde::de nty 0x{:x}", nty); + } Err(de::Error::custom(&format!("unknown nty 0x{:x}", nty))) } } @@ -392,7 +406,7 @@ mod serde_channel_events { .next_element()? .ok_or_else(|| de::Error::missing_field("[1] nty"))?; let seq = &mut seq; - trace_serde!("EvBoxVis cty 0x{:x} nty 0x{:X}", cty, nty); + trace_serde!("EvBoxVis::visit_seq cty 0x{:x} nty 0x{:x}", cty, nty); if is_container_events(cty) { if is_pulsed_subfr(nty) { if is_vec_subfr(nty) { @@ -432,7 +446,14 @@ mod serde_channel_events { let vars = ChannelEventsVis::allowed_variants(); match self { ChannelEvents::Events(obj) => { - serializer.serialize_newtype_variant(name, 0, vars[0], &EvRef(obj.as_ref())) + let x = serializer.serialize_newtype_variant( + name, + 0, + vars[0], + &EvRef(obj.as_ref()), + ); + // warn!("Serialize for ChannelEvents is_ok {}", x.is_ok()); + x } ChannelEvents::Status(val) => { serializer.serialize_newtype_variant(name, 1, vars[1], val) @@ -552,6 +573,9 @@ mod test_channel_events_serde { use super::ChannelEvents; use crate::binning::container_events::ContainerEvents; use crate::channelevents::ConnStatusEvent; + use crate::framable::Framable; + use crate::inmem::InMemoryFrame; + use crate::log::*; use bincode::config::FixintEncoding; use bincode::config::LittleEndian; use bincode::config::RejectTrailing; @@ -560,13 +584,40 @@ mod test_channel_events_serde { use bincode::config::WithOtherTrailing; use bincode::DefaultOptions; use items_0::bincode; + use items_0::streamitem::sitem_data; + use items_0::streamitem::Sitemty; + use items_0::timebin::BinningggContainerEventsDyn; use items_0::Appendable; use items_0::Empty; use netpod::TsNano; + use netpod::UnsupEvt; use serde::Deserialize; use serde::Serialize; use std::time::SystemTime; + #[test] + fn channel_events_unsup_evt() { + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ns(8), UnsupEvt(4)); + let item = ChannelEvents::from(evs); + // let item: Box = Box::new(evs); + let item = sitem_data(item); + match item.make_frame_dyn() { + Ok(frame) => { + panic!("this should have failed"); + let imfr = if let Ok(crate::inmem::ParseResult::Parsed(_, x)) = + InMemoryFrame::parse(&frame) + { + x + } else { + panic!(); + }; + crate::frame::decode_frame::>(&imfr).unwrap(); + } + Err(_) => (), + } + } + #[test] fn channel_events() { let mut evs = ContainerEvents::new(); @@ -612,8 +663,9 @@ mod test_channel_events_serde { panic!() }; let item: &ContainerEvents = item.as_any_ref().downcast_ref().unwrap(); - assert_eq!(item.tss().len(), 2); - assert_eq!(item.tss()[1], 12); + use items_0::merge::MergeableTy; + assert_eq!(item.tss_for_testing().len(), 2); + assert_eq!(item.tss_for_testing()[1], TsNano::from_ns(12)); } #[test] @@ -866,7 +918,6 @@ pub struct ChannelEventsCollector { coll: Option>, range_complete: bool, timed_out: bool, - needs_continue_at: bool, tmp_warned_status: bool, tmp_error_unknown_type: bool, } @@ -881,7 +932,6 @@ impl ChannelEventsCollector { coll: None, range_complete: false, timed_out: false, - needs_continue_at: false, tmp_warned_status: false, tmp_error_unknown_type: false, } diff --git a/src/frame.rs b/src/frame.rs index 8a25636..e4d2e8a 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -4,6 +4,7 @@ use crate::framable::INMEM_FRAME_FOOT; use crate::framable::INMEM_FRAME_HEAD; use crate::framable::INMEM_FRAME_MAGIC; use crate::inmem::InMemoryFrame; +use crate::log::*; use bincode::config::FixintEncoding; use bincode::config::LittleEndian; use bincode::config::RejectTrailing; @@ -23,7 +24,6 @@ use items_0::streamitem::LOG_FRAME_TYPE_ID; use items_0::streamitem::RANGE_COMPLETE_FRAME_TYPE_ID; use items_0::streamitem::STATS_FRAME_TYPE_ID; use items_0::streamitem::TERM_FRAME_TYPE_ID; -use netpod::log::*; use serde::Serialize; use std::any; use std::io; @@ -32,26 +32,23 @@ const USE_JSON: bool = false; const EMIT_JSON_DEBUG: bool = false; const EMIT_POSTCARD_DEBUG: bool = false; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "ItemFrame")] -pub enum Error { - TooLongPayload(usize), - UnknownEncoder(u32), - #[error("BufferMismatch({0}, {1}, {2})")] - BufferMismatch(u32, usize, u32), - #[error("TyIdMismatch({0}, {1})")] - TyIdMismatch(u32, u32), - Msg(String), - Bincode(#[from] Box), - RmpEnc(#[from] rmp_serde::encode::Error), - RmpDec(#[from] rmp_serde::decode::Error), - ErasedSerde(#[from] erased_serde::Error), - #[error("PostcardSer({0})")] - PostcardSer(postcard::Error), - #[error("PostcardDe({0}, {1}, {2:?}, {3})")] - PostcardDe(postcard::Error, usize, Vec, &'static str), - SerdeJson(#[from] serde_json::Error), -} +autoerr::create_error_v1!( + name(Error, "ItemFrame"), + enum variants { + TooLongPayload(usize), + UnknownEncoder(u32), + BufferMismatch(u32, usize, u32), + TyIdMismatch(u32, u32), + Msg(String), + Bincode(#[from] Box), + RmpEnc(#[from] rmp_serde::encode::Error), + RmpDec(#[from] rmp_serde::decode::Error), + ErasedSerde(#[from] erased_serde::Error), + PostcardSer(postcard::Error), + PostcardDe(postcard::Error, usize, String, String), + SerdeJson(#[from] serde_json::Error), + }, +); struct ErrMsg(E) where @@ -187,8 +184,8 @@ where Error::PostcardDe( e, buf.len(), - buf[0..buf.len().min(40)].to_vec(), - std::any::type_name::(), + format!("{:?}", buf[0..buf.len().min(40)].to_vec()), + std::any::type_name::().into(), ) })?; Ok(x) @@ -271,7 +268,10 @@ where } else if false { msgpack_erased_to_vec(item) } else { - postcard_erased_to_vec(item) + let x = postcard_erased_to_vec(item); + // let s = std::any::type_name::(); + // warn!("encode_erased_to_vec is_ok {} T {}", x.is_ok(), s); + x } } @@ -507,6 +507,7 @@ where frame.tyid(), any::type_name::() ); + error!("decode_from_slice error {}", e); let n = frame.buf().len().min(64); let s = String::from_utf8_lossy(&frame.buf()[..n]); error!(