From 7f641f59711bdec3ec250f832657864bdac442fc Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 17 Apr 2023 16:24:33 +0200 Subject: [PATCH] WIP --- items_0/src/timebin.rs | 64 ++++++++++++++++++++++++++++++++++++++-- items_0/src/transform.rs | 13 ++++++++ streams/src/transform.rs | 46 ++++++++++++++++++++++++++++- 3 files changed, 119 insertions(+), 4 deletions(-) diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index e57cc28..2943634 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -2,10 +2,12 @@ use crate::collect_s::Collectable; use crate::collect_s::ToJsonResult; use crate::AsAnyMut; use crate::AsAnyRef; +use crate::Events; use crate::RangeOverlapInfo; use crate::TypeName; use crate::WithLen; use netpod::log::*; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; use std::any::Any; use std::fmt; @@ -89,6 +91,60 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef fn to_box_to_json_result(&self) -> Box; } +impl WithLen for Box { + fn len(&self) -> usize { + todo!() + } +} + +impl RangeOverlapInfo for Box { + fn ends_before(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn ends_after(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn starts_after(&self, range: &SeriesRange) -> bool { + todo!() + } +} + +impl TimeBinnable for Box { + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + todo!() + } + + fn to_box_to_json_result(&self) -> Box { + todo!() + } +} + +impl RangeOverlapInfo for Box { + fn ends_before(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn ends_after(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn starts_after(&self, range: &SeriesRange) -> bool { + todo!() + } +} + +impl TimeBinnable for Box { + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + todo!() + } + + fn to_box_to_json_result(&self) -> Box { + todo!() + } +} + #[derive(Debug)] pub struct TimeBinnerDynStruct { binrange: BinnedRangeEnum, @@ -112,9 +168,11 @@ impl TimeBinnerTy for TimeBinnerDynStruct { fn ingest(&mut self, item: &mut Self::Input) { if self.binner.is_none() { - self.binner = Some(Box::new( - item.time_binner_new(self.binrange.clone(), self.do_time_weight), - )); + self.binner = Some(Box::new(TimeBinnableTy::time_binner_new( + item, + self.binrange.clone(), + self.do_time_weight, + ))); } self.binner.as_mut().unwrap().as_mut().ingest(item.as_mut()) } diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs index 4fb3354..c0d70a5 100644 --- a/items_0/src/transform.rs +++ b/items_0/src/transform.rs @@ -3,6 +3,7 @@ use crate::collect_s::Collected; use crate::streamitem::RangeCompletableItem; use crate::streamitem::Sitemty; use crate::streamitem::StreamItem; +use crate::timebin::TimeBinnable; use crate::Events; use err::Error; use futures_util::stream; @@ -10,6 +11,18 @@ use futures_util::Future; use futures_util::Stream; use std::pin::Pin; +pub trait EventStreamTrait: Stream>> + WithTransformProperties + Send {} + +pub trait TimeBinnableStreamTrait: + Stream>> + WithTransformProperties + Send +{ +} + +pub trait CollectableStreamTrait: + Stream>> + WithTransformProperties + Send +{ +} + pub struct TransformProperties { pub needs_one_before_range: bool, pub needs_value: bool, diff --git a/streams/src/transform.rs b/streams/src/transform.rs index fd6730d..e175dde 100644 --- a/streams/src/transform.rs +++ b/streams/src/transform.rs @@ -1,14 +1,25 @@ use err::Error; +use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::timebin::TimeBinnable; use items_0::transform::CollectableStream; use items_0::transform::EventStream; +use items_0::transform::EventStreamTrait; use items_0::transform::TransformEvent; +use items_0::transform::WithTransformProperties; +use items_0::Events; use items_2::transform::make_transform_identity; use items_2::transform::make_transform_min_max_avg; use items_2::transform::make_transform_pulse_id_diff; use query::transform::EventTransformQuery; use query::transform::TimeBinningTransformQuery; use query::transform::TransformQuery; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; pub fn build_event_transform(tr: &TransformQuery, inp: EventStream) -> Result { let trev = tr.get_tr_event(); @@ -28,7 +39,40 @@ pub fn build_event_transform(tr: &TransformQuery, inp: EventStream) -> Result Result { +pub struct EventsToTimeBinnable { + inp: Pin>, +} + +impl Stream for EventsToTimeBinnable { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => Ready(Some(match item { + Ok(item) => Ok(match item { + StreamItem::DataItem(item) => StreamItem::DataItem(match item { + RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete, + RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)), + }), + StreamItem::Log(item) => StreamItem::Log(item), + StreamItem::Stats(item) => StreamItem::Stats(item), + }), + Err(e) => Err(e), + })), + Ready(None) => Ready(None), + Pending => Pending, + } + } +} + +impl WithTransformProperties for EventsToTimeBinnable { + fn query_transform_properties(&self) -> items_0::transform::TransformProperties { + self.inp.query_transform_properties() + } +} + +pub fn build_full_transform_collectable(tr: &TransformQuery, inp: EventStream) -> Result { // TODO this must return a Stream! //let evs = build_event_transform(tr, inp)?; let trtb = tr.get_tr_time_binning();