WIP merge type

This commit is contained in:
Dominik Werder
2024-11-24 22:31:25 +01:00
parent aacdc22214
commit 97e225f487
9 changed files with 195 additions and 32 deletions

View File

@@ -15,6 +15,7 @@ serde_json = "1"
bincode = "1.3.3"
bytes = "1.8.0"
futures-util = "0.3.24"
thiserror = "=0.0.1"
chrono = { version = "0.4.19", features = ["serde"] }
netpod = { path = "../daqbuf-netpod", package = "daqbuf-netpod" }
daqbuf-err = { path = "../daqbuf-err" }

View File

@@ -155,13 +155,13 @@ pub trait CollectableType: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName
fn new_collector() -> Self::Collector;
}
pub trait CollectableDyn: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send {
pub trait CollectableDyn: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + Send + TypeName {
fn new_collector(&self) -> Box<dyn CollectorDyn>;
}
impl TypeName for Box<dyn BinningggContainerBinsDyn> {
fn type_name(&self) -> String {
BinningggContainerBinsDyn::type_name(self.as_ref()).into()
self.as_ref().type_name()
}
}

View File

@@ -9,3 +9,12 @@ impl ByteEstimate for Box<dyn Events> {
self.as_ref().byte_estimate()
}
}
impl<T> ByteEstimate for Box<T>
where
T: ByteEstimate,
{
fn byte_estimate(&self) -> u64 {
self.as_ref().byte_estimate()
}
}

View File

@@ -12,6 +12,15 @@ pub trait WithLen {
fn len(&self) -> usize;
}
impl<T> WithLen for Box<T>
where
T: WithLen,
{
fn len(&self) -> usize {
self.as_ref().len()
}
}
impl WithLen for bytes::Bytes {
fn len(&self) -> usize {
self.len()
@@ -78,26 +87,6 @@ where
}
}
#[derive(Debug)]
pub enum MergeError {
NotCompatible,
Full,
}
impl From<MergeError> for err::Error {
fn from(e: MergeError) -> Self {
e.to_string().into()
}
}
impl fmt::Display for MergeError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{:?}", self)
}
}
impl std::error::Error for MergeError {}
// TODO can I remove the Any bound?
/// Container of some form of events, for use as trait object.
@@ -126,7 +115,7 @@ pub trait Events:
&mut self,
dst: &mut dyn Events,
range: (usize, usize),
) -> Result<(), MergeError>;
) -> Result<(), err::Error>;
fn find_lowest_index_gt_evs(&self, ts: u64) -> Option<usize>;
fn find_lowest_index_ge_evs(&self, ts: u64) -> Option<usize>;
fn find_highest_index_lt_evs(&self, ts: u64) -> Option<usize>;
@@ -212,7 +201,7 @@ impl Events for Box<dyn Events> {
&mut self,
dst: &mut dyn Events,
range: (usize, usize),
) -> Result<(), MergeError> {
) -> Result<(), err::Error> {
Events::drain_into_evs(self.as_mut(), dst, range)
}

View File

@@ -3,6 +3,7 @@ pub mod container;
pub mod events;
pub mod framable;
pub mod isodate;
pub mod merge;
pub mod overlap;
pub mod scalar_ops;
pub mod streamitem;

53
src/merge.rs Normal file
View File

@@ -0,0 +1,53 @@
use crate::container::ByteEstimate;
use crate::AsAnyMut;
use crate::WithLen;
use core::ops::Range;
use netpod::TsMs;
use netpod::TsNano;
use std::fmt;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "MergeError")]
pub enum Error {}
impl From<Error> for daqbuf_err::Error {
fn from(e: Error) -> Self {
daqbuf_err::Error::from_string(e)
}
}
#[derive(Debug)]
pub enum DrainIntoDstResult {
Done,
Partial,
NotCompatible,
}
#[derive(Debug)]
pub enum DrainIntoNewResult<T> {
Done(T),
Partial(T),
NotCompatible,
}
pub trait MergeableTy: fmt::Debug + WithLen + ByteEstimate + Unpin + Sized {
fn ts_min(&self) -> Option<TsNano>;
fn ts_max(&self) -> Option<TsNano>;
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize>;
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize>;
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize>;
fn tss_for_testing(&self) -> Vec<TsMs>;
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) -> DrainIntoDstResult;
fn drain_into_new(&mut self, range: Range<usize>) -> DrainIntoNewResult<Self>;
}
pub trait MergeableDyn: fmt::Debug + WithLen + ByteEstimate + Unpin + AsAnyMut {
fn ts_min(&self) -> Option<TsNano>;
fn ts_max(&self) -> Option<TsNano>;
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize>;
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize>;
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize>;
fn tss_for_testing(&self) -> Vec<TsMs>;
fn drain_into(&mut self, dst: &mut dyn MergeableDyn, range: Range<usize>)
-> DrainIntoDstResult;
}

View File

@@ -25,6 +25,7 @@ pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500;
pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800;
pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900;
pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00;
pub const CONTAINER_EVENTS_TYPE_ID: u32 = 0x8b00;
pub fn bool_is_false(j: &bool) -> bool {
*j == false
@@ -78,7 +79,8 @@ pub type Sitemty2<T, E> = Result<StreamItem<RangeCompletableItem<T>>, E>;
#[macro_export]
macro_rules! on_sitemty_range_complete {
($item:expr, $ex:expr) => {
if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item {
if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item
{
$ex
}
};
@@ -87,8 +89,9 @@ macro_rules! on_sitemty_range_complete {
#[macro_export]
macro_rules! on_sitemty_data_old {
($item:expr, $ex:expr) => {
if let Ok($crate::streamitem::StreamItem::DataItem($crate::streamitem::RangeCompletableItem::Data(item))) =
$item
if let Ok($crate::streamitem::StreamItem::DataItem(
$crate::streamitem::RangeCompletableItem::Data(item),
)) = $item
{
$ex(item)
} else {

View File

@@ -55,3 +55,55 @@ impl SubFrId for String {
impl SubFrId for EnumVariant {
const SUB: u32 = 0x0f;
}
impl SubFrId for Vec<u8> {
const SUB: u32 = 0x23;
}
impl SubFrId for Vec<u16> {
const SUB: u32 = 0x25;
}
impl SubFrId for Vec<u32> {
const SUB: u32 = 0x28;
}
impl SubFrId for Vec<u64> {
const SUB: u32 = 0x2a;
}
impl SubFrId for Vec<i8> {
const SUB: u32 = 0x22;
}
impl SubFrId for Vec<i16> {
const SUB: u32 = 0x24;
}
impl SubFrId for Vec<i32> {
const SUB: u32 = 0x27;
}
impl SubFrId for Vec<i64> {
const SUB: u32 = 0x29;
}
impl SubFrId for Vec<f32> {
const SUB: u32 = 0x2b;
}
impl SubFrId for Vec<f64> {
const SUB: u32 = 0x2c;
}
impl SubFrId for Vec<bool> {
const SUB: u32 = 0x2d;
}
impl SubFrId for Vec<String> {
const SUB: u32 = 0x2e;
}
impl SubFrId for Vec<EnumVariant> {
const SUB: u32 = 0x2f;
}

View File

@@ -1,6 +1,10 @@
use crate::collect_s::CollectableDyn;
use crate::container::ByteEstimate;
use crate::merge::DrainIntoDstResult;
use crate::merge::MergeableDyn;
use crate::AsAnyMut;
use crate::AsAnyRef;
use crate::TypeName;
use crate::WithLen;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
@@ -80,19 +84,61 @@ where
}
}
pub trait BinningggContainerEventsDyn: fmt::Debug + Send + AsAnyRef {
pub trait BinningggContainerEventsDyn:
fmt::Debug + Send + AsAnyRef + WithLen + ByteEstimate + MergeableDyn
{
fn type_name(&self) -> &'static str;
fn binned_events_timeweight_traitobj(
&self,
range: BinnedRange<TsNano>,
) -> Box<dyn BinnedEventsTimeweightTrait>;
fn to_anybox(&mut self) -> Box<dyn std::any::Any>;
fn clone_dyn(&self) -> Box<dyn BinningggContainerEventsDyn>;
fn serde_id(&self) -> u32;
fn nty_id(&self) -> u32;
fn eq(&self, rhs: &dyn BinningggContainerEventsDyn) -> bool;
}
impl<T> MergeableDyn for Box<T>
where
T: MergeableDyn,
{
fn ts_min(&self) -> Option<TsNano> {
self.as_ref().ts_min()
}
fn ts_max(&self) -> Option<TsNano> {
self.as_ref().ts_max()
}
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize> {
self.as_ref().find_lowest_index_gt(ts)
}
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize> {
self.as_ref().find_lowest_index_ge(ts)
}
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize> {
self.as_ref().find_highest_index_lt(ts)
}
fn tss_for_testing(&self) -> Vec<netpod::TsMs> {
self.as_ref().tss_for_testing()
}
fn drain_into(
&mut self,
dst: &mut dyn MergeableDyn,
range: Range<usize>,
) -> DrainIntoDstResult {
todo!()
}
}
pub trait BinningggContainerBinsDyn:
fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut + CollectableDyn
fmt::Debug + Send + fmt::Display + TypeName + WithLen + AsAnyMut + CollectableDyn
{
fn type_name(&self) -> &'static str;
fn empty(&self) -> BinsBoxed;
fn clone(&self) -> BinsBoxed;
fn edges_iter(
@@ -102,6 +148,10 @@ pub trait BinningggContainerBinsDyn:
std::collections::vec_deque::Iter<TsNano>,
>;
fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range<usize>);
fn binned_bins_timeweight_traitobj(
&self,
range: BinnedRange<TsNano>,
) -> Box<dyn BinnedBinsTimeweightTrait>;
fn fix_numerics(&mut self);
}
@@ -112,7 +162,6 @@ pub type EventsBoxed = Box<dyn BinningggContainerEventsDyn>;
pub trait BinningggBinnerTy: fmt::Debug + Send {
type Input: fmt::Debug;
type Output: fmt::Debug;
fn ingest(&mut self, item: &mut Self::Input);
fn range_final(&mut self);
fn bins_ready_count(&self) -> usize;
@@ -121,7 +170,6 @@ pub trait BinningggBinnerTy: fmt::Debug + Send {
pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send {
type Binner: BinningggBinnerTy<Input = Self>;
fn binner_new(range: BinnedRange<TsNano>) -> Self::Binner;
}
@@ -136,3 +184,10 @@ pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send {
fn input_done_range_open(&mut self) -> Result<(), BinningggError>;
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError>;
}
pub trait BinnedBinsTimeweightTrait: fmt::Debug + Send {
fn ingest(&mut self, bins: &BinsBoxed) -> Result<(), BinningggError>;
fn input_done_range_final(&mut self) -> Result<(), BinningggError>;
fn input_done_range_open(&mut self) -> Result<(), BinningggError>;
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError>;
}