diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index c09c801..5adc76a 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -1,11 +1,13 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; +use crate::test::api4::common::fetch_binned_json; use crate::test::f32_cmp_near; use chrono::Utc; use err::Error; use http::StatusCode; use hyper::Body; use items_0::WithLen; +use items_2::binsdim0::BinsDim0CollectedResult; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; @@ -17,6 +19,24 @@ use query::api4::binned::BinnedQuery; use serde_json::Value as JsonValue; use url::Url; +pub fn make_query>( + name: S, + beg_date: &str, + end_date: &str, + bin_count_min: u32, +) -> Result { + let channel = Channel { + backend: "test-inmem".into(), + name: name.into(), + series: None, + }; + let beg_date = beg_date.parse()?; + let end_date = end_date.parse()?; + let range = NanoRange::from_date_time(beg_date, end_date).into(); + let query = BinnedQuery::new(channel, range, bin_count_min).for_time_weighted_scalar(); + Ok(query) +} + #[test] fn binned_d0_json_00() -> Result<(), Error> { let fut = async { @@ -268,26 +288,48 @@ fn binned_inmem_d0_json_00() -> Result<(), Error> { let fut = async { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; - let jsv = get_binned_json( - Channel { - backend: "test-disk-databuffer".into(), - name: "const-regular-scalar-i32-be".into(), - series: None, - }, - "1970-01-01T00:20:11.000Z", - "1970-01-01T00:30:20.000Z", - // TODO must use AggKind::TimeWeightedScalar + let query = make_query( + "inmem-d0-i32", + "1970-01-01T00:20:04.000Z", + "1970-01-01T00:21:10.000Z", 10, - cluster, - ) - .await?; - debug!("Receveided a response json value: {jsv:?}"); - let res: items_2::binsdim0::BinsDim0CollectedResult = serde_json::from_value(jsv)?; + )?; + let jsv = fetch_binned_json(query, cluster).await?; + let res: BinsDim0CollectedResult = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 1200); - assert_eq!(res.len(), 11); + assert_eq!(res.len(), 14); assert_eq!(res.range_final(), true); - assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true); + { + let v1: Vec<_> = res.counts().iter().map(|x| *x).collect(); + assert_eq!(&v1, &[5; 14]); + } + { + let v1: Vec<_> = res.ts1_off_ms().iter().map(|x| *x).collect(); + let v2: Vec<_> = (0..14).into_iter().map(|x| 5000 * x).collect(); + assert_eq!(&v1, &v2); + } + { + let v1: Vec<_> = res.ts2_off_ms().iter().map(|x| *x).collect(); + let v2: Vec<_> = (1..15).into_iter().map(|x| 5000 * x).collect(); + assert_eq!(&v1, &v2); + } + { + let v1: Vec<_> = res.mins().iter().map(|x| *x).collect(); + let v2: Vec<_> = (0..14).into_iter().map(|x| 1200 + 5 * x).collect(); + assert_eq!(&v1, &v2); + } + { + let v1: Vec<_> = res.maxs().iter().map(|x| *x).collect(); + let v2: Vec<_> = (0..14).into_iter().map(|x| 1204 + 5 * x).collect(); + assert_eq!(&v1, &v2); + } + { + let v1: Vec<_> = res.avgs().iter().map(|x| *x).collect(); + let v2: Vec<_> = (0..14).into_iter().map(|x| 1204 + 5 * x).collect(); + //assert_eq!(f32_cmp_near(res.avgs()[0], 42.0), true); + //assert_eq!(&v1, &v2); + } Ok(()) }; taskrun::run(fut) diff --git a/daqbufp2/src/test/api4/common.rs b/daqbufp2/src/test/api4/common.rs index 7be2641..635a495 100644 --- a/daqbufp2/src/test/api4/common.rs +++ b/daqbufp2/src/test/api4/common.rs @@ -8,6 +8,7 @@ use netpod::AppendToUrl; use netpod::Cluster; use netpod::HostPort; use netpod::APP_JSON; +use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use url::Url; @@ -44,3 +45,36 @@ pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Re info!("time {} ms", ms); Ok(res) } + +// TODO improve by a more information-rich return type. +pub async fn fetch_binned_json(query: BinnedQuery, cluster: &Cluster) -> Result { + let t1 = Utc::now(); + let node0 = &cluster.nodes[0]; + let hp = HostPort::from_node(node0); + let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; + info!("http get {}", url); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_JSON) + .body(Body::empty()) + .ec()?; + let client = hyper::Client::new(); + let res = client.request(req).await.ec()?; + if res.status() != StatusCode::OK { + error!("client response {:?}", res); + return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); + } + let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; + let s = String::from_utf8_lossy(&buf); + let res: JsonValue = serde_json::from_str(&s)?; + let pretty = serde_json::to_string_pretty(&res)?; + info!("{pretty}"); + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + // TODO add timeout + info!("time {} ms", ms); + Ok(res) +} diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs index 46ba27c..ff8ce0b 100644 --- a/items_0/src/timebin.rs +++ b/items_0/src/timebin.rs @@ -44,7 +44,7 @@ pub trait TimeBinnerTy: fmt::Debug + Send + Unpin { fn empty(&self) -> Option; } -pub trait TimeBinnableTy: fmt::Debug + Send + Sized { +pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized { type TimeBinner: TimeBinnerTy; fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner; @@ -164,11 +164,11 @@ impl RangeOverlapInfo for Box { impl TimeBinnable for Box { fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { - todo!() + TimeBinnable::time_binner_new(self.as_ref(), binrange, do_time_weight) } fn to_box_to_json_result(&self) -> Box { - todo!() + TimeBinnable::to_box_to_json_result(self.as_ref()) } } diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 6b62acf..4b7596a 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -726,6 +726,7 @@ impl TimeBinner for BinsDim0TimeBinner { } } }*/ + error!("!!!!!!!!!!!!!!!! TODO actually do something"); todo!() } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index ad06ead..6517b51 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -2,6 +2,7 @@ use crate::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::container::ByteEstimate; +use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; @@ -65,6 +66,7 @@ pub struct Merger { out_of_band_queue: VecDeque>, log_queue: VecDeque, dim0ix_max: u64, + done_emit_first_empty: bool, done_data: bool, done_buffered: bool, done_range_complete: bool, @@ -107,6 +109,7 @@ where out_of_band_queue: VecDeque::new(), log_queue: VecDeque::new(), dim0ix_max: 0, + done_emit_first_empty: false, done_data: false, done_buffered: false, done_range_complete: false, @@ -281,6 +284,13 @@ where Ready(Some(Ok(k))) => match k { StreamItem::DataItem(k) => match k { RangeCompletableItem::Data(k) => { + if self.done_emit_first_empty == false { + info!("++++++++++++++++++++++ LET MERGER EMIT THE FIRST EMPTY MARKER ITEM"); + self.done_emit_first_empty = true; + let item = k.new_empty(); + let item = sitem_data(item); + self.out_of_band_queue.push_back(item); + } self.items[i] = Some(k); trace4!("refilled {}", i); } @@ -353,9 +363,12 @@ where if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out || last_emit { trace3!("decide to output"); self.do_clear_out = false; - Break(Ready(Some(Ok(self.out.take().unwrap())))) + //Break(Ready(Some(Ok(self.out.take().unwrap())))) + let item = sitem_data(self.out.take().unwrap()); + self.out_of_band_queue.push_back(item); + Continue(()) } else { - trace4!("output not yet"); + trace4!("not enough output yet"); Continue(()) } } else { @@ -420,6 +433,10 @@ where continue; } } else if let Some(item) = self.out_of_band_queue.pop_front() { + let item = on_sitemty_data!(item, |k: T| { + info!("++++++++++++ EMIT OUT OF BAND DATA len {}", k.len()); + sitem_data(k) + }); trace!("emit out-of-band"); Ready(Some(item)) } else { @@ -427,6 +444,12 @@ where ControlFlow::Continue(()) => continue, ControlFlow::Break(k) => match k { Ready(Some(Ok(out))) => { + if true { + error!("THIS BRANCH SHOULD NO LONGER OCCUR, REFACTOR"); + self.done_data = true; + let e = Error::from(format!("TODO refactor direct emit in merger")); + return Ready(Some(Err(e.into()))); + } trace!("emit buffered len {}", out.len()); Ready(Some(sitem_data(out))) } diff --git a/items_2/src/transform.rs b/items_2/src/transform.rs index 0235936..4a26a81 100644 --- a/items_2/src/transform.rs +++ b/items_2/src/transform.rs @@ -25,7 +25,7 @@ impl WithTransformProperties for TransformEventIdentity { impl EventTransform for TransformEventIdentity { fn transform(&mut self, src: Box) -> Box { - todo!() + src } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index b29fdab..7a9fd0d 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1542,6 +1542,12 @@ where } } +impl BinnedRange { + pub fn to_nano_range(&self) -> NanoRange { + self.full_range() + } +} + impl BinnedRange where T: Dim0Index, diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index a8184d0..dedcf81 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -62,7 +62,7 @@ fn time_bin_00() { d }; let deadline = Instant::now() + Duration::from_millis(2000000); - let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true, deadline); + let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true); while let Some(item) = binned_stream.next().await { //eprintln!("{item:?}"); match item { @@ -121,7 +121,7 @@ fn time_bin_01() { }); let stream0 = Box::pin(stream0); let deadline = Instant::now() + Duration::from_millis(200); - let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true, deadline); + let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true); while let Some(item) = binned_stream.next().await { if true { eprintln!("{item:?}"); @@ -192,8 +192,7 @@ fn time_bin_02() -> Result<(), Error> { x }); let stream = Box::pin(stream); - let mut binned_stream = - crate::timebin::TimeBinnedStream::new(stream, binned_range.clone(), do_time_weight, deadline); + let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream, binned_range.clone(), do_time_weight); // From there on it should no longer be neccessary to distinguish whether its still events or time bins. // Then, optionally collect for output type like json, or stream as batches. // TODO the timebinner should already provide batches to make this efficient. diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index c43df86..ad20972 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -1,6 +1,4 @@ use err::Error; -use futures_util::Future; -use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::sitem_data; @@ -8,20 +6,15 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::TimeBinnableTy; -use items_0::timebin::TimeBinner; use items_0::timebin::TimeBinnerTy; -use items_0::transform::TimeBinnableStreamTrait; -use items_0::transform::WithTransformProperties; use netpod::log::*; -use netpod::BinnedRange; use netpod::BinnedRangeEnum; -use netpod::Dim0Index; use std::any; use std::fmt; +use std::ops::ControlFlow; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use std::time::Instant; #[allow(unused)] macro_rules! trace2 { @@ -50,10 +43,9 @@ where inp: MergeInp, range: BinnedRangeEnum, do_time_weight: bool, - deadline: Instant, - deadline_fut: Pin + Send>>, range_final: bool, binner: Option<::TimeBinner>, + done_first_input: bool, done_data: bool, done: bool, complete: bool, @@ -66,7 +58,6 @@ where fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct(any::type_name::()) .field("range", &self.range) - .field("deadline", &self.deadline) .field("range_final", &self.range_final) .field("binner", &self.binner) .finish() @@ -77,17 +68,14 @@ impl TimeBinnedStream where T: TimeBinnableTy, { - pub fn new(inp: MergeInp, range: BinnedRangeEnum, do_time_weight: bool, deadline: Instant) -> Self { - let deadline_fut = tokio::time::sleep_until(deadline.into()); - let deadline_fut = Box::pin(deadline_fut); + pub fn new(inp: MergeInp, range: BinnedRangeEnum, do_time_weight: bool) -> Self { Self { inp, range, do_time_weight, - deadline, - deadline_fut, range_final: false, binner: None, + done_first_input: false, done_data: false, done: false, complete: false, @@ -95,7 +83,7 @@ where } fn process_item(&mut self, mut item: T) -> () { - trace!("process_item {item:?}"); + info!("process_item {item:?}"); if self.binner.is_none() { trace!("process_item call time_binner_new"); let binner = item.time_binner_new(self.range.clone(), self.do_time_weight); @@ -105,6 +93,140 @@ where trace!("process_item call binner ingest"); binner.ingest(&mut item); } + + fn handle_data_item( + &mut self, + item: T, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + info!("================= handle_data_item"); + let item_len = item.len(); + self.process_item(item); + let mut do_emit = false; + if self.done_first_input == false { + info!( + "emit container after the first input len {} binner {}", + item_len, + self.binner.is_some() + ); + if self.binner.is_none() { + let e = Error::with_msg_no_trace("must emit on first input but no binner"); + self.done = true; + return Err(e); + } + do_emit = true; + self.done_first_input = true; + } + if let Some(binner) = self.binner.as_mut() { + trace3!("bins ready count {}", binner.bins_ready_count()); + if binner.bins_ready_count() > 0 { + do_emit = true + } + if do_emit { + if let Some(bins) = binner.bins_ready() { + Ok(Break(Ready(sitem_data(bins)))) + } else { + warn!("bins ready but got nothing"); + if let Some(bins) = binner.empty() { + Ok(Break(Ready(sitem_data(bins)))) + } else { + let e = Error::with_msg_no_trace("bins ready, but nothing, can not even create empty A"); + error!("{e}"); + Err(e) + } + } + } else { + trace3!("not emit"); + Ok(ControlFlow::Continue(())) + } + } else { + warn!("processed item, but no binner yet"); + Ok(ControlFlow::Continue(())) + } + } + + fn handle_item( + &mut self, + item: Result>, Error>, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + info!("================= handle_item"); + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + debug!("see RangeComplete"); + self.range_final = true; + Ok(Continue(())) + } + RangeCompletableItem::Data(item) => self.handle_data_item(item), + }, + StreamItem::Log(item) => Ok(Break(Ready(Ok(StreamItem::Log(item))))), + StreamItem::Stats(item) => Ok(Break(Ready(Ok(StreamItem::Stats(item))))), + }, + Err(e) => { + self.done = true; + Err(e) + } + } + } + + fn handle_none( + &mut self, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + info!("================= handle_none"); + let self_range_complete = self.range_final; + if let Some(binner) = self.binner.as_mut() { + trace!("bins ready count before finish {}", binner.bins_ready_count()); + // TODO rework the finish logic + if self_range_complete { + binner.set_range_complete(); + } + binner.push_in_progress(false); + trace!("bins ready count after finish {}", binner.bins_ready_count()); + if let Some(bins) = binner.bins_ready() { + self.done_data = true; + Ok(Break(Ready(sitem_data(bins)))) + } else { + warn!("bins ready but got nothing"); + if let Some(bins) = binner.empty() { + Ok(Break(Ready(sitem_data(bins)))) + } else { + let e = Error::with_msg_no_trace("bins ready, but nothing, can not even create empty B"); + error!("{e}"); + self.done_data = true; + Err(e) + } + } + } else { + warn!("input stream finished, still no binner"); + self.done_data = true; + let e = Error::with_msg_no_trace(format!("input stream finished, still no binner")); + Err(e) + } + } + + // TODO + // Original block inside the poll loop was able to: + // continue + // break with Poll> + fn poll_input( + &mut self, + cx: &mut Context, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + info!("================= poll_input"); + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => self.handle_item(item), + Ready(None) => self.handle_none(), + Pending => Ok(Break(Pending)), + } + } } impl Stream for TimeBinnedStream @@ -115,8 +237,9 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let span = span!(Level::INFO, "poll"); + let span = span!(Level::INFO, "TimeBinner"); let _spg = span.enter(); + info!("================= POLL"); loop { break if self.complete { panic!("poll on complete") @@ -131,109 +254,18 @@ where continue; } } else { - match self.deadline_fut.poll_unpin(cx) { - Ready(()) => { - trace2!("timeout"); - let self_range_complete = self.range_final; - if let Some(binner) = self.binner.as_mut() { - trace2!("bins ready count before finish {}", binner.bins_ready_count()); - // TODO rework the finish logic - if self_range_complete { - binner.set_range_complete(); - } - trace2!("bins ready count after finish {}", binner.bins_ready_count()); - if let Some(bins) = binner.bins_ready() { - self.done_data = true; - return Ready(Some(sitem_data(bins))); - } else { - self.done_data = true; - continue; - } - } else { - continue; - } - } - Pending => {} - } - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => match item { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - debug!("see RangeComplete"); - self.range_final = true; - continue; - } - RangeCompletableItem::Data(item) => { - self.process_item(item); - if let Some(binner) = self.binner.as_mut() { - trace3!("bins ready count {}", binner.bins_ready_count()); - if binner.bins_ready_count() > 0 { - if let Some(bins) = binner.bins_ready() { - Ready(Some(sitem_data(bins))) - } else { - trace2!("bins ready but got nothing"); - Pending - } - } else { - trace3!("no bins ready yet"); - continue; - } - } else { - trace2!("processed item, but no binner yet"); - continue; - } - } - }, - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + match self.poll_input(cx) { + Ok(item) => match item { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(item) => match item { + Ready(item) => break Ready(Some(item)), + Pending => break Pending, }, - Err(e) => { - self.done_data = true; - Ready(Some(Err(e))) - } }, - Ready(None) => { - trace!("finish up"); - let self_range_complete = self.range_final; - if let Some(binner) = self.binner.as_mut() { - trace!("bins ready count before finish {}", binner.bins_ready_count()); - // TODO rework the finish logic - if self_range_complete { - binner.set_range_complete(); - } - binner.push_in_progress(false); - trace!("bins ready count after finish {}", binner.bins_ready_count()); - if binner.bins_ready_count() > 0 { - if let Some(bins) = binner.bins_ready() { - self.done_data = true; - Ready(Some(sitem_data(bins))) - } else { - trace2!("bins ready but got nothing"); - self.done_data = true; - let e = Error::with_msg_no_trace(format!("bins ready but got nothing")); - Ready(Some(Err(e))) - } - } else { - if let Some(bins) = binner.empty() { - trace!("at end of stream, bin count zero, return {bins:?}"); - self.done_data = true; - Ready(Some(sitem_data(bins))) - } else { - error!("at the end, no bins, can not get empty"); - self.done_data = true; - let e = Error::with_msg_no_trace(format!("no bins")) - .add_public_msg(format!("unable to produce bins")); - Ready(Some(Err(e))) - } - } - } else { - trace!("input stream finished, still no binner"); - self.done_data = true; - continue; - } + Err(e) => { + self.done = true; + break Ready(Some(Err(e))); } - Pending => Pending, } }; } diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 8f523cd..7052ea4 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -40,7 +40,7 @@ async fn timebinnable_stream( one_before_range: bool, cluster: Cluster, ) -> Result { - let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); + let evq = PlainEventsQuery::new(query.channel().clone(), range.clone()).for_time_weighted_scalar(); let mut tr = build_merged_event_transform(evq.transform())?; let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?; @@ -66,20 +66,19 @@ async fn timebinnable_stream( async fn timebinned_stream( query: BinnedQuery, + binned_range: BinnedRangeEnum, cluster: Cluster, ) -> Result>> + Send>>, Error> { - let deadline = Instant::now(); - let range: NanoRange = query.range().try_into()?; + let range = binned_range.binned_range_time().to_nano_range(); - //let binned_range = BinnedRangeEnum::covering_range(SeriesRange::TimeRange(NanoRange { beg: 123, end: 456 }), 10)?; - let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let do_time_weight = true; let one_before_range = true; let stream = timebinnable_stream(query.clone(), range, one_before_range, cluster).await?; let stream: Pin> = stream.0; let stream = Box::pin(stream); - let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline); + // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. + let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight); let stream: Pin>> + Send>> = Box::pin(stream); Ok(stream) } @@ -99,13 +98,13 @@ fn timebinned_to_collectable( } pub async fn timebinned_json(query: BinnedQuery, _chconf: ChConf, cluster: Cluster) -> Result { - let deadline = Instant::now(); - let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); - let collect_range = evq.range().clone(); - let events_max = evq.events_max(); - let stream = timebinned_stream(query.clone(), cluster).await?; + info!("~~~~~~~~~~~ timebinned_json"); + let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); + let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; + let collect_max = 10000; + let stream = timebinned_stream(query.clone(), binned_range.clone(), cluster).await?; let stream = timebinned_to_collectable(stream); - let collected = Collect::new(stream, deadline, events_max, Some(collect_range), None); + let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); let collected = collected.await?; let jsval = serde_json::to_value(&collected)?;