From 868f4b014a1d87c744ab72dd377787e7c062d563 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 19 Apr 2023 13:50:33 +0200 Subject: [PATCH] higher-ranked lifetime error, could not prove, could not normalize --- items_0/src/timebin.rs | 24 +++++++++++++ items_2/src/streams.rs | 65 +++++++++++++++++++++++++++++++++ streams/src/timebin.rs | 6 ++++ streams/src/timebinnedjson.rs | 68 +++++++++++++++++++++++------------ streams/src/transform.rs | 2 ++ 5 files changed, 142 insertions(+), 23 deletions(-) diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index 986bfac..e22778e 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -62,6 +62,30 @@ pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde fn validate(&self) -> Result<(), String>; } +impl RangeOverlapInfo for Box { + fn ends_before(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn ends_after(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn starts_after(&self, range: &SeriesRange) -> bool { + todo!() + } +} + +impl TimeBinnable for Box { + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + todo!() + } + + fn to_box_to_json_result(&self) -> Box { + todo!() + } +} + pub trait TimeBinner: fmt::Debug + Send { fn ingest(&mut self, item: &mut dyn TimeBinnable); fn bins_ready_count(&self) -> usize; diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index dcd9579..1df1bcd 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -6,9 +6,11 @@ use items_0::collect_s::Collectable; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::timebin::TimeBinnable; use items_0::transform::CollectableStreamTrait; use items_0::transform::EventStreamTrait; use items_0::transform::EventTransform; +use items_0::transform::TimeBinnableStreamTrait; use items_0::transform::TransformProperties; use items_0::transform::WithTransformProperties; use items_0::Events; @@ -292,3 +294,66 @@ where INP: Stream> + Send, { } + +/// Wrap any event stream and provide transformation properties. +pub struct PlainTimeBinnableStream +where + T: TimeBinnable, + INP: Stream> + Send, +{ + inp: Pin>, +} + +impl PlainTimeBinnableStream +where + T: TimeBinnable, + INP: Stream> + Send, +{ + pub fn new(inp: INP) -> Self { + Self { inp: Box::pin(inp) } + } +} + +impl Stream for PlainTimeBinnableStream +where + T: TimeBinnable, + INP: Stream> + Send, +{ + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => Ready(Some(match item { + Ok(item) => Ok(match item { + StreamItem::DataItem(item) => StreamItem::DataItem(match item { + RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete, + RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)), + }), + StreamItem::Log(item) => StreamItem::Log(item), + StreamItem::Stats(item) => StreamItem::Stats(item), + }), + Err(e) => Err(e), + })), + Ready(None) => Ready(None), + Pending => Pending, + } + } +} + +impl WithTransformProperties for PlainTimeBinnableStream +where + T: TimeBinnable, + INP: Stream> + Send, +{ + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl TimeBinnableStreamTrait for PlainTimeBinnableStream +where + T: TimeBinnable, + INP: Stream> + Send, +{ +} diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index aace697..c43df86 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -10,6 +10,8 @@ use items_0::streamitem::StreamItem; use items_0::timebin::TimeBinnableTy; use items_0::timebin::TimeBinner; use items_0::timebin::TimeBinnerTy; +use items_0::transform::TimeBinnableStreamTrait; +use items_0::transform::WithTransformProperties; use netpod::log::*; use netpod::BinnedRange; use netpod::BinnedRangeEnum; @@ -237,3 +239,7 @@ where } } } + +//impl WithTransformProperties for TimeBinnedStream where T: TimeBinnableTy {} + +//impl TimeBinnableStreamTrait for TimeBinnedStream where T: TimeBinnableTy {} diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 3a7f3cc..88a967e 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -1,15 +1,26 @@ +use crate::collect::Collect; use crate::rangefilter2::RangeFilter2; use crate::tcprawclient::open_tcp_streams; use crate::timebin::TimeBinnedStream; +use crate::transform::build_merged_event_transform; +use crate::transform::EventsToTimeBinnable; +use crate::transform::TimeBinnableToCollectable; use err::Error; use futures_util::stream; use futures_util::StreamExt; +use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; use items_0::streamitem::Sitemty; +use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; +use items_0::transform::TimeBinnableStreamBox; +use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; +use items_2::streams::PlainEventStream; +use items_2::streams::PlainTimeBinnableStream; use netpod::log::*; +use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::ChConf; use netpod::Cluster; @@ -23,22 +34,13 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu let bins_max = 10000; warn!("TODO add with_deadline to PlainEventsQuery"); let deadline = Instant::now() + query.timeout_value(); - let empty = items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?; - warn!("TODO feed through transform chain"); - let empty = ChannelEvents::Events(empty); - let empty = sitem_data(empty); - - // TODO this part is supposed to be done one the event nodes, if sharded: - //crate::transform::build_event_transform(tr, inp); - query.transform(); - - // TODO - let evquery = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); - let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?; - // TODO propagate also the max-buf-len for the first stage event reader: - info!("timebinned_json with empty item {empty:?}"); - let stream = Merger::new(inps, 1024); - let stream = stream::iter([empty]).chain(stream); + // TODO construct the events query in a better way. + let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); + let mut tr = build_merged_event_transform(evq.transform())?; + let inps = open_tcp_streams::<_, ChannelEvents>(&evq, cluster).await?; + // TODO propagate also the max-buf-len for the first stage event reader. + // TODO use a mixture of count and byte-size as threshold. + let stream = Merger::new(inps, evq.merger_out_len_max()); // TODO let do_time_weight = true; @@ -48,19 +50,39 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu let range = query.range().try_into()?; let stream = RangeFilter2::new(stream, range, one_before_range); + + let stream = stream.map(move |k| { + on_sitemty_data!(k, |k| { + let k: Box = Box::new(k); + info!("-------------------------\ngot len {}", k.len()); + let k = tr.0.transform(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + }) + }); + let stream = PlainEventStream::new(stream); + let stream = EventsToTimeBinnable::new(stream); let stream = Box::pin(stream); // TODO TimeBinnedStream must accept types bin edges. // Maybe even take a BinnedRangeEnum? - let stream = TimeBinnedStream::new(stream, todo!(), do_time_weight, deadline); - if false { - let mut stream = stream; - let _: Option>> = stream.next().await; - panic!() - } + let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline); + let stream = stream.map(|k| { + on_sitemty_data!(k, |k| { + let k: Box = Box::new(k); + sitem_data(k) + }) + }); + let stream = PlainTimeBinnableStream::new(stream); + //let stream = Box::pin(stream); + //let stream = TimeBinnableStreamBox(stream); + + let stream = TimeBinnableToCollectable::new(stream); + let stream = Box::pin(stream); // TODO collect should not have to accept two ranges, instead, generalize over it. - let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?; + //let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?; + let stream = futures_util::stream::empty(); + let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?; let jsval = serde_json::to_value(&collected)?; Ok(jsval) } diff --git a/streams/src/transform.rs b/streams/src/transform.rs index c3c7880..311a749 100644 --- a/streams/src/transform.rs +++ b/streams/src/transform.rs @@ -139,6 +139,8 @@ impl WithTransformProperties for TimeBinnableToCollectable { impl CollectableStreamTrait for TimeBinnableToCollectable {} +impl CollectableStreamTrait for Pin> {} + pub fn build_time_binning_transform( tr: &TransformQuery, inp: Pin>,