diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index 671dfdd..95969fb 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -135,12 +135,12 @@ where } // TODO rename to `Typed` -pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut + TypeName { +pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut + TypeName + Send { type Collector: CollectorType; fn new_collector() -> Self::Collector; } -pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut + TypeName { +pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut + TypeName + Send { fn new_collector(&self) -> Box; } diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs index 404bc34..4fb3354 100644 --- a/items_0/src/transform.rs +++ b/items_0/src/transform.rs @@ -1,3 +1,4 @@ +use crate::collect_s::Collectable; use crate::collect_s::Collected; use crate::streamitem::RangeCompletableItem; use crate::streamitem::Sitemty; @@ -105,18 +106,4 @@ where } } -// TODO these must return type which can take events as input. - -pub struct TransformedCollectedStream(pub Pin, Error>>>>); - -impl WithTransformProperties for TransformedCollectedStream { - fn query_transform_properties(&self) -> TransformProperties { - todo!() - } -} - -impl EventTransform for TransformedCollectedStream { - fn transform(&mut self, src: Box) -> Box { - todo!() - } -} +pub struct CollectableStream(pub Pin>> + Send>>); diff --git a/query/src/transform.rs b/query/src/transform.rs index b9ac9c8..5b02f1b 100644 --- a/query/src/transform.rs +++ b/query/src/transform.rs @@ -1,10 +1,4 @@ use err::Error; -use items_0::transform::EventStream; -use items_0::transform::TransformEvent; -use items_0::transform::TransformedCollectedStream; -use items_2::transform::make_transform_identity; -use items_2::transform::make_transform_min_max_avg; -use items_2::transform::make_transform_pulse_id_diff; use netpod::get_url_query_pairs; use netpod::log::*; use netpod::AppendToUrl; @@ -15,7 +9,7 @@ use std::collections::BTreeMap; use url::Url; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -enum EventTransformQuery { +pub enum EventTransformQuery { EventBlobsVerbatim, EventBlobsUncompressed, ValueFull, @@ -39,7 +33,7 @@ impl EventTransformQuery { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -enum TimeBinningTransformQuery { +pub enum TimeBinningTransformQuery { None, TimeWeighted, Unweighted, @@ -114,30 +108,12 @@ impl TransformQuery { } } - pub fn build_event_transform(&self, inp: EventStream) -> Result { - match &self.event { - EventTransformQuery::ValueFull => Ok(make_transform_identity()), - EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()), - EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {self:?}" - ))), - EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), - EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {self:?}" - ))), - EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {self:?}" - ))), - } + pub fn get_tr_event(&self) -> &EventTransformQuery { + &self.event } - pub fn build_full_transform_collected(&self, inp: EventStream) -> Result { - let evs = self.build_event_transform(inp)?; - match &self.time_binning { - TimeBinningTransformQuery::None => todo!(), - TimeBinningTransformQuery::TimeWeighted => todo!(), - TimeBinningTransformQuery::Unweighted => todo!(), - } + pub fn get_tr_time_binning(&self) -> &TimeBinningTransformQuery { + &self.time_binning } } diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 060ade0..e386530 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -8,6 +8,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; +use items_0::transform::EventStream; use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::SeriesRange; @@ -36,6 +37,10 @@ macro_rules! trace4 { ($($arg:tt)*) => (eprintln!($($arg)*)); } +pub struct Collect { + inp: EventStream, +} + async fn collect_in_span( stream: S, deadline: Instant, diff --git a/streams/src/lib.rs b/streams/src/lib.rs index 0095b59..92369db 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -13,3 +13,4 @@ pub mod tcprawclient; pub mod test; pub mod timebin; pub mod timebinnedjson; +pub mod transform; diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 163892d..419678f 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -91,7 +91,7 @@ pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster: info!("item after merge: {item:?}"); item }); - let stream = RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range()); + //let stream = RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range()); #[cfg(DISABLED)] let stream = stream.map(|item| { info!("item after rangefilter: {item:?}"); diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index 493e975..c9d224e 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -1,6 +1,7 @@ use crate::collect::collect; use crate::generators::GenerateI32; use crate::test::runfut; +use crate::transform::build_event_transform; use chrono::DateTime; use chrono::Utc; use err::Error; @@ -255,6 +256,6 @@ fn transform_chain_correctness_01() -> Result<(), Error> { type STY = f32; let tq = TransformQuery::default_time_binned(); let empty = EventsDim0::::empty(); - tq.build_event_transform(empty.into())?; + build_event_transform(&tq, empty.into())?; Ok(()) } diff --git a/streams/src/transform.rs b/streams/src/transform.rs new file mode 100644 index 0000000..fd6730d --- /dev/null +++ b/streams/src/transform.rs @@ -0,0 +1,62 @@ +use err::Error; +use futures_util::StreamExt; +use items_0::transform::CollectableStream; +use items_0::transform::EventStream; +use items_0::transform::TransformEvent; +use items_2::transform::make_transform_identity; +use items_2::transform::make_transform_min_max_avg; +use items_2::transform::make_transform_pulse_id_diff; +use query::transform::EventTransformQuery; +use query::transform::TimeBinningTransformQuery; +use query::transform::TransformQuery; + +pub fn build_event_transform(tr: &TransformQuery, inp: EventStream) -> Result { + let trev = tr.get_tr_event(); + match trev { + EventTransformQuery::ValueFull => Ok(make_transform_identity()), + EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()), + EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!( + "build_event_transform don't know what to do {trev:?}" + ))), + EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), + EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!( + "build_event_transform don't know what to do {trev:?}" + ))), + EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!( + "build_event_transform don't know what to do {trev:?}" + ))), + } +} + +pub fn build_full_transform_collected(tr: &TransformQuery, inp: EventStream) -> Result { + // TODO this must return a Stream! + //let evs = build_event_transform(tr, inp)?; + let trtb = tr.get_tr_time_binning(); + use futures_util::Stream; + use items_0::collect_s::Collectable; + use items_0::streamitem::RangeCompletableItem; + use items_0::streamitem::Sitemty; + use items_0::streamitem::StreamItem; + use std::pin::Pin; + let a: Pin>> + Send>> = Box::pin(inp.0.map(|item| match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::Data(item) => { + let item: Box = Box::new(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), + }, + StreamItem::Log(item) => Ok(StreamItem::Log(item)), + StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), + }, + Err(e) => Err(e), + })); + let stream: Pin>> + Send>> = + Box::pin(futures_util::stream::empty()); + match trtb { + TimeBinningTransformQuery::None => Ok(CollectableStream(stream)), + TimeBinningTransformQuery::TimeWeighted => todo!(), + TimeBinningTransformQuery::Unweighted => todo!(), + } +}