This commit is contained in:
Dominik Werder
2024-11-26 16:28:38 +01:00
parent f803e07296
commit d1f527a87e
12 changed files with 434 additions and 164 deletions

View File

@@ -8,7 +8,7 @@ use err::thiserror;
use err::ThisError;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::ToJsonResult;
use items_0::collect_s::ToJsonValue;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinsBoxed;
use items_0::vecpreview::VecPreview;
@@ -487,7 +487,7 @@ where
// finished_at: Option<IsoDateTime>,
}
impl<EVT, BVT> ToJsonResult for ContainerBinsCollectorOutput<EVT, BVT>
impl<EVT, BVT> ToJsonValue for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,

View File

@@ -9,6 +9,11 @@ use core::ops::Range;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use items_0::apitypes::ContainerEventsApi;
use items_0::apitypes::ToUserFacingApiType;
use items_0::apitypes::UserApiType;
use items_0::collect_s::ToCborValue;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewDynResult;
@@ -300,6 +305,7 @@ mod container_events_serde {
use super::ContainerEvents;
use super::EventValueType;
use serde::de::MapAccess;
use serde::de::SeqAccess;
use serde::de::Visitor;
use serde::ser::SerializeStruct;
use serde::Deserialize;
@@ -309,6 +315,8 @@ mod container_events_serde {
use std::fmt;
use std::marker::PhantomData;
macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) }
impl<EVT> Serialize for ContainerEvents<EVT>
where
EVT: EventValueType,
@@ -339,10 +347,31 @@ mod container_events_serde {
fmt.write_str("a struct with fields tss and vals")
}
fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error>
where
S: SeqAccess<'de>,
{
trace_serde!("Vis ContainerEvents visit_map");
let tss = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
let vals = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
let ret = Self::Value {
tss,
vals,
// TODO make container recompute byte_estimate
byte_estimate: 0,
};
Ok(ret)
}
fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
trace_serde!("Vis ContainerEvents visit_map");
let mut tss = None;
let mut vals = None;
while let Some(key) = map.next_key::<&str>()? {
@@ -433,6 +462,10 @@ where
pub fn iter_zip<'a>(&'a self) -> impl Iterator<Item = (&TsNano, EVT::IterTy1<'a>)> {
self.tss.iter().zip(self.vals.iter_ty_1())
}
pub fn serde_id() -> u32 {
items_0::streamitem::CONTAINER_EVENTS_TYPE_ID
}
}
impl<EVT> fmt::Debug for ContainerEvents<EVT>
@@ -625,6 +658,19 @@ where
MergeableTy::drain_into(self, &mut dst, range);
DrainIntoNewResult::Done(dst)
}
fn is_consistent(&self) -> bool {
let mut good = true;
let n = self.tss.len();
for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) {
if ts1 > ts2 {
good = false;
error!("unordered event data ts1 {} ts2 {}", ts1, ts2);
break;
}
}
good
}
}
impl<EVT> MergeableDyn for ContainerEvents<EVT>
@@ -674,6 +720,42 @@ where
DrainIntoNewResult::NotCompatible => DrainIntoNewDynResult::NotCompatible,
}
}
fn is_consistent(&self) -> bool {
MergeableTy::is_consistent(self)
}
}
impl<EVT> ToCborValue for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
ciborium::value::Value::serialized(self)
}
}
impl<EVT> ToJsonValue for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
}
}
impl<EVT> ToUserFacingApiType for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn to_user_facing_api_type(self) -> Box<dyn UserApiType> {
let tss: VecDeque<_> = self.tss.into_iter().map(|x| x.ms()).collect();
let ret = ContainerEventsApi {
tss: tss.clone(),
values: tss.clone(),
};
Box::new(ret)
}
}
impl<EVT> BinningggContainerEventsDyn for ContainerEvents<EVT>
@@ -701,7 +783,7 @@ where
}
fn serde_id(&self) -> u32 {
items_0::streamitem::CONTAINER_EVENTS_TYPE_ID
Self::serde_id()
}
fn nty_id(&self) -> u32 {
@@ -716,19 +798,6 @@ where
}
}
fn verify(&self) -> bool {
let mut good = true;
let n = self.tss.len();
for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) {
if ts1 > ts2 {
good = false;
error!("unordered event data ts1 {} ts2 {}", ts1, ts2);
break;
}
}
good
}
fn as_mergeable_dyn_mut(&mut self) -> &mut dyn MergeableDyn {
self
}

View File

@@ -1,12 +1,18 @@
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::EventValueType;
use crate::framable::FrameType;
use crate::jsonbytes::JsonBytes;
use crate::log::*;
use crate::Events;
use core::ops::Range;
use daqbuf_err as err;
use items_0::apitypes::ToUserFacingApiType;
use items_0::apitypes::UserApiType;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorDyn;
use items_0::collect_s::ToCborValue;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
use items_0::framable::FrameTypeInnerStatic;
use items_0::isodate::IsoDateTime;
@@ -24,9 +30,9 @@ use items_0::EventsNonObj;
use items_0::Extendable;
use items_0::TypeName;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use netpod::TsMs;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
@@ -226,9 +232,9 @@ mod serde_channel_events {
use crate::channelevents::ConnStatusEvent;
use crate::eventsdim0::EventsDim0;
use crate::eventsdim1::EventsDim1;
use crate::log::*;
use items_0::subfr::SubFrId;
use items_0::timebin::BinningggContainerEventsDyn;
use netpod::log::*;
use netpod::EnumVariant;
use serde::de;
use serde::de::EnumAccess;
@@ -241,6 +247,8 @@ mod serde_channel_events {
use serde::Serializer;
use std::fmt;
macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) }
fn try_serialize<S, T>(
v: &dyn BinningggContainerEventsDyn,
ser: &mut <S as Serializer>::SerializeSeq,
@@ -260,7 +268,7 @@ mod serde_channel_events {
struct EvRef<'a>(&'a dyn BinningggContainerEventsDyn);
struct EvBox(Box<dyn Events>);
struct EvBox(Box<dyn BinningggContainerEventsDyn>);
impl<'a> Serialize for EvRef<'a> {
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
@@ -322,88 +330,90 @@ mod serde_channel_events {
where
A: de::SeqAccess<'de>,
{
let cty: &str = seq
trace_serde!("EvBoxVis::visit_seq");
type C1<EVT> = ContainerEvents<EVT>;
let cty: u32 = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[0] cty"))?;
let nty: u32 = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[1] nty"))?;
if cty == EventsDim0::<u8>::serde_id() {
if cty == C1::<u8>::serde_id() {
match nty {
u8::SUB => {
let obj: EventsDim0<u8> = seq
let obj: C1<u8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u16::SUB => {
let obj: EventsDim0<u16> = seq
let obj: C1<u16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u32::SUB => {
let obj: EventsDim0<u32> = seq
let obj: C1<u32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u64::SUB => {
let obj: EventsDim0<u64> = seq
let obj: C1<u64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i8::SUB => {
let obj: EventsDim0<i8> = seq
let obj: C1<i8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i16::SUB => {
let obj: EventsDim0<i16> = seq
let obj: C1<i16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i32::SUB => {
let obj: EventsDim0<i32> = seq
let obj: C1<i32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i64::SUB => {
let obj: EventsDim0<i64> = seq
let obj: C1<i64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f32::SUB => {
let obj: EventsDim0<f32> = seq
let obj: C1<f32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f64::SUB => {
let obj: EventsDim0<f64> = seq
let obj: C1<f64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
bool::SUB => {
let obj: EventsDim0<bool> = seq
let obj: C1<bool> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
String::SUB => {
let obj: EventsDim0<String> = seq
let obj: C1<String> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
EnumVariant::SUB => {
let obj: EventsDim0<EnumVariant> = seq
let obj: C1<EnumVariant> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
@@ -413,76 +423,76 @@ mod serde_channel_events {
Err(de::Error::custom(&format!("unknown nty {nty}")))
}
}
} else if cty == EventsDim1::<u8>::serde_id() {
} else if cty == C1::<u8>::serde_id() {
match nty {
u8::SUB => {
let obj: EventsDim1<u8> = seq
let obj: C1<u8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u16::SUB => {
let obj: EventsDim1<u16> = seq
let obj: C1<u16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u32::SUB => {
let obj: EventsDim1<u32> = seq
let obj: C1<u32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u64::SUB => {
let obj: EventsDim1<u64> = seq
let obj: C1<u64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i8::SUB => {
let obj: EventsDim1<i8> = seq
let obj: C1<i8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i16::SUB => {
let obj: EventsDim1<i16> = seq
let obj: C1<i16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i32::SUB => {
let obj: EventsDim1<i32> = seq
let obj: C1<i32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i64::SUB => {
let obj: EventsDim1<i64> = seq
let obj: C1<i64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f32::SUB => {
let obj: EventsDim1<f32> = seq
let obj: C1<f32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f64::SUB => {
let obj: EventsDim1<f64> = seq
let obj: C1<f64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
bool::SUB => {
let obj: EventsDim1<bool> = seq
let obj: C1<bool> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
String::SUB => {
let obj: EventsDim1<String> = seq
let obj: C1<String> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
@@ -608,9 +618,7 @@ mod serde_channel_events {
match id {
VarId::Events => {
let x: EvBox = var.newtype_variant()?;
let _ = x;
// Ok(Self::Value::Events(x.0));
todo!()
Ok(Self::Value::Events(x.0))
}
VarId::Status => {
let x: Option<ConnStatusEvent> = var.newtype_variant()?;
@@ -884,11 +892,21 @@ impl MergeableTy for ChannelEvents {
}
}
fn tss_for_testing(&self) -> Vec<netpod::TsMs> {
Events::tss(self)
.iter()
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
fn tss_for_testing(&self) -> Vec<TsMs> {
match self {
ChannelEvents::Events(x) => x.tss_for_testing(),
ChannelEvents::Status(x) => match x {
Some(x) => vec![x.ts.to_ts_ms()],
None => Vec::new(),
},
}
}
fn is_consistent(&self) -> bool {
match self {
ChannelEvents::Events(x) => x.is_consistent(),
ChannelEvents::Status(_) => true,
}
}
}
@@ -898,119 +916,119 @@ impl EventsNonObj for ChannelEvents {
}
}
impl Events for ChannelEvents {
fn verify(&self) -> bool {
todo!()
}
// impl Events for ChannelEvents {
// fn verify(&self) -> bool {
// todo!()
// }
fn output_info(&self) -> String {
todo!()
}
// fn output_info(&self) -> String {
// todo!()
// }
fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn {
todo!()
}
// fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn {
// todo!()
// }
fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn {
todo!()
}
// fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn {
// todo!()
// }
fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn {
todo!()
}
// fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn {
// todo!()
// }
fn ts_min(&self) -> Option<u64> {
todo!()
}
// fn ts_min(&self) -> Option<u64> {
// todo!()
// }
fn ts_max(&self) -> Option<u64> {
todo!()
}
// fn ts_max(&self) -> Option<u64> {
// todo!()
// }
fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box<dyn Events> {
todo!()
}
// fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box<dyn Events> {
// todo!()
// }
fn new_empty_evs(&self) -> Box<dyn Events> {
todo!()
}
// fn new_empty_evs(&self) -> Box<dyn Events> {
// todo!()
// }
fn drain_into_evs(
&mut self,
dst: &mut dyn Events,
range: (usize, usize),
) -> Result<(), err::Error> {
todo!()
}
// fn drain_into_evs(
// &mut self,
// dst: &mut dyn Events,
// range: (usize, usize),
// ) -> Result<(), err::Error> {
// todo!()
// }
fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option<usize> {
todo!()
}
// fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option<usize> {
// todo!()
// }
fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option<usize> {
todo!()
}
// fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option<usize> {
// todo!()
// }
fn find_highest_index_lt_evs(&self, _ts: u64) -> Option<usize> {
todo!()
}
// fn find_highest_index_lt_evs(&self, _ts: u64) -> Option<usize> {
// todo!()
// }
fn clone_dyn(&self) -> Box<dyn Events> {
todo!()
}
// fn clone_dyn(&self) -> Box<dyn Events> {
// todo!()
// }
fn partial_eq_dyn(&self, _other: &dyn Events) -> bool {
todo!()
}
// fn partial_eq_dyn(&self, _other: &dyn Events) -> bool {
// todo!()
// }
fn serde_id(&self) -> &'static str {
todo!()
}
// fn serde_id(&self) -> &'static str {
// todo!()
// }
fn nty_id(&self) -> u32 {
todo!()
}
// fn nty_id(&self) -> u32 {
// todo!()
// }
fn tss(&self) -> &VecDeque<u64> {
todo!()
}
// fn tss(&self) -> &VecDeque<u64> {
// todo!()
// }
fn pulses(&self) -> &VecDeque<u64> {
todo!()
}
// fn pulses(&self) -> &VecDeque<u64> {
// todo!()
// }
fn frame_type_id(&self) -> u32 {
todo!()
}
// fn frame_type_id(&self) -> u32 {
// todo!()
// }
fn to_min_max_avg(&mut self) -> Box<dyn Events> {
todo!()
}
// fn to_min_max_avg(&mut self) -> Box<dyn Events> {
// todo!()
// }
fn to_json_string(&self) -> String {
todo!()
}
// fn to_json_string(&self) -> String {
// todo!()
// }
fn to_json_vec_u8(&self) -> Vec<u8> {
todo!()
}
// fn to_json_vec_u8(&self) -> Vec<u8> {
// todo!()
// }
fn to_cbor_vec_u8(&self) -> Vec<u8> {
todo!()
}
// fn to_cbor_vec_u8(&self) -> Vec<u8> {
// todo!()
// }
fn clear(&mut self) {
todo!()
}
// fn clear(&mut self) {
// todo!()
// }
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
todo!()
}
// fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
// todo!()
// }
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
panic!("should not get used")
}
}
// fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
// panic!("should not get used")
// }
// }
impl CollectableDyn for ChannelEvents {
fn new_collector(&self) -> Box<dyn CollectorDyn> {
@@ -1047,7 +1065,7 @@ impl WithLen for ChannelEventsCollectorOutput {
}
}
impl items_0::collect_s::ToJsonResult for ChannelEventsCollectorOutput {
impl items_0::collect_s::ToJsonValue for ChannelEventsCollectorOutput {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
}
@@ -1164,3 +1182,40 @@ impl CollectorDyn for ChannelEventsCollector {
}
}
}
impl ToJsonValue for ChannelEvents {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
let ret = match self {
ChannelEvents::Events(x) => x.to_json_value().unwrap(),
ChannelEvents::Status(x) => serde_json::json!({
"_private_channel_status": x,
}),
};
Ok(ret)
}
}
impl ToCborValue for ChannelEvents {
fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
let ret = match self {
ChannelEvents::Events(x) => x.to_cbor_value()?,
ChannelEvents::Status(x) => {
use ciborium::cbor;
cbor!({
"_private_channel_status" => x,
})
.unwrap()
}
};
Ok(ret)
}
}
impl ToUserFacingApiType for ChannelEvents {
fn to_user_facing_api_type(self) -> Box<dyn UserApiType> {
match self {
ChannelEvents::Events(x) => x.to_user_facing_api_type(),
ChannelEvents::Status(x) => todo!(),
}
}
}

View File

@@ -257,6 +257,10 @@ impl MergeableTy for EventFull {
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
}
fn is_consistent(&self) -> bool {
true
}
}
#[derive(Debug, ThisError, Serialize, Deserialize)]

View File

@@ -4,7 +4,7 @@ use err::Error;
use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorTy;
use items_0::collect_s::ToJsonResult;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
use items_0::overlap::HasTimestampDeque;
use items_0::scalar_ops::ScalarOps;
@@ -374,7 +374,7 @@ impl<STY: ScalarOps> WithLen for EventsDim0CollectorOutput<STY> {
}
}
impl<STY: ScalarOps> ToJsonResult for EventsDim0CollectorOutput<STY> {
impl<STY: ScalarOps> ToJsonValue for EventsDim0CollectorOutput<STY> {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
}

View File

@@ -4,7 +4,7 @@ use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorDyn;
use items_0::collect_s::CollectorTy;
use items_0::collect_s::ToJsonResult;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
use items_0::isodate::IsoDateTime;
use items_0::scalar_ops::ScalarOps;
@@ -127,7 +127,7 @@ impl TypeName for EventsDim0EnumCollectorOutput {
}
}
impl ToJsonResult for EventsDim0EnumCollectorOutput {
impl ToJsonValue for EventsDim0EnumCollectorOutput {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
todo!()
}

View File

@@ -5,7 +5,7 @@ use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectableType;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorTy;
use items_0::collect_s::ToJsonResult;
use items_0::collect_s::ToJsonValue;
use items_0::container::ByteEstimate;
use items_0::overlap::HasTimestampDeque;
use items_0::scalar_ops::ScalarOps;
@@ -335,7 +335,7 @@ impl<STY: ScalarOps> WithLen for EventsDim1CollectorOutput<STY> {
}
}
impl<STY: ScalarOps> ToJsonResult for EventsDim1CollectorOutput<STY> {
impl<STY: ScalarOps> ToJsonValue for EventsDim1CollectorOutput<STY> {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
serde_json::to_value(self)
}

View File

@@ -4,6 +4,7 @@ use crate::frame::make_log_frame;
use crate::frame::make_range_complete_frame;
use crate::frame::make_stats_frame;
use bytes::BytesMut;
use core::fmt;
use daqbuf_err as err;
use items_0::framable::FrameTypeInnerDyn;
use items_0::framable::FrameTypeInnerStatic;
@@ -93,7 +94,7 @@ impl<T: erased_serde::Serialize + FrameTypeInnerDyn + Send> FramableInner for T
impl<T> Framable for Sitemty<T>
where
T: Sized + serde::Serialize + FrameType,
T: Sized + serde::Serialize + FrameType + fmt::Debug,
{
fn make_frame_dyn(&self) -> Result<BytesMut, Error> {
match self {

View File

@@ -13,6 +13,7 @@ use bincode::config::WithOtherTrailing;
use bincode::DefaultOptions;
use bytes::BufMut;
use bytes::BytesMut;
use core::fmt;
use daqbuf_err as err;
use items_0::bincode;
use items_0::streamitem::LogItem;
@@ -27,6 +28,10 @@ use serde::Serialize;
use std::any;
use std::io;
const USE_JSON: bool = false;
const EMIT_JSON_DEBUG: bool = false;
const EMIT_POSTCARD_DEBUG: bool = false;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "ItemFrame")]
pub enum Error {
@@ -41,7 +46,10 @@ pub enum Error {
RmpEnc(#[from] rmp_serde::encode::Error),
RmpDec(#[from] rmp_serde::decode::Error),
ErasedSerde(#[from] erased_serde::Error),
Postcard(#[from] postcard::Error),
#[error("PostcardSer({0})")]
PostcardSer(postcard::Error),
#[error("PostcardDe({0}, {1}, {2:?}, {3})")]
PostcardDe(postcard::Error, usize, Vec<u8>, &'static str),
SerdeJson(#[from] serde_json::Error),
}
@@ -133,12 +141,19 @@ fn postcard_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: Serialize,
{
postcard::to_stdvec(&item).map_err(Error::from)
postcard::to_stdvec(&item)
.map_err(|e| Error::PostcardSer(e))
.inspect(|x| {
if EMIT_POSTCARD_DEBUG {
let a = &x[0..x.len().min(40)];
eprintln!("postcard_to_vec {:?} {}", a, std::any::type_name::<T>());
}
})
}
fn postcard_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize,
T: erased_serde::Serialize + fmt::Debug,
{
use postcard::ser_flavors::Flavor;
let mut ser1 = postcard::Serializer {
@@ -148,36 +163,97 @@ where
let mut ser2 = <dyn erased_serde::Serializer>::erase(&mut ser1);
item.erased_serialize(&mut ser2)
}?;
let ret = ser1.output.finalize()?;
Ok(ret)
ser1.output
.finalize()
.map_err(|e| Error::PostcardSer(e))
.inspect(|x| {
if EMIT_POSTCARD_DEBUG {
let a = &x[0..x.len().min(40)];
eprintln!(
"postcard_erased_to_vec {:?} {:?} {}",
a,
item,
std::any::type_name::<T>()
);
}
})
}
pub fn postcard_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
Ok(postcard::from_bytes(buf)?)
let x = postcard::from_bytes(buf).map_err(|e| {
Error::PostcardDe(
e,
buf.len(),
buf[0..buf.len().min(40)].to_vec(),
std::any::type_name::<T>(),
)
})?;
Ok(x)
}
fn json_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: Serialize,
T: Serialize + fmt::Debug,
{
Ok(serde_json::to_vec(&item)?)
serde_json::to_vec(&item).map_err(Into::into).inspect(|x| {
if EMIT_JSON_DEBUG {
let s = String::from_utf8_lossy(&x);
let a = &s[0..x.len().min(80)];
eprintln!(
"json_to_vec {} {:?} {}",
a,
item,
std::any::type_name::<T>()
);
}
})
}
fn json_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize + fmt::Debug,
{
let out = Vec::new();
let mut ser = serde_json::Serializer::new(out);
let x = erased_serde::serialize(&item, &mut ser)?;
assert_eq!(x, ());
let ret = ser.into_inner();
Ok(ret).inspect(|x| {
if EMIT_JSON_DEBUG {
let s = String::from_utf8_lossy(&x);
let a = &s[0..s.len().min(80)];
eprintln!(
"json_erased_to_vec {} {:?} {}",
a,
item,
std::any::type_name::<T>()
);
}
})
}
pub fn json_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
if EMIT_JSON_DEBUG {
let s = String::from_utf8_lossy(&buf);
let a = &s[0..s.len().min(80)];
eprintln!("json_from_slice {} {}", a, std::any::type_name::<T>());
}
Ok(serde_json::from_slice(buf)?)
}
pub fn encode_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: Serialize,
T: Serialize + fmt::Debug,
{
if false {
if USE_JSON {
json_to_vec(item)
} else if false {
msgpack_to_vec(item)
} else if false {
bincode_to_vec(item)
@@ -188,9 +264,11 @@ where
pub fn encode_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
where
T: erased_serde::Serialize,
T: erased_serde::Serialize + fmt::Debug,
{
if false {
if USE_JSON {
json_erased_to_vec(item)
} else if false {
msgpack_erased_to_vec(item)
} else {
postcard_erased_to_vec(item)
@@ -201,7 +279,9 @@ pub fn decode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
if false {
if USE_JSON {
json_from_slice(buf)
} else if false {
msgpack_from_slice(buf)
} else if false {
bincode_from_slice(buf)
@@ -212,7 +292,7 @@ where
pub fn make_frame_2<T>(item: T, fty: u32) -> Result<BytesMut, Error>
where
T: erased_serde::Serialize,
T: erased_serde::Serialize + fmt::Debug,
{
let enc = encode_erased_to_vec(item)?;
if enc.len() > u32::MAX as usize {

View File

@@ -25,6 +25,8 @@ impl InMemoryFrame {
impl fmt::Debug for InMemoryFrame {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
let v = &self.buf;
let _a = &v[0..v.len().min(40)];
write!(
fmt,
"InMemoryFrame {{ encid: {:x} tyid: {:x} len {} }}",

58
src/jsonbytes.rs Normal file
View File

@@ -0,0 +1,58 @@
use bytes::Bytes;
use items_0::WithLen;
pub struct JsonBytes(String);
impl JsonBytes {
pub fn new<S: Into<String>>(s: S) -> Self {
Self(s.into())
}
pub fn into_inner(self) -> String {
self.0
}
pub fn len(&self) -> u32 {
self.0.len() as _
}
}
impl WithLen for JsonBytes {
fn len(&self) -> usize {
self.len() as usize
}
}
impl From<JsonBytes> for String {
fn from(value: JsonBytes) -> Self {
value.0
}
}
pub struct CborBytes(Bytes);
impl CborBytes {
pub fn new<T: Into<Bytes>>(k: T) -> Self {
Self(k.into())
}
pub fn into_inner(self) -> Bytes {
self.0
}
pub fn len(&self) -> u32 {
self.0.len() as _
}
}
impl WithLen for CborBytes {
fn len(&self) -> usize {
self.len() as usize
}
}
impl From<CborBytes> for Bytes {
fn from(value: CborBytes) -> Self {
value.0
}
}

View File

@@ -9,6 +9,7 @@ pub mod eventsdim1;
pub mod framable;
pub mod frame;
pub mod inmem;
pub mod jsonbytes;
pub mod merger;
pub mod offsets;
pub mod streams;