diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index c293dbd..5c60fe9 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -45,7 +45,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache span1.in_scope(|| { debug!("begin"); }); - let item = streams::timebinnedjson::timebinned_json(&query, &chconf, &node_config.node_config.cluster) + let item = streams::timebinnedjson::timebinned_json(query, chconf, node_config.node_config.cluster.clone()) .instrument(span1) .await?; let buf = serde_json::to_vec(&item)?; diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index e22778e..46ba27c 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -20,7 +20,7 @@ pub trait TimeBins { fn ts_min_max(&self) -> Option<(u64, u64)>; } -pub trait TimeBinnerTy: fmt::Debug + Unpin { +pub trait TimeBinnerTy: fmt::Debug + Send + Unpin { type Input: fmt::Debug; type Output: fmt::Debug; @@ -44,7 +44,7 @@ pub trait TimeBinnerTy: fmt::Debug + Unpin { fn empty(&self) -> Option; } -pub trait TimeBinnableTy: fmt::Debug + Sized { +pub trait TimeBinnableTy: fmt::Debug + Send + Sized { type TimeBinner: TimeBinnerTy; fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner; diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs index 428f317..4127e94 100644 --- a/items_0/src/transform.rs +++ b/items_0/src/transform.rs @@ -157,6 +157,22 @@ impl TimeBinnableStreamTrait for TimeBinnableStreamBox {} pub struct CollectableStreamBox(pub Pin>); +impl Stream for CollectableStreamBox { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.0.poll_next_unpin(cx) + } +} + +impl WithTransformProperties for CollectableStreamBox { + fn query_transform_properties(&self) -> TransformProperties { + todo!() + } +} + +impl CollectableStreamTrait for CollectableStreamBox {} + impl WithTransformProperties for stream::Empty { fn query_transform_properties(&self) -> TransformProperties { todo!() @@ -169,3 +185,5 @@ where stream::Empty: Stream>>, { } + +impl CollectableStreamTrait for Pin> where T: CollectableStreamTrait {} diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 4f5c6e1..4313e58 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -45,7 +45,7 @@ macro_rules! trace4 { } pub struct Collect { - inp: CollectableStreamBox, + inp: Pin>> + Send>>, events_max: u64, range: Option, binrange: Option, @@ -57,19 +57,16 @@ pub struct Collect { } impl Collect { - pub fn new( - inp: INP, + pub fn new( + inp: Pin>> + Send>>, deadline: Instant, events_max: u64, range: Option, binrange: Option, - ) -> Self - where - INP: CollectableStreamTrait + 'static, - { + ) -> Self { let timer = tokio::time::sleep_until(deadline.into()); Self { - inp: CollectableStreamBox(Box::pin(inp)), + inp, events_max, range, binrange, @@ -181,7 +178,7 @@ impl Future for Collect { self.done_input = true; continue; } - Pending => match self.inp.0.poll_next_unpin(cx) { + Pending => match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => match self.handle_item(item) { Ok(()) => { continue; diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 5891f7e..9598aa9 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -5,6 +5,7 @@ use crate::transform::EventsToTimeBinnable; use crate::transform::TimeBinnableToCollectable; use err::Error; use futures_util::StreamExt; +use items_0::collect_s::Collectable; use items_0::on_sitemty_data; use items_0::Events; use items_2::channelevents::ChannelEvents; @@ -48,12 +49,14 @@ pub async fn plain_events_json( let k: Box = Box::new(k); info!("-------------------------\ngot len {}", k.len()); let k = tr.0.transform(k); + let k: Box = Box::new(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) }) }); - let stream = PlainEventStream::new(stream); - let stream = EventsToTimeBinnable::new(stream); - let stream = TimeBinnableToCollectable::new(stream); + //let stream = PlainEventStream::new(stream); + //let stream = EventsToTimeBinnable::new(stream); + //let stream = TimeBinnableToCollectable::new(stream); + let stream = Box::pin(stream); let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?; let jsval = serde_json::to_value(&collected)?; Ok(jsval) diff --git a/streams/src/test/collect.rs b/streams/src/test/collect.rs index b22fee6..5749d11 100644 --- a/streams/src/test/collect.rs +++ b/streams/src/test/collect.rs @@ -66,6 +66,7 @@ fn collect_channel_events_01() -> Result<(), Error> { let stream = PlainEventStream::new(stream); let stream = EventsToTimeBinnable::new(stream); let stream = TimeBinnableToCollectable::new(stream); + let stream = Box::pin(stream); let res = Collect::new(stream, deadline, events_max, None, None).await?; if let Some(res) = res.as_any_ref().downcast_ref::>() { eprintln!("Great, a match"); @@ -105,6 +106,7 @@ fn collect_channel_events_pulse_id_diff() -> Result<(), Error> { let stream = Box::pin(stream); let stream = build_time_binning_transform(&trqu, stream)?; let stream = TimeBinnableToCollectable::new(stream); + let stream = Box::pin(stream); let res = Collect::new(stream, deadline, events_max, None, None).await?; if let Some(res) = res.as_any_ref().downcast_ref::>() { eprintln!("Great, a match"); diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 88a967e..8ac7e92 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -6,20 +6,28 @@ use crate::transform::build_merged_event_transform; use crate::transform::EventsToTimeBinnable; use crate::transform::TimeBinnableToCollectable; use err::Error; +use futures_util::future::BoxFuture; use futures_util::stream; +use futures_util::stream::BoxStream; +use futures_util::Stream; use futures_util::StreamExt; +use items_0::collect_s::Collectable; use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; use items_0::streamitem::Sitemty; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; +use items_0::transform::CollectableStreamBox; use items_0::transform::TimeBinnableStreamBox; +use items_0::transform::TimeBinnableStreamTrait; use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use items_2::streams::PlainEventStream; use items_2::streams::PlainTimeBinnableStream; use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::ChConf; @@ -27,28 +35,28 @@ use netpod::Cluster; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; +use std::pin::Pin; use std::time::Instant; -pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result { - let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; - let bins_max = 10000; - warn!("TODO add with_deadline to PlainEventsQuery"); - let deadline = Instant::now() + query.timeout_value(); - // TODO construct the events query in a better way. +fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream) -> impl 'u + Send + Stream { + stream +} + +async fn timebinnable_stream( + query: BinnedQuery, + chconf: ChConf, + range: NanoRange, + one_before_range: bool, + cluster: Cluster, +) -> Result { let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); let mut tr = build_merged_event_transform(evq.transform())?; - let inps = open_tcp_streams::<_, ChannelEvents>(&evq, cluster).await?; + + let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); - // TODO - let do_time_weight = true; - let one_before_range = true; - - // TODO RangeFilter2 must accept SeriesRange - let range = query.range().try_into()?; - let stream = RangeFilter2::new(stream, range, one_before_range); let stream = stream.map(move |k| { @@ -62,10 +70,119 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu let stream = PlainEventStream::new(stream); let stream = EventsToTimeBinnable::new(stream); let stream = Box::pin(stream); + Ok(TimeBinnableStreamBox(stream)) +} + +async fn timebinned_stream( + query: BinnedQuery, + chconf: ChConf, + cluster: Cluster, +) -> Result>> + Send>>, Error> { + let deadline = Instant::now(); + let range: NanoRange = query.range().try_into()?; + + //let binned_range = BinnedRangeEnum::covering_range(SeriesRange::TimeRange(NanoRange { beg: 123, end: 456 }), 10)?; + let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; + let do_time_weight = true; + let one_before_range = true; + + let stream = timebinnable_stream(query.clone(), chconf, range, one_before_range, cluster).await?; + let stream: Pin> = stream.0; + let stream = Box::pin(stream); + let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline); + let stream: Pin>> + Send>> = Box::pin(stream); + Ok(stream) +} + +fn timebinned_to_collectable( + stream: Pin>> + Send>>, +) -> Pin>> + Send>> { + let stream = stream.map(|k| { + on_sitemty_data!(k, |k| { + let k: Box = Box::new(k); + info!("-------------------------\ngot len {}", k.len()); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + }) + }); + let stream: Pin>> + Send>> = Box::pin(stream); + stream +} + +pub async fn timebinned_json(query: BinnedQuery, chconf: ChConf, cluster: Cluster) -> Result { + let stream = timebinned_stream(query.clone(), chconf.clone(), cluster).await?; + let deadline = Instant::now(); + let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); + let collect_range = evq.range().clone(); + let events_max = evq.events_max(); + let range: NanoRange = query.range().try_into()?; + + let binned_range = BinnedRangeEnum::covering_range(SeriesRange::TimeRange(NanoRange { beg: 123, end: 456 }), 10)?; + let do_time_weight = true; + let one_before_range = true; + + let stream = timebinned_to_collectable(stream); + + /* + let stream = stream.map(|k| { + on_sitemty_data!(k, |k| { + let k: Box = Box::new(k); + info!("-------------------------\ngot len {}", k.len()); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + }) + }); + let stream: Pin>> + Send>> = Box::pin(stream); + */ + + let collected = Collect::new(stream, deadline, events_max, Some(collect_range), None); + let collected: BoxFuture<_> = Box::pin(collected); + let collected = collected.await?; + let jsval = serde_json::to_value(&collected)?; + Ok(jsval) +} + +pub async fn timebinned_json_2(query: BinnedQuery, chconf: ChConf, cluster: Cluster) -> Result { + warn!("TODO add with_deadline to PlainEventsQuery"); + let deadline = Instant::now() + query.timeout_value(); + // TODO RangeFilter2 must accept SeriesRange + let range: NanoRange = query.range().try_into()?; + // TODO + let do_time_weight = true; + let one_before_range = true; + // TODO construct the events query in a better way. + let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); + let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; + let bins_max = 10000; + let events_max = evq.events_max(); + let collect_range = evq.range().clone(); + let mut tr = build_merged_event_transform(evq.transform())?; + + /* + let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?; + // TODO propagate also the max-buf-len for the first stage event reader. + // TODO use a mixture of count and byte-size as threshold. + let stream = Merger::new(inps, evq.merger_out_len_max()); + + let stream = RangeFilter2::new(stream, range, one_before_range); + + let stream = stream.map(move |k| { + on_sitemty_data!(k, |k| { + let k: Box = Box::new(k); + info!("-------------------------\ngot len {}", k.len()); + let k = tr.0.transform(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + }) + }); + let stream = PlainEventStream::new(stream); + let stream = EventsToTimeBinnable::new(stream); + let stream = Box::pin(stream); + */ + + let stream: Pin>> + Send>> = todo!(); // TODO TimeBinnedStream must accept types bin edges. // Maybe even take a BinnedRangeEnum? let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline); + /* let stream = stream.map(|k| { on_sitemty_data!(k, |k| { let k: Box = Box::new(k); @@ -78,11 +195,17 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu let stream = TimeBinnableToCollectable::new(stream); let stream = Box::pin(stream); + */ // TODO collect should not have to accept two ranges, instead, generalize over it. //let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?; let stream = futures_util::stream::empty(); - let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?; + let stream = CollectableStreamBox(Box::pin(stream)); + /*let collected = Collect::new(stream, deadline, events_max, Some(collect_range), None); + let collected: BoxFuture<_> = Box::pin(collected); + let collected = collected.await?; + */ + let collected = ""; let jsval = serde_json::to_value(&collected)?; Ok(jsval) } diff --git a/streams/src/transform.rs b/streams/src/transform.rs index 311a749..1beaa1f 100644 --- a/streams/src/transform.rs +++ b/streams/src/transform.rs @@ -139,7 +139,7 @@ impl WithTransformProperties for TimeBinnableToCollectable { impl CollectableStreamTrait for TimeBinnableToCollectable {} -impl CollectableStreamTrait for Pin> {} +//impl CollectableStreamTrait for Pin> {} pub fn build_time_binning_transform( tr: &TransformQuery,