diff --git a/items_0/Cargo.toml b/items_0/Cargo.toml index 78cf410..4384537 100644 --- a/items_0/Cargo.toml +++ b/items_0/Cargo.toml @@ -13,6 +13,7 @@ erased-serde = "0.3" serde_json = "1.0" bincode = "1.3.3" bytes = "1.2.1" +futures-util = "0.3.24" chrono = { version = "0.4.19", features = ["serde"] } netpod = { path = "../netpod" } err = { path = "../err" } diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 8699731..407321f 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -12,6 +12,8 @@ pub mod bincode { pub use bincode::*; } +pub use futures_util; + use collect_s::Collectable; use container::ByteEstimate; use netpod::range::evrange::SeriesRange; diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs index 970af93..404bc34 100644 --- a/items_0/src/transform.rs +++ b/items_0/src/transform.rs @@ -1,4 +1,12 @@ +use crate::collect_s::Collected; +use crate::streamitem::RangeCompletableItem; +use crate::streamitem::Sitemty; +use crate::streamitem::StreamItem; use crate::Events; +use err::Error; +use futures_util::stream; +use futures_util::Future; +use futures_util::Stream; use std::pin::Pin; pub struct TransformProperties { @@ -6,8 +14,29 @@ pub struct TransformProperties { pub needs_value: bool, } -pub trait EventTransform { +pub trait WithTransformProperties { fn query_transform_properties(&self) -> TransformProperties; +} + +impl WithTransformProperties for Box +where + T: WithTransformProperties, +{ + fn query_transform_properties(&self) -> TransformProperties { + self.as_ref().query_transform_properties() + } +} + +impl WithTransformProperties for Pin> +where + T: WithTransformProperties, +{ + fn query_transform_properties(&self) -> TransformProperties { + self.as_ref().query_transform_properties() + } +} + +pub trait EventTransform: WithTransformProperties { fn transform(&mut self, src: Box) -> Box; } @@ -15,12 +44,8 @@ impl EventTransform for Box where T: EventTransform, { - fn query_transform_properties(&self) -> TransformProperties { - self.as_ref().query_transform_properties() - } - fn transform(&mut self, src: Box) -> Box { - todo!() + self.as_mut().transform(src) } } @@ -28,10 +53,6 @@ impl EventTransform for Pin> where T: EventTransform, { - fn query_transform_properties(&self) -> TransformProperties { - self.as_ref().query_transform_properties() - } - fn transform(&mut self, src: Box) -> Box { todo!() } @@ -45,12 +66,57 @@ impl IdentityTransform { } } -impl EventTransform for IdentityTransform { - fn transform(&mut self, src: Box) -> Box { - src - } - +impl WithTransformProperties for IdentityTransform { fn query_transform_properties(&self) -> TransformProperties { todo!() } } + +impl EventTransform for IdentityTransform { + fn transform(&mut self, src: Box) -> Box { + src + } +} + +pub struct TransformEvent(pub Box); + +impl WithTransformProperties for TransformEvent { + fn query_transform_properties(&self) -> TransformProperties { + self.0.query_transform_properties() + } +} + +impl EventTransform for TransformEvent { + fn transform(&mut self, src: Box) -> Box { + self.0.transform(src) + } +} + +pub struct EventStream(pub Pin>> + Send>>); + +impl From for EventStream +where + T: Events, +{ + fn from(value: T) -> Self { + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(Box::new(value) as _))); + let x = stream::iter(vec![item]); + EventStream(Box::pin(x)) + } +} + +// TODO these must return type which can take events as input. + +pub struct TransformedCollectedStream(pub Pin, Error>>>>); + +impl WithTransformProperties for TransformedCollectedStream { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl EventTransform for TransformedCollectedStream { + fn transform(&mut self, src: Box) -> Box { + todo!() + } +} diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 8d4dbf7..87c2cb2 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -15,6 +15,7 @@ pub mod streams; #[cfg(test)] pub mod test; pub mod testgen; +pub mod transform; use channelevents::ChannelEvents; use chrono::DateTime; diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 02fb230..7dc8d1f 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -9,6 +9,7 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::transform::EventTransform; use items_0::transform::TransformProperties; +use items_0::transform::WithTransformProperties; use items_0::MergeError; use items_0::WithLen; use netpod::log::*; @@ -444,11 +445,13 @@ where } } -impl EventTransform for Merger { +impl WithTransformProperties for Merger { fn query_transform_properties(&self) -> TransformProperties { todo!() } +} +impl EventTransform for Merger { fn transform(&mut self, src: Box) -> Box { todo!() } diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index 8f40640..c3db44a 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -4,6 +4,7 @@ use futures_util::Stream; use futures_util::StreamExt; use items_0::transform::EventTransform; use items_0::transform::TransformProperties; +use items_0::transform::WithTransformProperties; use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; @@ -43,11 +44,19 @@ where } } -impl EventTransform for Enumerate2 { +impl WithTransformProperties for Enumerate2 +where + T: WithTransformProperties, +{ fn query_transform_properties(&self) -> TransformProperties { - todo!() + self.inp.query_transform_properties() } +} +impl EventTransform for Enumerate2 +where + T: WithTransformProperties, +{ fn transform(&mut self, src: Box) -> Box { todo!() } @@ -64,7 +73,10 @@ where T: Stream, F: Fn(::Item) -> Fut, { - pub fn new(inp: T, f: F) -> Self { + pub fn new(inp: T, f: F) -> Self + where + T: EventTransform, + { Self { inp: Box::pin(inp), f: Box::pin(f), @@ -118,11 +130,19 @@ where } } -impl EventTransform for Then2 { +impl WithTransformProperties for Then2 +where + T: EventTransform, +{ fn query_transform_properties(&self) -> TransformProperties { - todo!() + self.inp.query_transform_properties() } +} +impl EventTransform for Then2 +where + T: EventTransform, +{ fn transform(&mut self, src: Box) -> Box { todo!() } @@ -186,11 +206,13 @@ where } } -impl EventTransform for VecStream { +impl WithTransformProperties for VecStream { fn query_transform_properties(&self) -> TransformProperties { todo!() } +} +impl EventTransform for VecStream { fn transform(&mut self, src: Box) -> Box { todo!() } diff --git a/items_2/src/transform.rs b/items_2/src/transform.rs new file mode 100644 index 0000000..5edfe5c --- /dev/null +++ b/items_2/src/transform.rs @@ -0,0 +1,97 @@ +//! Helper functions to create transforms which act locally on a batch of events. +//! Tailored to the usage pattern given by `TransformQuery`. + +use crate::channelevents::ChannelEvents; +use crate::eventsdim0::EventsDim0; +use items_0::transform::EventStream; +use items_0::transform::EventTransform; +use items_0::transform::TransformEvent; +use items_0::transform::TransformProperties; +use items_0::transform::WithTransformProperties; +use items_0::Appendable; +use items_0::AsAnyMut; +use items_0::Empty; +use items_0::Events; +use items_0::EventsNonObj; +use netpod::log::*; +use std::mem; + +struct TransformEventIdentity {} + +impl WithTransformProperties for TransformEventIdentity { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl EventTransform for TransformEventIdentity { + fn transform(&mut self, src: Box) -> Box { + todo!() + } +} + +pub fn make_transform_identity() -> TransformEvent { + TransformEvent(Box::new(TransformEventIdentity {})) +} + +struct TransformEventMinMaxAvg {} + +impl WithTransformProperties for TransformEventMinMaxAvg { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl EventTransform for TransformEventMinMaxAvg { + fn transform(&mut self, src: Box) -> Box { + todo!() + } +} + +pub fn make_transform_min_max_avg() -> TransformEvent { + TransformEvent(Box::new(TransformEventMinMaxAvg {})) +} + +struct TransformEventPulseIdDiff {} + +impl WithTransformProperties for TransformEventPulseIdDiff { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl EventTransform for TransformEventPulseIdDiff { + fn transform(&mut self, src: Box) -> Box { + let mut src = src; + if let Some(chevs) = src.as_any_mut().downcast_mut::() { + let chevs2 = chevs; + let chevs = mem::replace(chevs2, ChannelEvents::Status(None)); + let mut pulse_last = None; + match chevs { + ChannelEvents::Events(item) => { + let (tss, pulses) = EventsNonObj::into_tss_pulses(item); + let mut item = EventsDim0::empty(); + for (ts, pulse) in tss.into_iter().zip(pulses) { + let value = if let Some(last) = pulse_last { + pulse as i64 - last as i64 + } else { + 0 + }; + item.push(ts, pulse, value); + pulse_last = Some(pulse); + } + *chevs2 = ChannelEvents::Events(Box::new(item)); + } + ChannelEvents::Status(_) => {} + } + src + } else { + warn!("make_transform_pulse_id_diff item is not ChannelEvents"); + src + } + } +} + +pub fn make_transform_pulse_id_diff() -> TransformEvent { + TransformEvent(Box::new(TransformEventPulseIdDiff {})) +} diff --git a/query/Cargo.toml b/query/Cargo.toml index f617061..3dca474 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -14,3 +14,4 @@ humantime-serde = "1.1.1" err = { path = "../err" } netpod = { path = "../netpod" } items_0 = { path = "../items_0" } +items_2 = { path = "../items_2" } diff --git a/query/src/transform.rs b/query/src/transform.rs index efd119c..b9ac9c8 100644 --- a/query/src/transform.rs +++ b/query/src/transform.rs @@ -1,4 +1,10 @@ use err::Error; +use items_0::transform::EventStream; +use items_0::transform::TransformEvent; +use items_0::transform::TransformedCollectedStream; +use items_2::transform::make_transform_identity; +use items_2::transform::make_transform_min_max_avg; +use items_2::transform::make_transform_pulse_id_diff; use netpod::get_url_query_pairs; use netpod::log::*; use netpod::AppendToUrl; @@ -108,7 +114,31 @@ impl TransformQuery { } } - pub fn build_event_transform(&self) -> () {} + pub fn build_event_transform(&self, inp: EventStream) -> Result { + match &self.event { + EventTransformQuery::ValueFull => Ok(make_transform_identity()), + EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()), + EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!( + "build_event_transform don't know what to do {self:?}" + ))), + EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), + EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!( + "build_event_transform don't know what to do {self:?}" + ))), + EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!( + "build_event_transform don't know what to do {self:?}" + ))), + } + } + + pub fn build_full_transform_collected(&self, inp: EventStream) -> Result { + let evs = self.build_event_transform(inp)?; + match &self.time_binning { + TimeBinningTransformQuery::None => todo!(), + TimeBinningTransformQuery::TimeWeighted => todo!(), + TimeBinningTransformQuery::Unweighted => todo!(), + } + } } impl FromUrl for TransformQuery { diff --git a/streams/src/boxed.rs b/streams/src/boxed.rs new file mode 100644 index 0000000..2abfaa4 --- /dev/null +++ b/streams/src/boxed.rs @@ -0,0 +1,20 @@ +use futures_util::stream::StreamExt; +use futures_util::Stream; +use items_0::on_sitemty_data; +use items_0::streamitem::Sitemty; +use items_0::transform::EventStream; +use items_0::Events; + +pub fn boxed_event_stream(inp: S) -> EventStream +where + T: Events, + S: Stream> + Send + 'static, +{ + let stream = inp.map(|x| { + let x = on_sitemty_data!(x, |x| Ok(StreamItem::DataItem(RangeCompletableItem::Data( + Box::new(x) as Box + )))); + x + }); + EventStream(Box::pin(stream)) +} diff --git a/streams/src/generators.rs b/streams/src/generators.rs index ea1bbe1..cc9d583 100644 --- a/streams/src/generators.rs +++ b/streams/src/generators.rs @@ -3,7 +3,9 @@ use futures_util::FutureExt; use futures_util::Stream; use items_0::container::ByteEstimate; use items_0::streamitem::sitem_data; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; use items_0::Appendable; use items_0::Empty; use items_2::channelevents::ChannelEvents; @@ -21,6 +23,8 @@ pub struct GenerateI32 { #[allow(unused)] c1: u64, timeout: Option + Send>>>, + done: bool, + done_range_final: bool, } impl GenerateI32 { @@ -38,6 +42,8 @@ impl GenerateI32 { tsend, c1: 0, timeout: None, + done: false, + done_range_final: false, } } @@ -67,8 +73,12 @@ impl Stream for GenerateI32 { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { - break if self.ts >= self.tsend { + break if self.done_range_final { Ready(None) + } else if self.ts >= self.tsend { + self.done = true; + self.done_range_final = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else if false { // To use the generator without throttling, use this scope Ready(Some(self.make_batch())) diff --git a/streams/src/lib.rs b/streams/src/lib.rs index 1be5a74..0095b59 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -1,3 +1,4 @@ +pub mod boxed; pub mod collect; pub mod dtflags; pub mod filechunkread; diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index faae2bf..493e975 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -16,12 +16,14 @@ use items_2::binsdim0::BinsDim0; use items_2::channelevents::ChannelEvents; use items_2::channelevents::ConnStatus; use items_2::channelevents::ConnStatusEvent; +use items_2::eventsdim0::EventsDim0; use items_2::testgen::make_some_boxed_d0_f32; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; +use query::transform::TransformQuery; use serde_json::Value as JsValue; use std::collections::VecDeque; use std::time::Duration; @@ -209,6 +211,10 @@ fn time_bin_02() -> Result<(), Error> { let s = String::from_utf8_lossy(&d); eprintln!("{s}"); let jsval: JsValue = serde_json::from_slice(&d)?; + { + let ts_anchor = jsval.get("tsAnchor").unwrap().as_u64().unwrap(); + assert_eq!(ts_anchor, 1200); + } { let counts = jsval.get("counts").unwrap().as_array().unwrap(); assert_eq!(counts.len(), expected_bin_count); @@ -232,6 +238,10 @@ fn time_bin_02() -> Result<(), Error> { assert_eq!((40 + ts1ms.as_u64().unwrap() / 100) % 1000, max.as_u64().unwrap()); } } + { + let range_final = jsval.get("rangeFinal").unwrap().as_bool().unwrap(); + assert_eq!(range_final, true); + } } Ok(()) }; @@ -239,3 +249,12 @@ fn time_bin_02() -> Result<(), Error> { } // TODO add test case to observe RangeComplete after binning. + +#[test] +fn transform_chain_correctness_01() -> Result<(), Error> { + type STY = f32; + let tq = TransformQuery::default_time_binned(); + let empty = EventsDim0::::empty(); + tq.build_event_transform(empty.into())?; + Ok(()) +}