diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index f686bff..5a59860 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -8,7 +8,7 @@ use err::thiserror; use err::ThisError; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; -use items_0::collect_s::ToJsonResult; +use items_0::collect_s::ToJsonValue; use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinsBoxed; use items_0::vecpreview::VecPreview; @@ -487,7 +487,7 @@ where // finished_at: Option, } -impl ToJsonResult for ContainerBinsCollectorOutput +impl ToJsonValue for ContainerBinsCollectorOutput where EVT: EventValueType, BVT: BinAggedType, diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 24fb67c..331b265 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -9,6 +9,11 @@ use core::ops::Range; use daqbuf_err as err; use err::thiserror; use err::ThisError; +use items_0::apitypes::ContainerEventsApi; +use items_0::apitypes::ToUserFacingApiType; +use items_0::apitypes::UserApiType; +use items_0::collect_s::ToCborValue; +use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::merge::DrainIntoDstResult; use items_0::merge::DrainIntoNewDynResult; @@ -300,6 +305,7 @@ mod container_events_serde { use super::ContainerEvents; use super::EventValueType; use serde::de::MapAccess; + use serde::de::SeqAccess; use serde::de::Visitor; use serde::ser::SerializeStruct; use serde::Deserialize; @@ -309,6 +315,8 @@ mod container_events_serde { use std::fmt; use std::marker::PhantomData; + macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) } + impl Serialize for ContainerEvents where EVT: EventValueType, @@ -339,10 +347,31 @@ mod container_events_serde { fmt.write_str("a struct with fields tss and vals") } + fn visit_seq(self, mut seq: S) -> Result + where + S: SeqAccess<'de>, + { + trace_serde!("Vis ContainerEvents visit_map"); + let tss = seq + .next_element()? + .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?; + let vals = seq + .next_element()? + .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?; + let ret = Self::Value { + tss, + vals, + // TODO make container recompute byte_estimate + byte_estimate: 0, + }; + Ok(ret) + } + fn visit_map(self, mut map: M) -> Result where M: MapAccess<'de>, { + trace_serde!("Vis ContainerEvents visit_map"); let mut tss = None; let mut vals = None; while let Some(key) = map.next_key::<&str>()? { @@ -433,6 +462,10 @@ where pub fn iter_zip<'a>(&'a self) -> impl Iterator)> { self.tss.iter().zip(self.vals.iter_ty_1()) } + + pub fn serde_id() -> u32 { + items_0::streamitem::CONTAINER_EVENTS_TYPE_ID + } } impl fmt::Debug for ContainerEvents @@ -625,6 +658,19 @@ where MergeableTy::drain_into(self, &mut dst, range); DrainIntoNewResult::Done(dst) } + + fn is_consistent(&self) -> bool { + let mut good = true; + let n = self.tss.len(); + for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) { + if ts1 > ts2 { + good = false; + error!("unordered event data ts1 {} ts2 {}", ts1, ts2); + break; + } + } + good + } } impl MergeableDyn for ContainerEvents @@ -674,6 +720,42 @@ where DrainIntoNewResult::NotCompatible => DrainIntoNewDynResult::NotCompatible, } } + + fn is_consistent(&self) -> bool { + MergeableTy::is_consistent(self) + } +} + +impl ToCborValue for ContainerEvents +where + EVT: EventValueType, +{ + fn to_cbor_value(&self) -> Result { + ciborium::value::Value::serialized(self) + } +} + +impl ToJsonValue for ContainerEvents +where + EVT: EventValueType, +{ + fn to_json_value(&self) -> Result { + serde_json::to_value(self) + } +} + +impl ToUserFacingApiType for ContainerEvents +where + EVT: EventValueType, +{ + fn to_user_facing_api_type(self) -> Box { + let tss: VecDeque<_> = self.tss.into_iter().map(|x| x.ms()).collect(); + let ret = ContainerEventsApi { + tss: tss.clone(), + values: tss.clone(), + }; + Box::new(ret) + } } impl BinningggContainerEventsDyn for ContainerEvents @@ -701,7 +783,7 @@ where } fn serde_id(&self) -> u32 { - items_0::streamitem::CONTAINER_EVENTS_TYPE_ID + Self::serde_id() } fn nty_id(&self) -> u32 { @@ -716,19 +798,6 @@ where } } - fn verify(&self) -> bool { - let mut good = true; - let n = self.tss.len(); - for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) { - if ts1 > ts2 { - good = false; - error!("unordered event data ts1 {} ts2 {}", ts1, ts2); - break; - } - } - good - } - fn as_mergeable_dyn_mut(&mut self) -> &mut dyn MergeableDyn { self } diff --git a/src/channelevents.rs b/src/channelevents.rs index 4bcdf3b..07c46e9 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -1,12 +1,18 @@ use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::EventValueType; use crate::framable::FrameType; +use crate::jsonbytes::JsonBytes; +use crate::log::*; use crate::Events; use core::ops::Range; use daqbuf_err as err; +use items_0::apitypes::ToUserFacingApiType; +use items_0::apitypes::UserApiType; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorDyn; +use items_0::collect_s::ToCborValue; +use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::isodate::IsoDateTime; @@ -24,9 +30,9 @@ use items_0::EventsNonObj; use items_0::Extendable; use items_0::TypeName; use items_0::WithLen; -use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; +use netpod::TsMs; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -226,9 +232,9 @@ mod serde_channel_events { use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; use crate::eventsdim1::EventsDim1; + use crate::log::*; use items_0::subfr::SubFrId; use items_0::timebin::BinningggContainerEventsDyn; - use netpod::log::*; use netpod::EnumVariant; use serde::de; use serde::de::EnumAccess; @@ -241,6 +247,8 @@ mod serde_channel_events { use serde::Serializer; use std::fmt; + macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) } + fn try_serialize( v: &dyn BinningggContainerEventsDyn, ser: &mut ::SerializeSeq, @@ -260,7 +268,7 @@ mod serde_channel_events { struct EvRef<'a>(&'a dyn BinningggContainerEventsDyn); - struct EvBox(Box); + struct EvBox(Box); impl<'a> Serialize for EvRef<'a> { fn serialize(&self, ser: S) -> Result @@ -322,88 +330,90 @@ mod serde_channel_events { where A: de::SeqAccess<'de>, { - let cty: &str = seq + trace_serde!("EvBoxVis::visit_seq"); + type C1 = ContainerEvents; + let cty: u32 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[0] cty"))?; let nty: u32 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[1] nty"))?; - if cty == EventsDim0::::serde_id() { + if cty == C1::::serde_id() { match nty { u8::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u16::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u32::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u64::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i8::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i16::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i32::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i64::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f32::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f64::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } bool::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } String::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } EnumVariant::SUB => { - let obj: EventsDim0 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) @@ -413,76 +423,76 @@ mod serde_channel_events { Err(de::Error::custom(&format!("unknown nty {nty}"))) } } - } else if cty == EventsDim1::::serde_id() { + } else if cty == C1::::serde_id() { match nty { u8::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u16::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u32::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u64::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i8::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i16::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i32::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i64::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f32::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f64::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } bool::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } String::SUB => { - let obj: EventsDim1 = seq + let obj: C1 = seq .next_element()? .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) @@ -608,9 +618,7 @@ mod serde_channel_events { match id { VarId::Events => { let x: EvBox = var.newtype_variant()?; - let _ = x; - // Ok(Self::Value::Events(x.0)); - todo!() + Ok(Self::Value::Events(x.0)) } VarId::Status => { let x: Option = var.newtype_variant()?; @@ -884,11 +892,21 @@ impl MergeableTy for ChannelEvents { } } - fn tss_for_testing(&self) -> Vec { - Events::tss(self) - .iter() - .map(|x| netpod::TsMs::from_ns_u64(*x)) - .collect() + fn tss_for_testing(&self) -> Vec { + match self { + ChannelEvents::Events(x) => x.tss_for_testing(), + ChannelEvents::Status(x) => match x { + Some(x) => vec![x.ts.to_ts_ms()], + None => Vec::new(), + }, + } + } + + fn is_consistent(&self) -> bool { + match self { + ChannelEvents::Events(x) => x.is_consistent(), + ChannelEvents::Status(_) => true, + } } } @@ -898,119 +916,119 @@ impl EventsNonObj for ChannelEvents { } } -impl Events for ChannelEvents { - fn verify(&self) -> bool { - todo!() - } +// impl Events for ChannelEvents { +// fn verify(&self) -> bool { +// todo!() +// } - fn output_info(&self) -> String { - todo!() - } +// fn output_info(&self) -> String { +// todo!() +// } - fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { - todo!() - } +// fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { +// todo!() +// } - fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { - todo!() - } +// fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { +// todo!() +// } - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { - todo!() - } +// fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { +// todo!() +// } - fn ts_min(&self) -> Option { - todo!() - } +// fn ts_min(&self) -> Option { +// todo!() +// } - fn ts_max(&self) -> Option { - todo!() - } +// fn ts_max(&self) -> Option { +// todo!() +// } - fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box { - todo!() - } +// fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box { +// todo!() +// } - fn new_empty_evs(&self) -> Box { - todo!() - } +// fn new_empty_evs(&self) -> Box { +// todo!() +// } - fn drain_into_evs( - &mut self, - dst: &mut dyn Events, - range: (usize, usize), - ) -> Result<(), err::Error> { - todo!() - } +// fn drain_into_evs( +// &mut self, +// dst: &mut dyn Events, +// range: (usize, usize), +// ) -> Result<(), err::Error> { +// todo!() +// } - fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option { - todo!() - } +// fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option { +// todo!() +// } - fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option { - todo!() - } +// fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option { +// todo!() +// } - fn find_highest_index_lt_evs(&self, _ts: u64) -> Option { - todo!() - } +// fn find_highest_index_lt_evs(&self, _ts: u64) -> Option { +// todo!() +// } - fn clone_dyn(&self) -> Box { - todo!() - } +// fn clone_dyn(&self) -> Box { +// todo!() +// } - fn partial_eq_dyn(&self, _other: &dyn Events) -> bool { - todo!() - } +// fn partial_eq_dyn(&self, _other: &dyn Events) -> bool { +// todo!() +// } - fn serde_id(&self) -> &'static str { - todo!() - } +// fn serde_id(&self) -> &'static str { +// todo!() +// } - fn nty_id(&self) -> u32 { - todo!() - } +// fn nty_id(&self) -> u32 { +// todo!() +// } - fn tss(&self) -> &VecDeque { - todo!() - } +// fn tss(&self) -> &VecDeque { +// todo!() +// } - fn pulses(&self) -> &VecDeque { - todo!() - } +// fn pulses(&self) -> &VecDeque { +// todo!() +// } - fn frame_type_id(&self) -> u32 { - todo!() - } +// fn frame_type_id(&self) -> u32 { +// todo!() +// } - fn to_min_max_avg(&mut self) -> Box { - todo!() - } +// fn to_min_max_avg(&mut self) -> Box { +// todo!() +// } - fn to_json_string(&self) -> String { - todo!() - } +// fn to_json_string(&self) -> String { +// todo!() +// } - fn to_json_vec_u8(&self) -> Vec { - todo!() - } +// fn to_json_vec_u8(&self) -> Vec { +// todo!() +// } - fn to_cbor_vec_u8(&self) -> Vec { - todo!() - } +// fn to_cbor_vec_u8(&self) -> Vec { +// todo!() +// } - fn clear(&mut self) { - todo!() - } +// fn clear(&mut self) { +// todo!() +// } - fn to_dim0_f32_for_binning(&self) -> Box { - todo!() - } +// fn to_dim0_f32_for_binning(&self) -> Box { +// todo!() +// } - fn to_container_events(&self) -> Box { - panic!("should not get used") - } -} +// fn to_container_events(&self) -> Box { +// panic!("should not get used") +// } +// } impl CollectableDyn for ChannelEvents { fn new_collector(&self) -> Box { @@ -1047,7 +1065,7 @@ impl WithLen for ChannelEventsCollectorOutput { } } -impl items_0::collect_s::ToJsonResult for ChannelEventsCollectorOutput { +impl items_0::collect_s::ToJsonValue for ChannelEventsCollectorOutput { fn to_json_value(&self) -> Result { serde_json::to_value(self) } @@ -1164,3 +1182,40 @@ impl CollectorDyn for ChannelEventsCollector { } } } + +impl ToJsonValue for ChannelEvents { + fn to_json_value(&self) -> Result { + let ret = match self { + ChannelEvents::Events(x) => x.to_json_value().unwrap(), + ChannelEvents::Status(x) => serde_json::json!({ + "_private_channel_status": x, + }), + }; + Ok(ret) + } +} + +impl ToCborValue for ChannelEvents { + fn to_cbor_value(&self) -> Result { + let ret = match self { + ChannelEvents::Events(x) => x.to_cbor_value()?, + ChannelEvents::Status(x) => { + use ciborium::cbor; + cbor!({ + "_private_channel_status" => x, + }) + .unwrap() + } + }; + Ok(ret) + } +} + +impl ToUserFacingApiType for ChannelEvents { + fn to_user_facing_api_type(self) -> Box { + match self { + ChannelEvents::Events(x) => x.to_user_facing_api_type(), + ChannelEvents::Status(x) => todo!(), + } + } +} diff --git a/src/eventfull.rs b/src/eventfull.rs index a867ada..d169dfc 100644 --- a/src/eventfull.rs +++ b/src/eventfull.rs @@ -257,6 +257,10 @@ impl MergeableTy for EventFull { .map(|x| netpod::TsMs::from_ns_u64(*x)) .collect() } + + fn is_consistent(&self) -> bool { + true + } } #[derive(Debug, ThisError, Serialize, Deserialize)] diff --git a/src/eventsdim0.rs b/src/eventsdim0.rs index cf72bde..765a0c2 100644 --- a/src/eventsdim0.rs +++ b/src/eventsdim0.rs @@ -4,7 +4,7 @@ use err::Error; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonResult; +use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::overlap::HasTimestampDeque; use items_0::scalar_ops::ScalarOps; @@ -374,7 +374,7 @@ impl WithLen for EventsDim0CollectorOutput { } } -impl ToJsonResult for EventsDim0CollectorOutput { +impl ToJsonValue for EventsDim0CollectorOutput { fn to_json_value(&self) -> Result { serde_json::to_value(self) } diff --git a/src/eventsdim0enum.rs b/src/eventsdim0enum.rs index f1e3307..9abf329 100644 --- a/src/eventsdim0enum.rs +++ b/src/eventsdim0enum.rs @@ -4,7 +4,7 @@ use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorDyn; use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonResult; +use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::isodate::IsoDateTime; use items_0::scalar_ops::ScalarOps; @@ -127,7 +127,7 @@ impl TypeName for EventsDim0EnumCollectorOutput { } } -impl ToJsonResult for EventsDim0EnumCollectorOutput { +impl ToJsonValue for EventsDim0EnumCollectorOutput { fn to_json_value(&self) -> Result { todo!() } diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs index 9d9ba66..9fa8b02 100644 --- a/src/eventsdim1.rs +++ b/src/eventsdim1.rs @@ -5,7 +5,7 @@ use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectableType; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonResult; +use items_0::collect_s::ToJsonValue; use items_0::container::ByteEstimate; use items_0::overlap::HasTimestampDeque; use items_0::scalar_ops::ScalarOps; @@ -335,7 +335,7 @@ impl WithLen for EventsDim1CollectorOutput { } } -impl ToJsonResult for EventsDim1CollectorOutput { +impl ToJsonValue for EventsDim1CollectorOutput { fn to_json_value(&self) -> Result { serde_json::to_value(self) } diff --git a/src/framable.rs b/src/framable.rs index 7b08c18..f049810 100644 --- a/src/framable.rs +++ b/src/framable.rs @@ -4,6 +4,7 @@ use crate::frame::make_log_frame; use crate::frame::make_range_complete_frame; use crate::frame::make_stats_frame; use bytes::BytesMut; +use core::fmt; use daqbuf_err as err; use items_0::framable::FrameTypeInnerDyn; use items_0::framable::FrameTypeInnerStatic; @@ -93,7 +94,7 @@ impl FramableInner for T impl Framable for Sitemty where - T: Sized + serde::Serialize + FrameType, + T: Sized + serde::Serialize + FrameType + fmt::Debug, { fn make_frame_dyn(&self) -> Result { match self { diff --git a/src/frame.rs b/src/frame.rs index 50d6a47..8a25636 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -13,6 +13,7 @@ use bincode::config::WithOtherTrailing; use bincode::DefaultOptions; use bytes::BufMut; use bytes::BytesMut; +use core::fmt; use daqbuf_err as err; use items_0::bincode; use items_0::streamitem::LogItem; @@ -27,6 +28,10 @@ use serde::Serialize; use std::any; use std::io; +const USE_JSON: bool = false; +const EMIT_JSON_DEBUG: bool = false; +const EMIT_POSTCARD_DEBUG: bool = false; + #[derive(Debug, thiserror::Error)] #[cstm(name = "ItemFrame")] pub enum Error { @@ -41,7 +46,10 @@ pub enum Error { RmpEnc(#[from] rmp_serde::encode::Error), RmpDec(#[from] rmp_serde::decode::Error), ErasedSerde(#[from] erased_serde::Error), - Postcard(#[from] postcard::Error), + #[error("PostcardSer({0})")] + PostcardSer(postcard::Error), + #[error("PostcardDe({0}, {1}, {2:?}, {3})")] + PostcardDe(postcard::Error, usize, Vec, &'static str), SerdeJson(#[from] serde_json::Error), } @@ -133,12 +141,19 @@ fn postcard_to_vec(item: T) -> Result, Error> where T: Serialize, { - postcard::to_stdvec(&item).map_err(Error::from) + postcard::to_stdvec(&item) + .map_err(|e| Error::PostcardSer(e)) + .inspect(|x| { + if EMIT_POSTCARD_DEBUG { + let a = &x[0..x.len().min(40)]; + eprintln!("postcard_to_vec {:?} {}", a, std::any::type_name::()); + } + }) } fn postcard_erased_to_vec(item: T) -> Result, Error> where - T: erased_serde::Serialize, + T: erased_serde::Serialize + fmt::Debug, { use postcard::ser_flavors::Flavor; let mut ser1 = postcard::Serializer { @@ -148,36 +163,97 @@ where let mut ser2 = ::erase(&mut ser1); item.erased_serialize(&mut ser2) }?; - let ret = ser1.output.finalize()?; - Ok(ret) + ser1.output + .finalize() + .map_err(|e| Error::PostcardSer(e)) + .inspect(|x| { + if EMIT_POSTCARD_DEBUG { + let a = &x[0..x.len().min(40)]; + eprintln!( + "postcard_erased_to_vec {:?} {:?} {}", + a, + item, + std::any::type_name::() + ); + } + }) } pub fn postcard_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { - Ok(postcard::from_bytes(buf)?) + let x = postcard::from_bytes(buf).map_err(|e| { + Error::PostcardDe( + e, + buf.len(), + buf[0..buf.len().min(40)].to_vec(), + std::any::type_name::(), + ) + })?; + Ok(x) } fn json_to_vec(item: T) -> Result, Error> where - T: Serialize, + T: Serialize + fmt::Debug, { - Ok(serde_json::to_vec(&item)?) + serde_json::to_vec(&item).map_err(Into::into).inspect(|x| { + if EMIT_JSON_DEBUG { + let s = String::from_utf8_lossy(&x); + let a = &s[0..x.len().min(80)]; + eprintln!( + "json_to_vec {} {:?} {}", + a, + item, + std::any::type_name::() + ); + } + }) +} + +fn json_erased_to_vec(item: T) -> Result, Error> +where + T: erased_serde::Serialize + fmt::Debug, +{ + let out = Vec::new(); + let mut ser = serde_json::Serializer::new(out); + let x = erased_serde::serialize(&item, &mut ser)?; + assert_eq!(x, ()); + let ret = ser.into_inner(); + Ok(ret).inspect(|x| { + if EMIT_JSON_DEBUG { + let s = String::from_utf8_lossy(&x); + let a = &s[0..s.len().min(80)]; + eprintln!( + "json_erased_to_vec {} {:?} {}", + a, + item, + std::any::type_name::() + ); + } + }) } pub fn json_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { + if EMIT_JSON_DEBUG { + let s = String::from_utf8_lossy(&buf); + let a = &s[0..s.len().min(80)]; + eprintln!("json_from_slice {} {}", a, std::any::type_name::()); + } Ok(serde_json::from_slice(buf)?) } pub fn encode_to_vec(item: T) -> Result, Error> where - T: Serialize, + T: Serialize + fmt::Debug, { - if false { + if USE_JSON { + json_to_vec(item) + } else if false { msgpack_to_vec(item) } else if false { bincode_to_vec(item) @@ -188,9 +264,11 @@ where pub fn encode_erased_to_vec(item: T) -> Result, Error> where - T: erased_serde::Serialize, + T: erased_serde::Serialize + fmt::Debug, { - if false { + if USE_JSON { + json_erased_to_vec(item) + } else if false { msgpack_erased_to_vec(item) } else { postcard_erased_to_vec(item) @@ -201,7 +279,9 @@ pub fn decode_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { - if false { + if USE_JSON { + json_from_slice(buf) + } else if false { msgpack_from_slice(buf) } else if false { bincode_from_slice(buf) @@ -212,7 +292,7 @@ where pub fn make_frame_2(item: T, fty: u32) -> Result where - T: erased_serde::Serialize, + T: erased_serde::Serialize + fmt::Debug, { let enc = encode_erased_to_vec(item)?; if enc.len() > u32::MAX as usize { diff --git a/src/inmem.rs b/src/inmem.rs index 55a00ab..ce3dd7a 100644 --- a/src/inmem.rs +++ b/src/inmem.rs @@ -25,6 +25,8 @@ impl InMemoryFrame { impl fmt::Debug for InMemoryFrame { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + let v = &self.buf; + let _a = &v[0..v.len().min(40)]; write!( fmt, "InMemoryFrame {{ encid: {:x} tyid: {:x} len {} }}", diff --git a/src/jsonbytes.rs b/src/jsonbytes.rs new file mode 100644 index 0000000..9be908d --- /dev/null +++ b/src/jsonbytes.rs @@ -0,0 +1,58 @@ +use bytes::Bytes; +use items_0::WithLen; + +pub struct JsonBytes(String); + +impl JsonBytes { + pub fn new>(s: S) -> Self { + Self(s.into()) + } + + pub fn into_inner(self) -> String { + self.0 + } + + pub fn len(&self) -> u32 { + self.0.len() as _ + } +} + +impl WithLen for JsonBytes { + fn len(&self) -> usize { + self.len() as usize + } +} + +impl From for String { + fn from(value: JsonBytes) -> Self { + value.0 + } +} + +pub struct CborBytes(Bytes); + +impl CborBytes { + pub fn new>(k: T) -> Self { + Self(k.into()) + } + + pub fn into_inner(self) -> Bytes { + self.0 + } + + pub fn len(&self) -> u32 { + self.0.len() as _ + } +} + +impl WithLen for CborBytes { + fn len(&self) -> usize { + self.len() as usize + } +} + +impl From for Bytes { + fn from(value: CborBytes) -> Self { + value.0 + } +} diff --git a/src/lib.rs b/src/lib.rs index 5caa202..8d44ee3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod eventsdim1; pub mod framable; pub mod frame; pub mod inmem; +pub mod jsonbytes; pub mod merger; pub mod offsets; pub mod streams;