From 4365d242805dc6a92d75362389bc4ea6f6b622b4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 19 Sep 2024 13:19:42 +0200 Subject: [PATCH] WIP --- crates/items_2/src/binning.rs | 5 +- crates/items_2/src/binning/aggregator.rs | 19 ++++++ .../items_2/src/binning/container_events.rs | 68 +++++++++++++++---- crates/items_2/src/binning/test.rs | 12 ++++ .../binning/timeweight/timeweight_events.rs | 24 ++++++- crates/items_2/src/binsdim0.rs | 56 ++------------- crates/items_2/src/items_2.rs | 1 + crates/items_2/src/vecpreview.rs | 28 ++++++++ 8 files changed, 146 insertions(+), 67 deletions(-) create mode 100644 crates/items_2/src/binning/aggregator.rs create mode 100644 crates/items_2/src/vecpreview.rs diff --git a/crates/items_2/src/binning.rs b/crates/items_2/src/binning.rs index dcd36c7..62dcd79 100644 --- a/crates/items_2/src/binning.rs +++ b/crates/items_2/src/binning.rs @@ -1,5 +1,8 @@ +pub mod aggregator; pub mod container_events; -pub mod test; pub mod timeweight; +#[cfg(test)] +mod test; + use super::binning as ___; diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs new file mode 100644 index 0000000..638ef60 --- /dev/null +++ b/crates/items_2/src/binning/aggregator.rs @@ -0,0 +1,19 @@ +use std::marker::PhantomData; + +pub trait AggregatorTimeWeight {} + +pub struct AggregatorNumeric { + _t0: PhantomData, +} + +trait AggWithSame {} + +impl AggWithSame for f64 {} + +impl AggregatorTimeWeight for AggregatorNumeric where T: AggWithSame {} + +impl AggregatorTimeWeight for AggregatorNumeric {} + +impl AggregatorTimeWeight for AggregatorNumeric {} + +// TODO do enum right from begin, using a SOA enum container. diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 516ae68..8c8725c 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -1,42 +1,82 @@ +use super::aggregator::AggregatorNumeric; +use super::aggregator::AggregatorTimeWeight; use super::___; +use crate::vecpreview::PreviewRange; +use crate::vecpreview::VecPreview; +use core::fmt; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; +use std::any; use std::collections::VecDeque; #[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } -pub trait Container: Clone {} +pub trait Container: fmt::Debug + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> { + fn new() -> Self; +} -impl Container for VecDeque where T: EventValueType {} - -pub trait EventValueType: Clone { +pub trait EventValueType: fmt::Debug + Clone { type Container: Container; + type AggregatorTimeWeight: AggregatorTimeWeight; +} + +impl Container for VecDeque +where + T: EventValueType + Serialize + for<'a> Deserialize<'a>, +{ + fn new() -> Self { + VecDeque::new() + } } impl EventValueType for f32 { type Container = VecDeque; + type AggregatorTimeWeight = AggregatorNumeric; } -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct ContainerEvents where EVT: EventValueType, { tss: VecDeque, - // vals: VecDeque, - vals: VecDeque<::Container>, + vals: ::Container, } -// TODO why does this already impl Serialize even though there is no bound for EVT? -// TODO try to actually instantiate and serialize in a test. - -#[derive(Clone, Serialize, Deserialize)] -pub struct ContainerEvents2 +impl ContainerEvents where EVT: EventValueType, { - tss: VecDeque, - vals: VecDeque, + pub fn type_name() -> &'static str { + any::type_name::() + } + + pub fn new() -> Self { + Self { + tss: VecDeque::new(), + vals: Container::new(), + } + } + + pub fn len(&self) -> usize { + self.tss.len() + } +} + +impl fmt::Debug for ContainerEvents +where + EVT: EventValueType, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let self_name = any::type_name::(); + write!( + fmt, + "{self_name} {{ len: {:?}, tss: {:?}, vals {:?} }}", + self.len(), + VecPreview::new(&self.tss), + VecPreview::new(&self.vals), + ) + } } diff --git a/crates/items_2/src/binning/test.rs b/crates/items_2/src/binning/test.rs index 378e0e6..1a66242 100644 --- a/crates/items_2/src/binning/test.rs +++ b/crates/items_2/src/binning/test.rs @@ -1,2 +1,14 @@ +use super::container_events::ContainerEvents; use super::___; use netpod::log::*; +use std::any; + +#[test] +fn test_use_serde() { + let x = ContainerEvents::::new(); + let a: &dyn any::Any = &x; + assert_eq!(a.downcast_ref::().is_some(), false); + assert_eq!(a.downcast_ref::>().is_some(), true); + let s = serde_json::to_string(&x).unwrap(); + let _: ContainerEvents = serde_json::from_str(&s).unwrap(); +} diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index fc777ae..1b8c369 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -1,8 +1,9 @@ use super::super::container_events::EventValueType; use super::___; +use err::thiserror; +use err::ThisError; use futures_util::Stream; use netpod::log::*; -use std::collections::VecDeque; use std::marker::PhantomData; use std::pin::Pin; use std::task::Context; @@ -11,6 +12,10 @@ use std::task::Poll; #[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +#[derive(Debug, ThisError)] +#[cstm(name = "BinnedEventsTimeweight")] +pub enum Error {} + pub struct BinnedEventsTimeweight where EVT: EventValueType, @@ -18,7 +23,22 @@ where _evt: PhantomData, } -impl BinnedEventsTimeweight where EVT: EventValueType {} +impl BinnedEventsTimeweight +where + EVT: EventValueType, +{ + pub fn ingest(&mut self, evs: ::Container) -> Result<(), Error> { + // It is this type's task to find and store the one-before event. + // We then pass it to the aggregation. + // AggregatorTimeWeight needs a function for that. + // What about counting the events that actually fall into the range? + // Maybe that should be done in this type. + // That way we can pass the values and weights to the aggregation, and count the in-range here. + // This type must also "close" the current aggregation by passing the "last" and init the next. + // ALSO: need to keep track of the "lst". Probably best done in this type as well? + todo!() + } +} pub struct BinnedEventsTimeweightStream {} diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index d0d408e..35ed5a0 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -2,6 +2,7 @@ use crate::timebin::TimeBinnerCommonV0Func; use crate::timebin::TimeBinnerCommonV0Trait; use crate::ts_offs_from_abs; use crate::ts_offs_from_abs_with_anchor; +use crate::vecpreview::VecPreview; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -79,9 +80,9 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let self_name = any::type_name::(); - // if true { - // return fmt::Display::fmt(self, fmt); - // } + if true { + return fmt::Display::fmt(self, fmt); + } if true { write!( fmt, @@ -110,60 +111,15 @@ where } } -trait HasFrontBack { - fn len(&self) -> usize; - fn front(&self) -> Option<&T>; - fn back(&self) -> Option<&T>; -} - -impl HasFrontBack for VecDeque { - fn len(&self) -> usize { - self.len() - } - - fn front(&self) -> Option<&T> { - self.front() - } - - fn back(&self) -> Option<&T> { - self.back() - } -} - -struct VecPreview<'a, T> { - c: &'a dyn HasFrontBack, -} - -impl<'a, T> VecPreview<'a, T> { - fn new(c: &'a dyn HasFrontBack) -> Self { - Self { c } - } -} - -impl<'a, T> fmt::Display for VecPreview<'a, T> -where - T: fmt::Display, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - if self.c.len() == 0 { - write!(fmt, "()") - } else if self.c.len() == 1 { - write!(fmt, "{}", self.c.front().unwrap()) - } else { - write!(fmt, "{}", self.c.front().unwrap()) - } - } -} - impl fmt::Display for BinsDim0 where - NTY: fmt::Display, + NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let self_name = any::type_name::(); write!( fmt, - "{self_name} {{ len: {:?}, ts1s: {}, ts2s {}, counts {}, mins {}, maxs {}, avgs {}, lsts {} }}", + "{self_name} {{ len: {:?}, ts1s: {:?}, ts2s {:?}, counts {:?}, mins {:?}, maxs {:?}, avgs {:?}, lsts {:?} }}", self.len(), VecPreview::new(&self.ts1s), VecPreview::new(&self.ts2s), diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 5978d32..4f19365 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -20,6 +20,7 @@ pub mod test; pub mod testgen; pub mod timebin; pub mod transform; +pub mod vecpreview; use channelevents::ChannelEvents; use futures_util::Stream; diff --git a/crates/items_2/src/vecpreview.rs b/crates/items_2/src/vecpreview.rs new file mode 100644 index 0000000..b3f85bd --- /dev/null +++ b/crates/items_2/src/vecpreview.rs @@ -0,0 +1,28 @@ +use core::fmt; +use std::collections::VecDeque; + +pub trait PreviewRange { + fn preview(&self) -> &dyn fmt::Debug; +} + +impl PreviewRange for VecDeque { + fn preview(&self) -> &dyn fmt::Debug { + todo!() + } +} + +pub struct VecPreview<'a> { + c: &'a dyn PreviewRange, +} + +impl<'a> VecPreview<'a> { + pub fn new(c: &'a dyn PreviewRange) -> Self { + Self { c } + } +} + +impl<'a> fmt::Debug for VecPreview<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{:?}", self.c.preview()) + } +}