WIP change container type

This commit is contained in:
Dominik Werder
2024-11-30 11:43:36 +01:00
parent f08f655065
commit 4d3def67b7
4 changed files with 42 additions and 101 deletions

View File

@@ -1,16 +1,12 @@
use crate::collect_s::ToCborValue;
use crate::collect_s::ToJsonValue;
use core::fmt;
use serde::Serialize;
use std::collections::VecDeque;
pub trait ContPayload: fmt::Debug + Serialize + Send {}
impl<T> ContPayload for T where T: fmt::Debug + Serialize + Send {}
pub trait UserApiType: ToCborValue {}
pub trait UserApiType: ToCborValue + ToJsonValue {}
pub trait ToUserFacingApiType {
fn to_user_facing_api_type(self: Self) -> Box<dyn UserApiType>;
fn to_user_facing_api_type(self) -> Box<dyn UserApiType>;
fn to_user_facing_api_type_box(self: Box<Self>) -> Box<dyn UserApiType>;
}
@@ -30,39 +26,11 @@ impl ToCborValue for EmptyStruct {
}
}
impl ToJsonValue for EmptyStruct {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
let ret = serde_json::to_value(self);
ret
}
}
impl UserApiType for EmptyStruct {}
#[derive(Serialize)]
pub struct ContainerEventsApi<EVT>
where
EVT: ContPayload,
{
pub tss: VecDeque<u64>,
pub values: VecDeque<EVT>,
}
impl<EVT> fmt::Debug for ContainerEventsApi<EVT>
where
EVT: ContPayload,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ContainerEventsApi")
// .field("tss", &self.tss)
// .field("values", &self.values)
.finish()
}
}
impl<EVT> ToCborValue for ContainerEventsApi<EVT>
where
EVT: ContPayload,
{
fn to_cbor_value(&self) -> Result<ciborium::Value, ciborium::value::Error> {
// let mut out = Vec::new();
// ciborium::ser::into_writer(self, &mut out).unwrap();
let val = ciborium::value::Value::serialized(self).unwrap();
Ok(val)
}
}
impl<EVT> UserApiType for ContainerEventsApi<EVT> where EVT: ContPayload {}

View File

@@ -1,3 +1,4 @@
use crate::apitypes::ToUserFacingApiType;
use crate::container::ByteEstimate;
use crate::timebin::BinningggContainerBinsDyn;
use crate::AsAnyMut;
@@ -8,9 +9,6 @@ use crate::WithLen;
use daqbuf_err as err;
use err::Error;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use serde::Serialize;
use std::any;
use std::any::Any;
use std::fmt;
@@ -41,11 +39,11 @@ impl ToJsonValue for serde_json::Value {
}
}
pub trait CollectedDyn: fmt::Debug + TypeName + Send + AsAnyRef + WithLen + ToJsonValue {}
pub trait CollectedDyn: fmt::Debug + TypeName + Send + WithLen + ToUserFacingApiType {}
impl ToJsonValue for Box<dyn CollectedDyn> {
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
ToJsonValue::to_json_value(self.as_ref())
impl TypeName for Box<dyn CollectedDyn> {
fn type_name(&self) -> String {
self.as_ref().type_name()
}
}
@@ -55,42 +53,22 @@ impl WithLen for Box<dyn CollectedDyn> {
}
}
impl TypeName for Box<dyn CollectedDyn> {
fn type_name(&self) -> String {
self.as_ref().type_name()
}
}
impl CollectedDyn for Box<dyn CollectedDyn> {}
pub trait CollectorTy: fmt::Debug + Send + Unpin + WithLen + ByteEstimate {
type Input: CollectableDyn;
type Output: CollectedDyn + ToJsonValue + Serialize;
type Output: CollectedDyn;
fn ingest(&mut self, src: &mut Self::Input);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn set_continue_at_here(&mut self);
// TODO use this crate's Error instead:
fn result(
&mut self,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error>;
fn result(&mut self) -> Result<Self::Output, Error>;
}
pub trait CollectorDyn: fmt::Debug + Send + WithLen + ByteEstimate {
fn ingest(&mut self, src: &mut dyn CollectableDyn);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);
fn set_continue_at_here(&mut self);
// TODO factor the required parameters into new struct? Generic over events or binned?
fn result(
&mut self,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn CollectedDyn>, Error>;
fn result(&mut self) -> Result<Box<dyn CollectedDyn>, Error>;
}
impl<T> CollectorDyn for T
@@ -101,22 +79,20 @@ where
if let Some(src) = src.as_any_mut().downcast_mut::<<T as CollectorTy>::Input>() {
trace!("sees incoming &mut ref");
T::ingest(self, src)
} else if let Some(src) = src
.as_any_mut()
.downcast_mut::<Box<<T as CollectorTy>::Input>>()
{
trace!("sees incoming &mut Box");
T::ingest(self, src)
} else {
if let Some(src) = src
.as_any_mut()
.downcast_mut::<Box<<T as CollectorTy>::Input>>()
{
trace!("sees incoming &mut Box");
T::ingest(self, src)
} else {
error!(
"No idea what this is. Expect: {} input {} got: {} {:?}",
any::type_name::<T>(),
any::type_name::<<T as CollectorTy>::Input>(),
src.type_name(),
src
);
}
error!(
"No idea what this is. Expect: {} input {} got: {} {:?}",
any::type_name::<T>(),
any::type_name::<<T as CollectorTy>::Input>(),
src.type_name(),
src
);
}
}
@@ -128,16 +104,8 @@ where
T::set_timed_out(self)
}
fn set_continue_at_here(&mut self) {
T::set_continue_at_here(self)
}
fn result(
&mut self,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn CollectedDyn>, Error> {
let ret = T::result(self, range, binrange)?;
fn result(&mut self) -> Result<Box<dyn CollectedDyn>, Error> {
let ret = T::result(self)?;
Ok(Box::new(ret))
}
}

View File

@@ -5,6 +5,7 @@ use crate::WithLen;
use core::ops::Range;
use netpod::TsMs;
use netpod::TsNano;
use std::collections::VecDeque;
use std::fmt;
#[derive(Debug, thiserror::Error)]
@@ -44,7 +45,7 @@ pub trait MergeableTy: fmt::Debug + WithLen + ByteEstimate + Unpin + Sized {
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize>;
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize>;
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize>;
fn tss_for_testing(&self) -> Vec<TsMs>;
fn tss_for_testing(&self) -> VecDeque<TsNano>;
fn drain_into(&mut self, dst: &mut Self, range: Range<usize>) -> DrainIntoDstResult;
fn drain_into_new(&mut self, range: Range<usize>) -> DrainIntoNewResult<Self>;
fn is_consistent(&self) -> bool;
@@ -56,7 +57,7 @@ pub trait MergeableDyn: fmt::Debug + WithLen + ByteEstimate + Unpin + AsAnyMut {
fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize>;
fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize>;
fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize>;
fn tss_for_testing(&self) -> Vec<TsMs>;
fn tss_for_testing(&self) -> VecDeque<TsNano>;
fn drain_into(&mut self, dst: &mut dyn MergeableDyn, range: Range<usize>)
-> DrainIntoDstResult;
fn drain_into_new(&mut self, range: Range<usize>) -> DrainIntoNewDynResult;

View File

@@ -1,5 +1,6 @@
use crate::apitypes::ToUserFacingApiType;
use crate::collect_s::CollectableDyn;
use crate::collect_s::CollectorDyn;
use crate::collect_s::ToCborValue;
use crate::collect_s::ToJsonValue;
use crate::container::ByteEstimate;
@@ -13,6 +14,7 @@ use crate::WithLen;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::TsNano;
use std::collections::VecDeque;
use std::fmt;
use std::ops::Range;
@@ -90,6 +92,7 @@ where
pub trait BinningggContainerEventsDyn:
fmt::Debug
+ TypeName
+ Send
+ AsAnyRef
+ WithLen
@@ -98,8 +101,8 @@ pub trait BinningggContainerEventsDyn:
+ ToJsonValue
+ ToCborValue
+ ToUserFacingApiType
+ CollectableDyn
{
fn type_name(&self) -> &'static str;
fn binned_events_timeweight_traitobj(
&self,
range: BinnedRange<TsNano>,
@@ -110,6 +113,7 @@ pub trait BinningggContainerEventsDyn:
fn nty_id(&self) -> u32;
fn eq(&self, rhs: &dyn BinningggContainerEventsDyn) -> bool;
fn as_mergeable_dyn_mut(&mut self) -> &mut dyn MergeableDyn;
fn as_collectable_dyn_mut(&mut self) -> &mut dyn CollectableDyn;
}
impl<T> MergeableDyn for Box<T>
@@ -136,7 +140,7 @@ where
self.as_ref().find_highest_index_lt(ts)
}
fn tss_for_testing(&self) -> Vec<netpod::TsMs> {
fn tss_for_testing(&self) -> VecDeque<TsNano> {
self.as_ref().tss_for_testing()
}