commit 60330a3e824251c0c7745a0f5a46738645638358 Author: Dominik Werder Date: Thu Nov 7 20:57:38 2024 +0100 Factored into separate crate diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b72444 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/Cargo.lock +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..160b83a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "daqbuf-items-0" +version = "0.0.3" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/lib.rs" + +[dependencies] +serde = { version = "1", features = ["derive"] } +erased-serde = "0.4" +typetag = "0.2" +serde_json = "1" +bincode = "1.3.3" +bytes = "1.8.0" +futures-util = "0.3.24" +chrono = { version = "0.4.19", features = ["serde"] } +netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" } +daqbuf-err = { path = "../daqbuf-err" } + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/src/collect_s.rs b/src/collect_s.rs new file mode 100644 index 0000000..f57cee6 --- /dev/null +++ b/src/collect_s.rs @@ -0,0 +1,255 @@ +use crate::container::ByteEstimate; +use crate::timebin::BinningggContainerBinsDyn; +use crate::AsAnyMut; +use crate::AsAnyRef; +use crate::Events; +use crate::TypeName; +use crate::WithLen; +use daqbuf_err as err; +use err::Error; +use netpod::log::*; +use netpod::range::evrange::SeriesRange; +use netpod::BinnedRangeEnum; +use serde::Serialize; +use std::any; +use std::any::Any; +use std::fmt; + +// TODO check usage of this trait +pub trait ToJsonBytes { + fn to_json_bytes(&self) -> Result, Error>; +} + +pub trait ToJsonResult: fmt::Debug + AsAnyRef + AsAnyMut + Send { + fn to_json_value(&self) -> Result; +} + +impl AsAnyRef for serde_json::Value { + fn as_any_ref(&self) -> &dyn Any { + self + } +} + +impl AsAnyMut for serde_json::Value { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl ToJsonResult for serde_json::Value { + fn to_json_value(&self) -> Result { + Ok(self.clone()) + } +} + +impl ToJsonBytes for serde_json::Value { + fn to_json_bytes(&self) -> Result, Error> { + Ok(serde_json::to_vec(self)?) + } +} + +pub trait CollectedDyn: fmt::Debug + TypeName + Send + AsAnyRef + WithLen + ToJsonResult {} + +impl ToJsonResult for Box { + fn to_json_value(&self) -> Result { + ToJsonResult::to_json_value(self.as_ref()) + } +} + +impl WithLen for Box { + fn len(&self) -> usize { + self.as_ref().len() + } +} + +impl TypeName for Box { + fn type_name(&self) -> String { + self.as_ref().type_name() + } +} + +impl CollectedDyn for Box {} + +pub trait CollectorTy: fmt::Debug + Send + Unpin + WithLen + ByteEstimate { + type Input: CollectableDyn; + type Output: CollectedDyn + ToJsonResult + Serialize; + + fn ingest(&mut self, src: &mut Self::Input); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + fn set_continue_at_here(&mut self); + + // TODO use this crate's Error instead: + fn result(&mut self, range: Option, binrange: Option) -> Result; +} + +pub trait CollectorDyn: fmt::Debug + Send + WithLen + ByteEstimate { + fn ingest(&mut self, src: &mut dyn CollectableDyn); + fn set_range_complete(&mut self); + fn set_timed_out(&mut self); + fn set_continue_at_here(&mut self); + // TODO factor the required parameters into new struct? Generic over events or binned? + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error>; +} + +impl CollectorDyn for T +where + T: fmt::Debug + CollectorTy + 'static, +{ + fn ingest(&mut self, src: &mut dyn CollectableDyn) { + if let Some(src) = src.as_any_mut().downcast_mut::<::Input>() { + trace!("sees incoming &mut ref"); + T::ingest(self, src) + } else { + if let Some(src) = src.as_any_mut().downcast_mut::::Input>>() { + trace!("sees incoming &mut Box"); + T::ingest(self, src) + } else { + error!( + "No idea what this is. Expect: {} input {} got: {} {:?}", + any::type_name::(), + any::type_name::<::Input>(), + src.type_name(), + src + ); + } + } + } + + fn set_range_complete(&mut self) { + T::set_range_complete(self) + } + + fn set_timed_out(&mut self) { + T::set_timed_out(self) + } + + fn set_continue_at_here(&mut self) { + T::set_continue_at_here(self) + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error> { + let ret = T::result(self, range, binrange)?; + Ok(Box::new(ret)) + } +} + +// TODO rename to `Typed` +pub trait CollectableType: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send { + type Collector: CollectorTy; + fn new_collector() -> Self::Collector; +} + +#[derive(Debug)] +pub struct CollectorForDyn { + inner: Box, +} + +impl WithLen for CollectorForDyn { + fn len(&self) -> usize { + todo!() + } +} + +impl ByteEstimate for CollectorForDyn { + fn byte_estimate(&self) -> u64 { + todo!() + } +} + +impl CollectorDyn for CollectorForDyn { + fn ingest(&mut self, src: &mut dyn CollectableDyn) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn set_timed_out(&mut self) { + todo!() + } + + fn set_continue_at_here(&mut self) { + todo!() + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, Error> { + todo!() + } +} + +pub trait CollectableDyn: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send { + fn new_collector(&self) -> Box; +} + +impl TypeName for Box { + fn type_name(&self) -> String { + BinningggContainerBinsDyn::type_name(self.as_ref()).into() + } +} + +impl WithLen for Box { + fn len(&self) -> usize { + WithLen::len(self.as_ref()) + } +} + +impl CollectableDyn for Box { + fn new_collector(&self) -> Box { + self.as_ref().new_collector() + } +} + +impl TypeName for Box { + fn type_name(&self) -> String { + self.as_ref().type_name() + } +} + +impl CollectableDyn for Box { + fn new_collector(&self) -> Box { + self.as_ref().new_collector() + } +} + +impl CollectableDyn for T +where + T: CollectableType + 'static, +{ + fn new_collector(&self) -> Box { + Box::new(T::new_collector()) + } +} + +impl TypeName for Box { + fn type_name(&self) -> String { + self.as_ref().type_name() + } +} + +// TODO do this with some blanket impl: +impl WithLen for Box { + fn len(&self) -> usize { + WithLen::len(self.as_ref()) + } +} + +// TODO do this with some blanket impl: +impl CollectableDyn for Box { + fn new_collector(&self) -> Box { + CollectableDyn::new_collector(self.as_ref()) + } +} diff --git a/src/container.rs b/src/container.rs new file mode 100644 index 0000000..e26c1f7 --- /dev/null +++ b/src/container.rs @@ -0,0 +1,11 @@ +use crate::Events; + +pub trait ByteEstimate { + fn byte_estimate(&self) -> u64; +} + +impl ByteEstimate for Box { + fn byte_estimate(&self) -> u64 { + self.as_ref().byte_estimate() + } +} diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..ef7af24 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,286 @@ +pub use futures_util; + +use crate::collect_s::CollectableDyn; +use crate::container::ByteEstimate; +use crate::timebin::BinningggContainerEventsDyn; +use daqbuf_err as err; +use std::any::Any; +use std::collections::VecDeque; +use std::fmt; + +pub trait WithLen { + fn len(&self) -> usize; +} + +impl WithLen for bytes::Bytes { + fn len(&self) -> usize { + self.len() + } +} + +pub trait Empty { + fn empty() -> Self; +} + +pub trait Resettable { + fn reset(&mut self); +} + +pub trait Appendable: Empty + WithLen { + fn push(&mut self, ts: u64, pulse: u64, value: STY); +} + +pub trait Extendable: Empty + WithLen { + fn extend_from(&mut self, src: &mut Self); +} + +pub trait TypeName { + fn type_name(&self) -> String; +} + +pub trait AppendEmptyBin { + fn append_empty_bin(&mut self, ts1: u64, ts2: u64); +} + +// TODO rename to make it clear that this moves. Better use drain-into or something similar. +pub trait AppendAllFrom { + fn append_all_from(&mut self, src: &mut Self); +} + +// TODO check usage, probably only for legacy +pub trait HasNonemptyFirstBin { + fn has_nonempty_first_bin(&self) -> bool; +} + +pub trait AsAnyRef { + fn as_any_ref(&self) -> &dyn Any; +} + +pub trait AsAnyMut { + fn as_any_mut(&mut self) -> &mut dyn Any; +} + +impl AsAnyRef for Box +where + T: AsAnyRef + ?Sized, +{ + fn as_any_ref(&self) -> &dyn Any { + self.as_ref().as_any_ref() + } +} + +impl AsAnyMut for Box +where + T: AsAnyMut + ?Sized, +{ + fn as_any_mut(&mut self) -> &mut dyn Any { + self.as_mut().as_any_mut() + } +} + +#[derive(Debug)] +pub enum MergeError { + NotCompatible, + Full, +} + +impl From for err::Error { + fn from(e: MergeError) -> Self { + e.to_string().into() + } +} + +impl fmt::Display for MergeError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{:?}", self) + } +} + +impl std::error::Error for MergeError {} + +// TODO can I remove the Any bound? + +/// Container of some form of events, for use as trait object. +pub trait Events: + fmt::Debug + + TypeName + + Any + + CollectableDyn + + WithLen + + ByteEstimate + + Send + + erased_serde::Serialize + + EventsNonObj +{ + fn verify(&self) -> bool; + fn output_info(&self) -> String; + fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn; + fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn; + fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn; + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + // TODO is this used? + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; + fn new_empty_evs(&self) -> Box; + fn drain_into_evs( + &mut self, + dst: &mut dyn Events, + range: (usize, usize), + ) -> Result<(), MergeError>; + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option; + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option; + fn find_highest_index_lt_evs(&self, ts: u64) -> Option; + fn clone_dyn(&self) -> Box; + fn partial_eq_dyn(&self, other: &dyn Events) -> bool; + fn serde_id(&self) -> &'static str; + fn nty_id(&self) -> u32; + fn tss(&self) -> &VecDeque; + fn pulses(&self) -> &VecDeque; + fn frame_type_id(&self) -> u32; + fn to_min_max_avg(&mut self) -> Box; + fn to_json_string(&self) -> String; + fn to_json_vec_u8(&self) -> Vec; + fn to_cbor_vec_u8(&self) -> Vec; + fn clear(&mut self); + // TODO: can not name EventsDim0 from here, so use trait object for now. Anyway is a workaround. + fn to_dim0_f32_for_binning(&self) -> Box; + fn to_container_events(&self) -> Box; +} + +impl WithLen for Box { + fn len(&self) -> usize { + self.as_ref().len() + } +} + +pub trait EventsNonObj { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque); +} + +erased_serde::serialize_trait_object!(Events); + +impl PartialEq for Box { + fn eq(&self, other: &Self) -> bool { + Events::partial_eq_dyn(self.as_ref(), other.as_ref()) + } +} + +impl EventsNonObj for Box { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { + todo!() + } +} + +impl Events for Box { + fn verify(&self) -> bool { + Events::verify(self.as_ref()) + } + + fn output_info(&self) -> String { + Events::output_info(self.as_ref()) + } + + fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { + Events::as_collectable_mut(self.as_mut()) + } + + fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { + Events::as_collectable_with_default_ref(self.as_ref()) + } + + fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { + Events::as_collectable_with_default_mut(self.as_mut()) + } + + fn ts_min(&self) -> Option { + Events::ts_min(self.as_ref()) + } + + fn ts_max(&self) -> Option { + Events::ts_max(self.as_ref()) + } + + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + Events::take_new_events_until_ts(self.as_mut(), ts_end) + } + + fn new_empty_evs(&self) -> Box { + Events::new_empty_evs(self.as_ref()) + } + + fn drain_into_evs( + &mut self, + dst: &mut dyn Events, + range: (usize, usize), + ) -> Result<(), MergeError> { + Events::drain_into_evs(self.as_mut(), dst, range) + } + + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { + Events::find_lowest_index_gt_evs(self.as_ref(), ts) + } + + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { + Events::find_lowest_index_ge_evs(self.as_ref(), ts) + } + + fn find_highest_index_lt_evs(&self, ts: u64) -> Option { + Events::find_highest_index_lt_evs(self.as_ref(), ts) + } + + fn clone_dyn(&self) -> Box { + Events::clone_dyn(self.as_ref()) + } + + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + Events::partial_eq_dyn(self.as_ref(), other) + } + + fn serde_id(&self) -> &'static str { + Events::serde_id(self.as_ref()) + } + + fn nty_id(&self) -> u32 { + Events::nty_id(self.as_ref()) + } + + fn tss(&self) -> &VecDeque { + Events::tss(self.as_ref()) + } + + fn pulses(&self) -> &VecDeque { + Events::pulses(self.as_ref()) + } + + fn frame_type_id(&self) -> u32 { + Events::frame_type_id(self.as_ref()) + } + + fn to_min_max_avg(&mut self) -> Box { + Events::to_min_max_avg(self.as_mut()) + } + + fn to_json_string(&self) -> String { + Events::to_json_string(self.as_ref()) + } + + fn to_json_vec_u8(&self) -> Vec { + Events::to_json_vec_u8(self.as_ref()) + } + + fn to_cbor_vec_u8(&self) -> Vec { + Events::to_cbor_vec_u8(self.as_ref()) + } + + fn clear(&mut self) { + Events::clear(self.as_mut()) + } + + fn to_dim0_f32_for_binning(&self) -> Box { + Events::to_dim0_f32_for_binning(self.as_ref()) + } + + fn to_container_events(&self) -> Box { + Events::to_container_events(self.as_ref()) + } +} diff --git a/src/framable.rs b/src/framable.rs new file mode 100644 index 0000000..5fd10cf --- /dev/null +++ b/src/framable.rs @@ -0,0 +1,19 @@ +// Required for any inner type of Sitemty. +pub trait FrameTypeInnerStatic { + const FRAME_TYPE_ID: u32; +} + +// To be implemented by the T of Sitemty, e.g. ScalarEvents. +pub trait FrameTypeInnerDyn { + // TODO check actual usage of this + fn frame_type_id(&self) -> u32; +} + +impl FrameTypeInnerDyn for T +where + T: FrameTypeInnerStatic, +{ + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } +} diff --git a/src/isodate.rs b/src/isodate.rs new file mode 100644 index 0000000..18abefd --- /dev/null +++ b/src/isodate.rs @@ -0,0 +1,41 @@ +use chrono::DateTime; +use chrono::TimeZone; +use chrono::Utc; +use netpod::DATETIME_FMT_3MS; +use serde::Deserialize; +use serde::Serialize; +use serde::Serializer; + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct IsoDateTime(DateTime); + +impl IsoDateTime { + pub fn from_unix_millis(ms: u64) -> Self { + // let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap(); + // Self(datetime) + IsoDateTime( + Utc.timestamp_millis_opt(ms as i64) + .earliest() + .unwrap_or(Utc.timestamp_nanos(0)), + ) + } + + pub fn from_ns_u64(ts: u64) -> Self { + IsoDateTime(Utc.timestamp_nanos(ts as i64)) + } +} + +impl Serialize for IsoDateTime { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.0.format(DATETIME_FMT_3MS).to_string()) + } +} + +pub fn make_iso_ts(tss: &[u64]) -> Vec { + tss.iter() + .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) + .collect() +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..1b2d02e --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,19 @@ +pub mod collect_s; +pub mod container; +pub mod events; +pub mod framable; +pub mod isodate; +pub mod overlap; +pub mod scalar_ops; +pub mod streamitem; +pub mod subfr; +pub mod test; +pub mod timebin; +pub mod transform; +pub mod vecpreview; + +pub mod bincode { + pub use bincode::*; +} + +pub use events::*; diff --git a/src/overlap.rs b/src/overlap.rs new file mode 100644 index 0000000..2943395 --- /dev/null +++ b/src/overlap.rs @@ -0,0 +1,7 @@ +// TODO rename, no more deque involved +pub trait HasTimestampDeque { + fn timestamp_min(&self) -> Option; + fn timestamp_max(&self) -> Option; + fn pulse_min(&self) -> Option; + fn pulse_max(&self) -> Option; +} diff --git a/src/scalar_ops.rs b/src/scalar_ops.rs new file mode 100644 index 0000000..2e739fa --- /dev/null +++ b/src/scalar_ops.rs @@ -0,0 +1,309 @@ +use crate::container::ByteEstimate; +use crate::subfr::SubFrId; +use daqbuf_err as err; +use netpod::EnumVariant; +use netpod::StringFix; +use serde::Serialize; +use std::fmt; +use std::ops; + +#[allow(unused)] +const fn is_nan_int(_x: &T) -> bool { + false +} + +#[allow(unused)] +fn is_nan_f32(x: f32) -> bool { + x.is_nan() +} + +#[allow(unused)] +fn is_nan_f64(x: f64) -> bool { + x.is_nan() +} + +pub trait AsPrimF32 { + fn as_prim_f32_b(&self) -> f32; +} + +macro_rules! impl_as_prim_f32 { + ($ty:ident) => { + impl AsPrimF32 for $ty { + fn as_prim_f32_b(&self) -> f32 { + *self as f32 + } + } + }; +} + +impl_as_prim_f32!(u8); +impl_as_prim_f32!(u16); +impl_as_prim_f32!(u32); +impl_as_prim_f32!(u64); +impl_as_prim_f32!(i8); +impl_as_prim_f32!(i16); +impl_as_prim_f32!(i32); +impl_as_prim_f32!(i64); +impl_as_prim_f32!(f32); +impl_as_prim_f32!(f64); + +impl AsPrimF32 for bool { + fn as_prim_f32_b(&self) -> f32 { + if *self { + 1. + } else { + 0. + } + } +} + +impl AsPrimF32 for String { + fn as_prim_f32_b(&self) -> f32 { + // Well, at least some impl. + self.len() as f32 + } +} + +pub trait ScalarOps: + fmt::Debug + + fmt::Display + + Clone + + PartialOrd + + PartialEq + + SubFrId + + AsPrimF32 + + ByteEstimate + + Serialize + + Unpin + + Send + + 'static +{ + fn scalar_type_name() -> &'static str; + fn zero_b() -> Self; + fn equal_slack(&self, rhs: &Self) -> bool; + fn add(&mut self, rhs: &Self); + fn div(&mut self, n: usize); + fn find_vec_min(a: &Vec) -> Option; + fn find_vec_max(a: &Vec) -> Option; + fn avg_vec(a: &Vec) -> Option; +} + +macro_rules! impl_scalar_ops { + ($ty:ident, $zero:expr, $equal_slack:ident, $mac_add:ident, $mac_div:ident, $sty_name:expr, $byte_estimate:expr) => { + impl ByteEstimate for $ty { + fn byte_estimate(&self) -> u64 { + $byte_estimate + } + } + + impl ScalarOps for $ty { + fn scalar_type_name() -> &'static str { + $sty_name + } + + fn zero_b() -> Self { + $zero + } + + fn equal_slack(&self, rhs: &Self) -> bool { + $equal_slack(self, rhs) + } + + fn add(&mut self, rhs: &Self) { + $mac_add!(self, rhs); + } + + fn div(&mut self, n: usize) { + $mac_div!(self, n); + } + + fn find_vec_min(a: &Vec) -> Option { + if a.len() == 0 { + None + } else { + let mut k = &a[0]; + for (i, v) in a.iter().enumerate() { + if *v < *k { + k = &a[i]; + } + } + Some(k.clone()) + } + } + + fn find_vec_max(a: &Vec) -> Option { + if a.len() == 0 { + None + } else { + let mut k = &a[0]; + for (i, v) in a.iter().enumerate() { + if *v > *k { + k = &a[i]; + } + } + Some(k.clone()) + } + } + + fn avg_vec(a: &Vec) -> Option { + if a.len() == 0 { + None + } else { + let mut sum = Self::zero_b(); + let mut c = 0; + for v in a.iter() { + sum.add(v); + c += 1; + } + ScalarOps::div(&mut sum, c); + Some(sum) + } + } + } + }; +} + +fn equal_int(a: T, b: T) -> bool { + a == b +} + +fn equal_f32(&a: &f32, &b: &f32) -> bool { + (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) +} + +fn equal_f64(&a: &f64, &b: &f64) -> bool { + (a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001) +} + +fn equal_bool(&a: &bool, &b: &bool) -> bool { + a == b +} + +fn equal_string(a: &String, b: &String) -> bool { + a == b +} + +fn _add_int(a: &mut T, b: &T) { + let _ = b; + ops::AddAssign::add_assign(a, err::todoval()); +} + +macro_rules! add_int { + ($a:expr, $b:expr) => { + *$a += $b; + }; +} + +macro_rules! add_bool { + ($a:expr, $b:expr) => { + *$a |= $b; + }; +} + +macro_rules! add_string { + ($a:expr, $b:expr) => { + $a.push_str($b); + }; +} + +macro_rules! div_int { + ($a:expr, $b:expr) => { + // TODO what is this used for? + + // TODO for average calculation, the accumulator must be large enough! + // Use u64 for all ints, and f32 for all floats. + // Therefore, the name "add" is too general. + //*$a /= $b; + { + let _ = $a; + let _ = $b; + } + }; +} + +macro_rules! div_bool { + ($a:expr, $b:expr) => { + // TODO what is this used for? + { + let _ = $a; + let _ = $b; + } + }; +} + +macro_rules! div_string { + ($a:expr, $b:expr) => { + // TODO what is this used for? + { + let _ = $a; + let _ = $b; + } + }; +} + +impl_scalar_ops!(u8, 0, equal_int, add_int, div_int, "u8", 1); +impl_scalar_ops!(u16, 0, equal_int, add_int, div_int, "u16", 2); +impl_scalar_ops!(u32, 0, equal_int, add_int, div_int, "u32", 4); +impl_scalar_ops!(u64, 0, equal_int, add_int, div_int, "u64", 8); +impl_scalar_ops!(i8, 0, equal_int, add_int, div_int, "i8", 1); +impl_scalar_ops!(i16, 0, equal_int, add_int, div_int, "i16", 2); +impl_scalar_ops!(i32, 0, equal_int, add_int, div_int, "i32", 4); +impl_scalar_ops!(i64, 0, equal_int, add_int, div_int, "i64", 8); +impl_scalar_ops!(f32, 0., equal_f32, add_int, div_int, "f32", 4); +impl_scalar_ops!(f64, 0., equal_f64, add_int, div_int, "f64", 8); +impl_scalar_ops!(bool, false, equal_bool, add_bool, div_bool, "bool", 1); +impl_scalar_ops!( + String, + String::new(), + equal_string, + add_string, + div_string, + "string", + 16 +); + +impl ByteEstimate for EnumVariant { + fn byte_estimate(&self) -> u64 { + 12 + } +} + +impl AsPrimF32 for EnumVariant { + fn as_prim_f32_b(&self) -> f32 { + self.ix() as f32 + } +} + +impl ScalarOps for EnumVariant { + fn scalar_type_name() -> &'static str { + "enumvariant" + } + + fn zero_b() -> Self { + EnumVariant::default() + } + + fn equal_slack(&self, rhs: &Self) -> bool { + self == rhs + } + + fn add(&mut self, _rhs: &Self) { + // undefined so far + } + + fn div(&mut self, _n: usize) { + // undefined so far + } + + fn find_vec_min(a: &Vec) -> Option { + todo!() + } + + fn find_vec_max(a: &Vec) -> Option { + todo!() + } + + fn avg_vec(a: &Vec) -> Option { + todo!() + } +} diff --git a/src/streamitem.rs b/src/streamitem.rs new file mode 100644 index 0000000..00b28de --- /dev/null +++ b/src/streamitem.rs @@ -0,0 +1,219 @@ +use daqbuf_err as err; +use netpod::log::Level; +use netpod::DiskStats; +use netpod::EventDataReadStats; +use netpod::RangeFilterStats; +use serde::Deserialize; +use serde::Serialize; + +pub const TERM_FRAME_TYPE_ID: u32 = 0xaa01; +pub const ERROR_FRAME_TYPE_ID: u32 = 0xaa02; +pub const SITEMTY_NONSPEC_FRAME_TYPE_ID: u32 = 0xaa04; +pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; +pub const EVENTS_0D_FRAME_TYPE_ID: u32 = 0x500; +pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700; +pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0x800; +pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; +pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00; +pub const LOG_FRAME_TYPE_ID: u32 = 0xc00; +pub const STATS_FRAME_TYPE_ID: u32 = 0xd00; +pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00; +pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; +pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; +pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; +pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500; +pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; +pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; +pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00; + +pub fn bool_is_false(j: &bool) -> bool { + *j == false +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum RangeCompletableItem { + RangeComplete, + Data(T), +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum StatsItem { + EventDataReadStats(EventDataReadStats), + RangeFilterStats(RangeFilterStats), + DiskStats(DiskStats), + Warnings(), +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum StreamItem { + DataItem(T), + Log(LogItem), + Stats(StatsItem), +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct LogItem { + pub node_ix: u32, + #[serde(with = "levelserde")] + pub level: Level, + pub msg: String, +} + +impl LogItem { + pub fn from_node(node_ix: usize, level: Level, msg: String) -> Self { + Self { + node_ix: node_ix as _, + level, + msg, + } + } +} + +pub type SitemErrTy = err::Error; + +pub type Sitemty = Result>, SitemErrTy>; + +pub type Sitemty2 = Result>, E>; + +#[macro_export] +macro_rules! on_sitemty_range_complete { + ($item:expr, $ex:expr) => { + if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item { + $ex + } + }; +} + +#[macro_export] +macro_rules! on_sitemty_data_old { + ($item:expr, $ex:expr) => { + if let Ok($crate::streamitem::StreamItem::DataItem($crate::streamitem::RangeCompletableItem::Data(item))) = + $item + { + $ex(item) + } else { + $item + } + }; +} + +#[macro_export] +macro_rules! on_sitemty_data { + ($item:expr, $ex:expr) => {{ + use $crate::streamitem::RangeCompletableItem; + use $crate::streamitem::StreamItem; + match $item { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(x) => $ex(x), + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + }, + StreamItem::Log(x) => Ok(StreamItem::Log(x)), + StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), + }, + Err(x) => Err(x), + } + }}; +} + +#[macro_export] +macro_rules! try_map_sitemty_data { + ($item:expr, $ex:expr) => {{ + use $crate::streamitem::RangeCompletableItem; + use $crate::streamitem::StreamItem; + match $item { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(x) => match $ex(x) { + Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), + Err(e) => Err(e), + }, + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + }, + StreamItem::Log(x) => Ok(StreamItem::Log(x)), + StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), + }, + Err(x) => Err(x), + } + }}; +} + +pub fn sitem_data(x: X) -> Sitemty { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) +} + +pub fn sitem_err_from_string(x: T) -> Sitemty +where + T: ToString, +{ + Err(err::Error::from_string(x)) +} + +pub fn sitem_err2_from_string(x: T) -> err::Error +where + T: ToString, +{ + err::Error::from_string(x) +} + +mod levelserde { + use super::Level; + use serde::de::{self, Visitor}; + use serde::{Deserializer, Serializer}; + use std::fmt; + + pub fn serialize(t: &Level, se: S) -> Result + where + S: Serializer, + { + let g = match *t { + Level::ERROR => 1, + Level::WARN => 2, + Level::INFO => 3, + Level::DEBUG => 4, + Level::TRACE => 5, + }; + se.serialize_u32(g) + } + + struct VisitLevel; + + impl VisitLevel { + fn from_u32(x: u32) -> Level { + match x { + 1 => Level::ERROR, + 2 => Level::WARN, + 3 => Level::INFO, + 4 => Level::DEBUG, + 5 => Level::TRACE, + _ => Level::TRACE, + } + } + } + + impl<'de> Visitor<'de> for VisitLevel { + type Value = Level; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "expect Level code") + } + + fn visit_u64(self, val: u64) -> Result + where + E: de::Error, + { + Ok(VisitLevel::from_u32(val as _)) + } + } + + pub fn deserialize<'de, D>(de: D) -> Result + where + D: Deserializer<'de>, + { + de.deserialize_u32(VisitLevel) + } +} diff --git a/src/subfr.rs b/src/subfr.rs new file mode 100644 index 0000000..25ac5b9 --- /dev/null +++ b/src/subfr.rs @@ -0,0 +1,57 @@ +use netpod::EnumVariant; + +pub trait SubFrId { + const SUB: u32; +} + +impl SubFrId for u8 { + const SUB: u32 = 0x03; +} + +impl SubFrId for u16 { + const SUB: u32 = 0x05; +} + +impl SubFrId for u32 { + const SUB: u32 = 0x08; +} + +impl SubFrId for u64 { + const SUB: u32 = 0x0a; +} + +impl SubFrId for i8 { + const SUB: u32 = 0x02; +} + +impl SubFrId for i16 { + const SUB: u32 = 0x04; +} + +impl SubFrId for i32 { + const SUB: u32 = 0x07; +} + +impl SubFrId for i64 { + const SUB: u32 = 0x09; +} + +impl SubFrId for f32 { + const SUB: u32 = 0x0b; +} + +impl SubFrId for f64 { + const SUB: u32 = 0x0c; +} + +impl SubFrId for bool { + const SUB: u32 = 0x0d; +} + +impl SubFrId for String { + const SUB: u32 = 0x0e; +} + +impl SubFrId for EnumVariant { + const SUB: u32 = 0x0f; +} diff --git a/src/test.rs b/src/test.rs new file mode 100644 index 0000000..3e60a90 --- /dev/null +++ b/src/test.rs @@ -0,0 +1,87 @@ +pub fn f32_cmp_near(x: f32, y: f32, abs: f32, rel: f32) -> bool { + /*let x = { + let mut a = x.to_le_bytes(); + a[0] &= 0xf0; + f32::from_ne_bytes(a) + }; + let y = { + let mut a = y.to_le_bytes(); + a[0] &= 0xf0; + f32::from_ne_bytes(a) + }; + x == y*/ + let ad = (x - y).abs(); + ad <= abs || (ad / y).abs() <= rel +} + +pub fn f64_cmp_near(x: f64, y: f64, abs: f64, rel: f64) -> bool { + /*let x = { + let mut a = x.to_le_bytes(); + a[0] &= 0x00; + a[1] &= 0x00; + f64::from_ne_bytes(a) + }; + let y = { + let mut a = y.to_le_bytes(); + a[0] &= 0x00; + a[1] &= 0x00; + f64::from_ne_bytes(a) + }; + x == y*/ + let ad = (x - y).abs(); + ad <= abs || (ad / y).abs() <= rel +} + +pub fn f32_iter_cmp_near(a: A, b: B, abs: f32, rel: f32) -> bool +where + A: IntoIterator, + B: IntoIterator, +{ + let mut a = a.into_iter(); + let mut b = b.into_iter(); + loop { + let x = a.next(); + let y = b.next(); + if let (Some(x), Some(y)) = (x, y) { + if !f32_cmp_near(x, y, abs, rel) { + return false; + } + } else if x.is_some() || y.is_some() { + return false; + } else { + return true; + } + } +} + +pub fn f64_iter_cmp_near(a: A, b: B, abs: f64, rel: f64) -> bool +where + A: IntoIterator, + B: IntoIterator, +{ + let mut a = a.into_iter(); + let mut b = b.into_iter(); + loop { + let x = a.next(); + let y = b.next(); + if let (Some(x), Some(y)) = (x, y) { + if !f64_cmp_near(x, y, abs, rel) { + return false; + } + } else if x.is_some() || y.is_some() { + return false; + } else { + return true; + } + } +} + +#[test] +fn test_f32_iter_cmp_near() { + let a = [-127.553e17]; + let b = [-127.554e17]; + assert_eq!(f32_iter_cmp_near(a, b, 0.000001, 0.000001), false); + let a = [-127.55300e17]; + let b = [-127.55301e17]; + assert_eq!(f32_iter_cmp_near(a, b, 0.000001, 0.000001), true); +} diff --git a/src/timebin.rs b/src/timebin.rs new file mode 100644 index 0000000..d93ddd7 --- /dev/null +++ b/src/timebin.rs @@ -0,0 +1,129 @@ +use crate::collect_s::CollectableDyn; +use crate::AsAnyMut; +use crate::WithLen; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; +use netpod::TsNano; +use std::fmt; +use std::ops::Range; + +// TODO can probably be removed. +pub trait TimeBins { + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn ts_min_max(&self) -> Option<(u64, u64)>; +} + +// TODO remove +pub trait TimeBinnerTy: fmt::Debug + Send + Unpin { + type Input: fmt::Debug; + type Output: fmt::Debug; + + fn ingest(&mut self, item: &mut Self::Input); + + fn set_range_complete(&mut self); + + fn bins_ready_count(&self) -> usize; + + fn bins_ready(&mut self) -> Option; + + /// If there is a bin in progress with non-zero count, push it to the result set. + /// With push_empty == true, a bin in progress is pushed even if it contains no counts. + fn push_in_progress(&mut self, push_empty: bool); + + /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call + /// to `push_in_progress` did not change the result count, as long as edges are left. + /// The next call to `Self::bins_ready_count` must return one higher count than before. + fn cycle(&mut self); + + fn empty(&self) -> Option; + + fn append_empty_until_end(&mut self); +} + +pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized { + type TimeBinner: TimeBinnerTy; + + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Self::TimeBinner; +} + +// #[derive(Debug, ThisError)] +// #[cstm(name = "Binninggg")] +pub enum BinningggError { + Dyn(Box), + TypeMismatch { have: String, expect: String }, +} + +impl fmt::Display for BinningggError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + BinningggError::Dyn(e) => write!(fmt, "{e}"), + BinningggError::TypeMismatch { have, expect } => { + write!(fmt, "TypeMismatch(have: {have}, expect: {expect})") + } + } + } +} + +impl From for BinningggError +where + E: std::error::Error + 'static, +{ + fn from(value: E) -> Self { + Self::Dyn(Box::new(value)) + } +} + +pub trait BinningggContainerEventsDyn: fmt::Debug + Send { + fn type_name(&self) -> &'static str; + fn binned_events_timeweight_traitobj(&self, range: BinnedRange) -> Box; + fn to_anybox(&mut self) -> Box; +} + +pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut + CollectableDyn { + fn type_name(&self) -> &'static str; + fn empty(&self) -> BinsBoxed; + fn clone(&self) -> BinsBoxed; + fn edges_iter( + &self, + ) -> std::iter::Zip, std::collections::vec_deque::Iter>; + fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range); + fn fix_numerics(&mut self); +} + +pub type BinsBoxed = Box; + +pub type EventsBoxed = Box; + +pub trait BinningggBinnerTy: fmt::Debug + Send { + type Input: fmt::Debug; + type Output: fmt::Debug; + + fn ingest(&mut self, item: &mut Self::Input); + fn range_final(&mut self); + fn bins_ready_count(&self) -> usize; + fn bins_ready(&mut self) -> Option; +} + +pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send { + type Binner: BinningggBinnerTy; + + fn binner_new(range: BinnedRange) -> Self::Binner; +} + +pub trait BinningggBinnerDyn: fmt::Debug + Send { + fn input_done_range_final(&mut self) -> Result<(), BinningggError>; + fn input_done_range_open(&mut self) -> Result<(), BinningggError>; +} + +pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send { + fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError>; + fn input_done_range_final(&mut self) -> Result<(), BinningggError>; + fn input_done_range_open(&mut self) -> Result<(), BinningggError>; + fn output(&mut self) -> Result, BinningggError>; +} diff --git a/src/transform.rs b/src/transform.rs new file mode 100644 index 0000000..1908ea2 --- /dev/null +++ b/src/transform.rs @@ -0,0 +1,166 @@ +use crate::collect_s::CollectableDyn; +use crate::collect_s::CollectedDyn; +use crate::streamitem::RangeCompletableItem; +use crate::streamitem::Sitemty; +use crate::streamitem::StreamItem; +use crate::Events; +use daqbuf_err as err; +use err::Error; +use futures_util::stream; +use futures_util::Future; +use futures_util::Stream; +use futures_util::StreamExt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub trait EventStreamTrait: Stream>> + WithTransformProperties + Send {} + +pub trait CollectableStreamTrait: + Stream>> + WithTransformProperties + Send +{ +} + +pub struct EventTransformProperties { + pub needs_value: bool, +} + +pub struct TransformProperties { + pub needs_one_before_range: bool, + pub needs_value: bool, +} + +pub trait WithTransformProperties { + fn query_transform_properties(&self) -> TransformProperties; +} + +impl WithTransformProperties for Box +where + T: WithTransformProperties, +{ + fn query_transform_properties(&self) -> TransformProperties { + self.as_ref().query_transform_properties() + } +} + +impl WithTransformProperties for Pin> +where + T: WithTransformProperties, +{ + fn query_transform_properties(&self) -> TransformProperties { + self.as_ref().query_transform_properties() + } +} + +pub trait EventTransform: WithTransformProperties + Send { + fn transform(&mut self, src: Box) -> Box; +} + +impl EventTransform for Box +where + T: EventTransform, +{ + fn transform(&mut self, src: Box) -> Box { + self.as_mut().transform(src) + } +} + +impl EventTransform for Pin> +where + T: EventTransform, +{ + fn transform(&mut self, src: Box) -> Box { + todo!() + } +} + +pub struct IdentityTransform {} + +impl IdentityTransform { + pub fn default() -> Self { + Self {} + } +} + +impl WithTransformProperties for IdentityTransform { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl EventTransform for IdentityTransform { + fn transform(&mut self, src: Box) -> Box { + src + } +} + +pub struct TransformEvent(pub Box); + +impl WithTransformProperties for TransformEvent { + fn query_transform_properties(&self) -> TransformProperties { + self.0.query_transform_properties() + } +} + +impl EventTransform for TransformEvent { + fn transform(&mut self, src: Box) -> Box { + self.0.transform(src) + } +} + +impl WithTransformProperties for stream::Iter { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl EventStreamTrait for stream::Iter where + T: core::iter::Iterator>> + Send +{ +} + +pub struct EventStreamBox(pub Pin>); + +impl From for EventStreamBox +where + T: Events, +{ + fn from(value: T) -> Self { + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(Box::new(value) as _))); + let x = stream::iter(vec![item]); + Self(Box::pin(x)) + } +} + +pub struct CollectableStreamBox(pub Pin>); + +impl Stream for CollectableStreamBox { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.0.poll_next_unpin(cx) + } +} + +impl WithTransformProperties for CollectableStreamBox { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl CollectableStreamTrait for CollectableStreamBox {} + +impl WithTransformProperties for stream::Empty { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl CollectableStreamTrait for stream::Empty +where + T: Send, + stream::Empty: Stream>>, +{ +} + +impl CollectableStreamTrait for Pin> where T: CollectableStreamTrait {} diff --git a/src/vecpreview.rs b/src/vecpreview.rs new file mode 100644 index 0000000..74f23ec --- /dev/null +++ b/src/vecpreview.rs @@ -0,0 +1,53 @@ +use core::fmt; +use std::collections::VecDeque; + +pub struct PreviewCell<'a, T> { + pub a: Option<&'a T>, + pub b: Option<&'a T>, +} + +impl<'a, T> fmt::Debug for PreviewCell<'a, T> +where + T: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match (self.a.as_ref(), self.b.as_ref()) { + (Some(a), Some(b)) => write!(fmt, "{:?} .. {:?}", a, b), + (Some(a), None) => write!(fmt, "{:?}", a), + _ => write!(fmt, "(empty)"), + } + } +} + +pub trait PreviewRange { + fn preview<'a>(&'a self) -> Box; +} + +impl PreviewRange for VecDeque +where + T: fmt::Debug, +{ + fn preview<'a>(&'a self) -> Box { + let ret = PreviewCell { + a: self.front(), + b: if self.len() <= 1 { None } else { self.back() }, + }; + Box::new(ret) + } +} + +pub struct VecPreview<'a> { + c: &'a dyn PreviewRange, +} + +impl<'a> VecPreview<'a> { + pub fn new(c: &'a dyn PreviewRange) -> Self { + Self { c } + } +} + +impl<'a> fmt::Debug for VecPreview<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{:?}", self.c.preview()) + } +}