diff --git a/Cargo.toml b/Cargo.toml index 160b83a..a62deab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ serde_json = "1" bincode = "1.3.3" bytes = "1.8.0" futures-util = "0.3.24" +thiserror = "=0.0.1" chrono = { version = "0.4.19", features = ["serde"] } netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" } daqbuf-err = { path = "../daqbuf-err" } diff --git a/src/collect_s.rs b/src/collect_s.rs index 3e9505b..8d109f2 100644 --- a/src/collect_s.rs +++ b/src/collect_s.rs @@ -155,13 +155,13 @@ pub trait CollectableType: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName fn new_collector() -> Self::Collector; } -pub trait CollectableDyn: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send { +pub trait CollectableDyn: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + Send + TypeName { fn new_collector(&self) -> Box; } impl TypeName for Box { fn type_name(&self) -> String { - BinningggContainerBinsDyn::type_name(self.as_ref()).into() + self.as_ref().type_name() } } diff --git a/src/container.rs b/src/container.rs index e26c1f7..20bc4c6 100644 --- a/src/container.rs +++ b/src/container.rs @@ -9,3 +9,12 @@ impl ByteEstimate for Box { self.as_ref().byte_estimate() } } + +impl ByteEstimate for Box +where + T: ByteEstimate, +{ + fn byte_estimate(&self) -> u64 { + self.as_ref().byte_estimate() + } +} diff --git a/src/events.rs b/src/events.rs index ef7af24..1cef23a 100644 --- a/src/events.rs +++ b/src/events.rs @@ -12,6 +12,15 @@ pub trait WithLen { fn len(&self) -> usize; } +impl WithLen for Box +where + T: WithLen, +{ + fn len(&self) -> usize { + self.as_ref().len() + } +} + impl WithLen for bytes::Bytes { fn len(&self) -> usize { self.len() @@ -78,26 +87,6 @@ where } } -#[derive(Debug)] -pub enum MergeError { - NotCompatible, - Full, -} - -impl From for err::Error { - fn from(e: MergeError) -> Self { - e.to_string().into() - } -} - -impl fmt::Display for MergeError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{:?}", self) - } -} - -impl std::error::Error for MergeError {} - // TODO can I remove the Any bound? /// Container of some form of events, for use as trait object. @@ -126,7 +115,7 @@ pub trait Events: &mut self, dst: &mut dyn Events, range: (usize, usize), - ) -> Result<(), MergeError>; + ) -> Result<(), err::Error>; fn find_lowest_index_gt_evs(&self, ts: u64) -> Option; fn find_lowest_index_ge_evs(&self, ts: u64) -> Option; fn find_highest_index_lt_evs(&self, ts: u64) -> Option; @@ -212,7 +201,7 @@ impl Events for Box { &mut self, dst: &mut dyn Events, range: (usize, usize), - ) -> Result<(), MergeError> { + ) -> Result<(), err::Error> { Events::drain_into_evs(self.as_mut(), dst, range) } diff --git a/src/lib.rs b/src/lib.rs index 1b2d02e..8f7e2e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod container; pub mod events; pub mod framable; pub mod isodate; +pub mod merge; pub mod overlap; pub mod scalar_ops; pub mod streamitem; diff --git a/src/merge.rs b/src/merge.rs new file mode 100644 index 0000000..bf7190e --- /dev/null +++ b/src/merge.rs @@ -0,0 +1,53 @@ +use crate::container::ByteEstimate; +use crate::AsAnyMut; +use crate::WithLen; +use core::ops::Range; +use netpod::TsMs; +use netpod::TsNano; +use std::fmt; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "MergeError")] +pub enum Error {} + +impl From for daqbuf_err::Error { + fn from(e: Error) -> Self { + daqbuf_err::Error::from_string(e) + } +} + +#[derive(Debug)] +pub enum DrainIntoDstResult { + Done, + Partial, + NotCompatible, +} + +#[derive(Debug)] +pub enum DrainIntoNewResult { + Done(T), + Partial(T), + NotCompatible, +} + +pub trait MergeableTy: fmt::Debug + WithLen + ByteEstimate + Unpin + Sized { + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn find_lowest_index_gt(&self, ts: TsNano) -> Option; + fn find_lowest_index_ge(&self, ts: TsNano) -> Option; + fn find_highest_index_lt(&self, ts: TsNano) -> Option; + fn tss_for_testing(&self) -> Vec; + fn drain_into(&mut self, dst: &mut Self, range: Range) -> DrainIntoDstResult; + fn drain_into_new(&mut self, range: Range) -> DrainIntoNewResult; +} + +pub trait MergeableDyn: fmt::Debug + WithLen + ByteEstimate + Unpin + AsAnyMut { + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn find_lowest_index_gt(&self, ts: TsNano) -> Option; + fn find_lowest_index_ge(&self, ts: TsNano) -> Option; + fn find_highest_index_lt(&self, ts: TsNano) -> Option; + fn tss_for_testing(&self) -> Vec; + fn drain_into(&mut self, dst: &mut dyn MergeableDyn, range: Range) + -> DrainIntoDstResult; +} diff --git a/src/streamitem.rs b/src/streamitem.rs index 00b28de..67badbe 100644 --- a/src/streamitem.rs +++ b/src/streamitem.rs @@ -25,6 +25,7 @@ pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500; pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00; +pub const CONTAINER_EVENTS_TYPE_ID: u32 = 0x8b00; pub fn bool_is_false(j: &bool) -> bool { *j == false @@ -78,7 +79,8 @@ pub type Sitemty2 = Result>, E>; #[macro_export] macro_rules! on_sitemty_range_complete { ($item:expr, $ex:expr) => { - if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item { + if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item + { $ex } }; @@ -87,8 +89,9 @@ macro_rules! on_sitemty_range_complete { #[macro_export] macro_rules! on_sitemty_data_old { ($item:expr, $ex:expr) => { - if let Ok($crate::streamitem::StreamItem::DataItem($crate::streamitem::RangeCompletableItem::Data(item))) = - $item + if let Ok($crate::streamitem::StreamItem::DataItem( + $crate::streamitem::RangeCompletableItem::Data(item), + )) = $item { $ex(item) } else { diff --git a/src/subfr.rs b/src/subfr.rs index 25ac5b9..696ce5c 100644 --- a/src/subfr.rs +++ b/src/subfr.rs @@ -55,3 +55,55 @@ impl SubFrId for String { impl SubFrId for EnumVariant { const SUB: u32 = 0x0f; } + +impl SubFrId for Vec { + const SUB: u32 = 0x23; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x25; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x28; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x2a; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x22; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x24; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x27; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x29; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x2b; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x2c; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x2d; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x2e; +} + +impl SubFrId for Vec { + const SUB: u32 = 0x2f; +} diff --git a/src/timebin.rs b/src/timebin.rs index e0b6111..83d5b01 100644 --- a/src/timebin.rs +++ b/src/timebin.rs @@ -1,6 +1,10 @@ use crate::collect_s::CollectableDyn; +use crate::container::ByteEstimate; +use crate::merge::DrainIntoDstResult; +use crate::merge::MergeableDyn; use crate::AsAnyMut; use crate::AsAnyRef; +use crate::TypeName; use crate::WithLen; use netpod::BinnedRange; use netpod::BinnedRangeEnum; @@ -80,19 +84,61 @@ where } } -pub trait BinningggContainerEventsDyn: fmt::Debug + Send + AsAnyRef { +pub trait BinningggContainerEventsDyn: + fmt::Debug + Send + AsAnyRef + WithLen + ByteEstimate + MergeableDyn +{ fn type_name(&self) -> &'static str; fn binned_events_timeweight_traitobj( &self, range: BinnedRange, ) -> Box; fn to_anybox(&mut self) -> Box; + fn clone_dyn(&self) -> Box; + fn serde_id(&self) -> u32; + fn nty_id(&self) -> u32; + fn eq(&self, rhs: &dyn BinningggContainerEventsDyn) -> bool; +} + +impl MergeableDyn for Box +where + T: MergeableDyn, +{ + fn ts_min(&self) -> Option { + self.as_ref().ts_min() + } + + fn ts_max(&self) -> Option { + self.as_ref().ts_max() + } + + fn find_lowest_index_gt(&self, ts: TsNano) -> Option { + self.as_ref().find_lowest_index_gt(ts) + } + + fn find_lowest_index_ge(&self, ts: TsNano) -> Option { + self.as_ref().find_lowest_index_ge(ts) + } + + fn find_highest_index_lt(&self, ts: TsNano) -> Option { + self.as_ref().find_highest_index_lt(ts) + } + + fn tss_for_testing(&self) -> Vec { + self.as_ref().tss_for_testing() + } + + fn drain_into( + &mut self, + dst: &mut dyn MergeableDyn, + range: Range, + ) -> DrainIntoDstResult { + todo!() + } } pub trait BinningggContainerBinsDyn: - fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut + CollectableDyn + fmt::Debug + Send + fmt::Display + TypeName + WithLen + AsAnyMut + CollectableDyn { - fn type_name(&self) -> &'static str; fn empty(&self) -> BinsBoxed; fn clone(&self) -> BinsBoxed; fn edges_iter( @@ -102,6 +148,10 @@ pub trait BinningggContainerBinsDyn: std::collections::vec_deque::Iter, >; fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range); + fn binned_bins_timeweight_traitobj( + &self, + range: BinnedRange, + ) -> Box; fn fix_numerics(&mut self); } @@ -112,7 +162,6 @@ pub type EventsBoxed = Box; pub trait BinningggBinnerTy: fmt::Debug + Send { type Input: fmt::Debug; type Output: fmt::Debug; - fn ingest(&mut self, item: &mut Self::Input); fn range_final(&mut self); fn bins_ready_count(&self) -> usize; @@ -121,7 +170,6 @@ pub trait BinningggBinnerTy: fmt::Debug + Send { pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send { type Binner: BinningggBinnerTy; - fn binner_new(range: BinnedRange) -> Self::Binner; } @@ -136,3 +184,10 @@ pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send { fn input_done_range_open(&mut self) -> Result<(), BinningggError>; fn output(&mut self) -> Result, BinningggError>; } + +pub trait BinnedBinsTimeweightTrait: fmt::Debug + Send { + fn ingest(&mut self, bins: &BinsBoxed) -> Result<(), BinningggError>; + fn input_done_range_final(&mut self) -> Result<(), BinningggError>; + fn input_done_range_open(&mut self) -> Result<(), BinningggError>; + fn output(&mut self) -> Result, BinningggError>; +}