From 741c1380c714505cbbb4101052943bfc4c7e2cfe Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 9 Sep 2024 17:04:20 +0200 Subject: [PATCH] WIP --- Cargo.toml | 2 +- crates/disk/src/eventchunkermultifile.rs | 2 +- .../disk/src/merge/mergedblobsfromremotes.rs | 2 +- crates/httpret/src/api4/binned.rs | 24 +- crates/httpret/src/api4/events.rs | 11 +- crates/items_0/src/items_0.rs | 1 + crates/items_2/src/binsdim0.rs | 68 +++- crates/items_2/src/merger.rs | 4 +- crates/items_2/src/test.rs | 16 +- crates/netpod/src/netpod.rs | 4 + crates/query/src/api4/binned.rs | 6 +- crates/query/src/api4/events.rs | 21 +- crates/query/src/transform.rs | 4 + crates/scyllaconn/Cargo.toml | 1 + crates/scyllaconn/src/bincache.rs | 18 ++ crates/streams/src/plaineventsstream.rs | 4 +- crates/streams/src/tcprawclient.rs | 9 +- crates/streams/src/test.rs | 2 +- crates/streams/src/test/events.rs | 3 +- crates/streams/src/timebin.rs | 4 +- crates/streams/src/timebin/basic.rs | 1 + crates/streams/src/timebin/cached/reader.rs | 12 +- crates/streams/src/timebin/fromlayers.rs | 73 ++++- crates/streams/src/timebin/gapfill.rs | 292 +++++++++++++++++- crates/streams/src/timebinnedjson.rs | 139 ++++++++- 25 files changed, 638 insertions(+), 85 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0f2e750..3f6d2e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [profile.release] opt-level = 2 -debug = 0 +debug = 1 overflow-checks = false debug-assertions = false lto = "thin" diff --git a/crates/disk/src/eventchunkermultifile.rs b/crates/disk/src/eventchunkermultifile.rs index b9e6647..934a2a0 100644 --- a/crates/disk/src/eventchunkermultifile.rs +++ b/crates/disk/src/eventchunkermultifile.rs @@ -239,7 +239,7 @@ impl Stream for EventChunkerMultifile { chunkers.push(Box::pin(chunker) as _); } } - let merged = Merger::new(chunkers, self.out_max_len); + let merged = Merger::new(chunkers, Some(self.out_max_len as u32)); let filtered = RangeFilter2::new(merged, self.range.clone(), self.expand); self.evs = Some(Box::pin(filtered)); Ready(Some(Ok(StreamItem::Log(item)))) diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs index 57d3495..f4c6c10 100644 --- a/crates/disk/src/merge/mergedblobsfromremotes.rs +++ b/crates/disk/src/merge/mergedblobsfromremotes.rs @@ -99,7 +99,7 @@ impl Stream for MergedBlobsFromRemotes { if c1 == self.tcp_establish_futs.len() { let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); // TODO set out_max_len dynamically - let s1 = Merger::new(inps, 128); + let s1 = Merger::new(inps, Some(128)); self.merged = Some(Box::pin(s1)); } continue 'outer; diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 9600330..bfc5be5 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -23,6 +23,10 @@ use netpod::NodeConfigCached; use netpod::ReqCtx; use nodenet::client::OpenBoxedBytesViaHttp; use query::api4::binned::BinnedQuery; +use scyllaconn::bincache::ScyllaCacheReadProvider; +use scyllaconn::worker::ScyllaQueue; +use std::sync::Arc; +use streams::timebin::CacheReadProvider; use tracing::Instrument; use url::Url; @@ -71,7 +75,7 @@ impl BinnedHandler { if req.method() != Method::GET { return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); } - match binned(req, ctx, &shared_res.pgqueue, ncc).await { + match binned(req, ctx, &shared_res.pgqueue, shared_res.scyqueue.clone(), ncc).await { Ok(ret) => Ok(ret), Err(e) => match e { Error::ChannelNotFound => { @@ -91,7 +95,13 @@ impl BinnedHandler { } } -async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result { +async fn binned( + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, +) -> Result { let url = req_uri_to_url(req.uri()).map_err(|e| Error::BadQuery(e.to_string()))?; if req .uri() @@ -101,7 +111,7 @@ async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCach Err(Error::ServerError)?; } if accepts_json_or_all(&req.headers()) { - Ok(binned_json(url, req, ctx, pgqueue, ncc).await?) + Ok(binned_json(url, req, ctx, pgqueue, scyqueue, ncc).await?) } else if accepts_octets(&req.headers()) { Ok(error_response( format!("binary binned data not yet available"), @@ -118,6 +128,7 @@ async fn binned_json( req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, + scyqueue: Option, ncc: &NodeConfigCached, ) -> Result { debug!("{:?}", req); @@ -143,8 +154,11 @@ async fn binned_json( debug!("begin"); }); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); - let open_bytes = Box::pin(open_bytes); - let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes) + let open_bytes = Arc::pin(open_bytes); + let cache_read_provider = scyqueue + .map(|qu| ScyllaCacheReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc); + let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes, cache_read_provider) .instrument(span1) .await .map_err(|e| Error::BinnedStream(e))?; diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index e74300f..20b4d94 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -36,6 +36,7 @@ use netpod::APP_JSON_FRAMED; use netpod::HEADER_NAME_REQUEST_ID; use nodenet::client::OpenBoxedBytesViaHttp; use query::api4::events::PlainEventsQuery; +use std::sync::Arc; use streams::instrument::InstrumentStream; use tracing::Instrument; @@ -174,7 +175,8 @@ async fn plain_events_cbor_framed( ) -> Result { debug!("plain_events_cbor_framed {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); - let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; + let open_bytes = Arc::pin(open_bytes); + let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes).await?; let stream = bytes_chunks_to_framed(stream); let logspan = if evq.log_level() == "trace" { trace!("enable trace for handler"); @@ -202,7 +204,8 @@ async fn plain_events_json_framed( ) -> Result { debug!("plain_events_json_framed {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); - let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?; + let open_bytes = Arc::pin(open_bytes); + let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, open_bytes).await?; let stream = bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON_FRAMED) @@ -224,9 +227,9 @@ async fn plain_events_json( // TODO handle None case better and return 404 debug!("{self_name} chconf_from_events_quorum: {ch_conf:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); + let open_bytes = Arc::pin(open_bytes); let item = - streams::plaineventsjson::plain_events_json(&evq, ch_conf, ctx, &ncc.node_config.cluster, Box::pin(open_bytes)) - .await; + streams::plaineventsjson::plain_events_json(&evq, ch_conf, ctx, &ncc.node_config.cluster, open_bytes).await; debug!("{self_name} returned {}", item.is_ok()); let item = match item { Ok(item) => item, diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index 20aff13..a3e2e3e 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -57,6 +57,7 @@ pub trait AppendEmptyBin { fn append_empty_bin(&mut self, ts1: u64, ts2: u64); } +// TODO rename to make it clear that this moves. Better use drain-into or something similar. pub trait AppendAllFrom { fn append_all_from(&mut self, src: &mut Self); } diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index b6d6227..e09fe3e 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -150,6 +150,17 @@ impl BinsDim0 { } true } + + // TODO make this part of a new bins trait, similar like Events trait. + // TODO check for error? + pub fn drain_into(&mut self, dst: &mut Self, range: Range) -> () { + dst.ts1s.extend(self.ts1s.drain(range.clone())); + dst.ts2s.extend(self.ts2s.drain(range.clone())); + dst.counts.extend(self.counts.drain(range.clone())); + dst.mins.extend(self.mins.drain(range.clone())); + dst.maxs.extend(self.maxs.drain(range.clone())); + dst.avgs.extend(self.avgs.drain(range.clone())); + } } impl AsAnyRef for BinsDim0 @@ -301,8 +312,39 @@ impl TimeBinnableType for BinsDim0 { } #[derive(Debug)] -pub struct BinsDim0TimeBinnerTy { - _t1: std::marker::PhantomData, +pub struct BinsDim0TimeBinnerTy +where + STY: ScalarOps, +{ + ts1now: TsNano, + binrange: BinnedRange, + do_time_weight: bool, + emit_empty_bins: bool, + range_complete: bool, + buf: ::Output, + out: ::Output, + bins_ready_count: usize, +} + +impl BinsDim0TimeBinnerTy +where + STY: ScalarOps, +{ + pub fn new(binrange: BinnedRange, do_time_weight: bool, emit_empty_bins: bool) -> Self { + // let ts1now = TsNano::from_ns(binrange.bin_off * binrange.bin_len.ns()); + // let ts2 = ts1.add_dt_nano(binrange.bin_len.to_dt_nano()); + let buf = ::Output::empty(); + Self { + ts1now: TsNano::from_ns(binrange.full_range().beg()), + binrange, + do_time_weight, + emit_empty_bins, + range_complete: false, + buf, + out: ::Output::empty(), + bins_ready_count: 0, + } + } } impl TimeBinnerTy for BinsDim0TimeBinnerTy @@ -313,35 +355,36 @@ where type Output = BinsDim0; fn ingest(&mut self, item: &mut Self::Input) { - todo!() + // item.ts1s; + todo!("TimeBinnerTy::ingest") } fn set_range_complete(&mut self) { - todo!() + self.range_complete = true; } fn bins_ready_count(&self) -> usize { - todo!() + self.bins_ready_count } fn bins_ready(&mut self) -> Option { - todo!() + todo!("TimeBinnerTy::bins_ready") } fn push_in_progress(&mut self, push_empty: bool) { - todo!() + todo!("TimeBinnerTy::push_in_progress") } fn cycle(&mut self) { - todo!() + todo!("TimeBinnerTy::cycle") } fn empty(&self) -> Option { - todo!() + todo!("TimeBinnerTy::empty") } fn append_empty_until_end(&mut self) { - todo!() + todo!("TimeBinnerTy::append_empty_until_end") } } @@ -354,7 +397,10 @@ impl TimeBinnableTy for BinsDim0 { do_time_weight: bool, emit_empty_bins: bool, ) -> Self::TimeBinner { - todo!() + match binrange { + BinnedRangeEnum::Time(binrange) => BinsDim0TimeBinnerTy::new(binrange, do_time_weight, emit_empty_bins), + BinnedRangeEnum::Pulse(_) => todo!("TimeBinnableTy for BinsDim0 Pulse"), + } } } diff --git a/crates/items_2/src/merger.rs b/crates/items_2/src/merger.rs index ff21117..48f83a6 100644 --- a/crates/items_2/src/merger.rs +++ b/crates/items_2/src/merger.rs @@ -101,14 +101,14 @@ impl Merger where T: Mergeable, { - pub fn new(inps: Vec>, out_max_len: usize) -> Self { + pub fn new(inps: Vec>, out_max_len: Option) -> Self { let n = inps.len(); Self { inps: inps.into_iter().map(|x| Some(x)).collect(), items: (0..n).into_iter().map(|_| None).collect(), out: None, do_clear_out: false, - out_max_len, + out_max_len: out_max_len.unwrap_or(1000) as usize, range_complete: vec![false; n], out_of_band_queue: VecDeque::new(), log_queue: VecDeque::new(), diff --git a/crates/items_2/src/test.rs b/crates/items_2/src/test.rs index fcfd846..e1687c9 100644 --- a/crates/items_2/src/test.rs +++ b/crates/items_2/src/test.rs @@ -86,7 +86,7 @@ fn items_merge_00() { let v1 = ChannelEvents::Events(evs1); let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); - let mut merger = Merger::new(vec![stream0, stream1], 8); + let mut merger = Merger::new(vec![stream0, stream1], Some(8)); while let Some(item) = merger.next().await { eprintln!("{item:?}"); } @@ -109,7 +109,7 @@ fn items_merge_01() { let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); - let mut merger = Merger::new(vec![stream0, stream1, stream2], 8); + let mut merger = Merger::new(vec![stream0, stream1, stream2], Some(8)); let mut total_event_count = 0; while let Some(item) = merger.next().await { eprintln!("{item:?}"); @@ -144,7 +144,7 @@ fn items_merge_02() { let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); - let mut merger = Merger::new(vec![stream0, stream1, stream2], 8); + let mut merger = Merger::new(vec![stream0, stream1, stream2], Some(8)); let mut total_event_count = 0; while let Some(item) = merger.next().await { eprintln!("{item:?}"); @@ -187,7 +187,7 @@ fn merge_00() { let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 32); + let mut merger = crate::merger::Merger::new(vec![inp1, inp2], Some(32)); // Expect an empty first item. let item = merger.next().await; @@ -230,7 +230,7 @@ fn merge_01() { let inp2: Vec> = Vec::new(); let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 10); + let mut merger = crate::merger::Merger::new(vec![inp1, inp2], Some(10)); // Expect an empty first item. let item = merger.next().await; @@ -323,7 +323,7 @@ fn merge_02() { let inp2: Vec> = inp2_events_a; let inp2 = futures_util::stream::iter(inp2); let inp2 = Box::pin(inp2); - let mut merger = crate::merger::Merger::new(vec![inp1, inp2], 10); + let mut merger = crate::merger::Merger::new(vec![inp1, inp2], Some(10)); // Expect an empty first item. let item = merger.next().await; @@ -365,7 +365,7 @@ fn bin_00() { let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); let inp2 = Box::pin(futures_util::stream::empty()) as _; - let stream = crate::merger::Merger::new(vec![inp1, inp2], 32); + let stream = crate::merger::Merger::new(vec![inp1, inp2], Some(32)); let range = NanoRange { beg: SEC * 0, end: SEC * 100, @@ -413,7 +413,7 @@ fn bin_01() { let inp1 = futures_util::stream::iter(inp1); let inp1 = Box::pin(inp1); let inp2 = Box::pin(futures_util::stream::empty()) as _; - let stream = crate::merger::Merger::new(vec![inp1, inp2], 32); + let stream = crate::merger::Merger::new(vec![inp1, inp2], Some(32)); // covering_range result is subject to adjustments, instead, manually choose bin edges let range = NanoRange { beg: TSBASE + SEC * 1, diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 81a9b2a..43d1084 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -1877,6 +1877,10 @@ impl TsNano { TsMs::from_ms_u64(self.ms()) } + pub const fn to_dt_nano(self) -> DtNano { + DtNano::from_ns(self.0) + } + pub const fn to_dt_ms(self) -> DtMs { DtMs::from_ms_u64(self.ms()) } diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index 061a940..5d04955 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -83,7 +83,7 @@ pub struct BinnedQuery { #[serde(default, skip_serializing_if = "Option::is_none")] disk_stats_every: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - pub merger_out_len_max: Option, + pub merger_out_len_max: Option, #[serde(default, skip_serializing_if = "Option::is_none")] test_do_wasm: Option, #[serde(default)] @@ -159,8 +159,8 @@ impl BinnedQuery { self.subgrids.as_ref().map(|x| x.as_slice()) } - pub fn merger_out_len_max(&self) -> usize { - self.merger_out_len_max.unwrap_or(1024) + pub fn merger_out_len_max(&self) -> Option { + self.merger_out_len_max } pub fn set_series_id(&mut self, series: u64) { diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index ed867a6..89560ff 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -59,7 +59,7 @@ pub struct PlainEventsQuery { #[serde(default, skip_serializing_if = "Option::is_none")] test_do_wasm: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - merger_out_len_max: Option, + merger_out_len_max: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] create_errors: Vec, #[serde(default)] @@ -156,8 +156,8 @@ impl PlainEventsQuery { &self.event_delay } - pub fn merger_out_len_max(&self) -> usize { - self.merger_out_len_max.unwrap_or(1024) + pub fn merger_out_len_max(&self) -> Option { + self.merger_out_len_max } pub fn do_test_main_error(&self) -> bool { @@ -417,6 +417,13 @@ pub struct EventsSubQuerySettings { queue_len_disk_io: Option, create_errors: Vec, use_rt: Option, + merger_out_len_max: Option, +} + +impl EventsSubQuerySettings { + pub fn merger_out_len_max(&self) -> Option { + self.merger_out_len_max + } } impl Default for EventsSubQuerySettings { @@ -431,6 +438,7 @@ impl Default for EventsSubQuerySettings { queue_len_disk_io: None, create_errors: Vec::new(), use_rt: None, + merger_out_len_max: None, } } } @@ -448,6 +456,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings { queue_len_disk_io: None, create_errors: value.create_errors.clone(), use_rt: value.use_rt(), + merger_out_len_max: value.merger_out_len_max(), } } } @@ -466,6 +475,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings { queue_len_disk_io: None, create_errors: Vec::new(), use_rt: value.use_rt(), + merger_out_len_max: value.merger_out_len_max(), } } } @@ -484,6 +494,7 @@ impl From<&Api1Query> for EventsSubQuerySettings { queue_len_disk_io: Some(disk_io_tune.read_queue_len), create_errors: Vec::new(), use_rt: None, + merger_out_len_max: None, } } } @@ -595,6 +606,10 @@ impl EventsSubQuery { pub fn use_rt(&self) -> Option { self.settings.use_rt.clone() } + + pub fn merger_out_len_max(&self) -> Option { + self.settings.merger_out_len_max() + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/query/src/transform.rs b/crates/query/src/transform.rs index 2b71c6e..1fbd2b4 100644 --- a/crates/query/src/transform.rs +++ b/crates/query/src/transform.rs @@ -148,6 +148,10 @@ impl TransformQuery { pub fn enum_as_string(&self) -> Option { self.enum_as_string.clone() } + + pub fn do_wasm(&self) -> Option<&str> { + None + } } impl FromUrl for TransformQuery { diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 41b6194..ccc30b0 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -17,5 +17,6 @@ netpod = { path = "../netpod" } query = { path = "../query" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } +streams = { path = "../streams" } series = { path = "../../../daqingest/series" } taskrun = { path = "../taskrun" } diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 23fe67c..215e39f 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -1,6 +1,7 @@ #![allow(unused)] use crate::errconv::ErrConv; +use crate::worker::ScyllaQueue; use err::Error; use futures_util::Future; use futures_util::Stream; @@ -519,3 +520,20 @@ pub async fn pre_binned_value_stream( err::todo(); Ok(Box::pin(futures_util::stream::iter([Ok(res.0)]))) } + +pub struct ScyllaCacheReadProvider { + scyqueue: ScyllaQueue, +} + +impl ScyllaCacheReadProvider { + pub fn new(scyqueue: ScyllaQueue) -> Self { + Self { scyqueue } + } +} + +impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { + fn read(&self) -> streams::timebin::cached::reader::Reading { + warn!("impl CacheReadProvider for ScyllaCacheReadProvider"); + todo!("impl CacheReadProvider for ScyllaCacheReadProvider") + } +} diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index de63145..8fc0140 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -34,13 +34,13 @@ pub async fn dyn_events_stream( open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { trace!("dyn_events_stream {}", evq.summary_short()); + use query::api4::events::EventsSubQuerySettings; let subq = make_sub_query( ch_conf, evq.range().clone(), evq.one_before_range(), evq.transform().clone(), - evq.test_do_wasm(), - evq, + EventsSubQuerySettings::from(evq), evq.log_level().into(), ctx, ); diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index f863617..a7d2838 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -36,6 +36,7 @@ use query::transform::TransformQuery; use serde::de::DeserializeOwned; use std::fmt; use std::pin::Pin; +use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; @@ -45,11 +46,12 @@ pub trait OpenBoxedBytesStreams { fn open( &self, subq: EventsSubQuery, + // TODO take by Arc ctx: ReqCtx, ) -> Pin, Error>> + Send>>; } -pub type OpenBoxedBytesStreamsBox = Pin>; +pub type OpenBoxedBytesStreamsBox = Pin>; pub fn make_node_command_frame(query: EventsSubQuery) -> Result { let obj = Frame1Parts::new(query); @@ -202,7 +204,6 @@ pub fn make_sub_query( range: SeriesRange, one_before_range: bool, transform: TransformQuery, - test_do_wasm: Option<&str>, sub: SUB, log_level: String, ctx: &ReqCtx, @@ -210,8 +211,8 @@ pub fn make_sub_query( where SUB: Into, { - let mut select = EventsSubQuerySelect::new(ch_conf, range, one_before_range, transform); - if let Some(wasm1) = test_do_wasm { + let mut select = EventsSubQuerySelect::new(ch_conf, range, one_before_range, transform.clone()); + if let Some(wasm1) = transform.do_wasm() { select.set_wasm1(wasm1.into()); } let settings = sub.into(); diff --git a/crates/streams/src/test.rs b/crates/streams/src/test.rs index d24c5b9..38283e1 100644 --- a/crates/streams/src/test.rs +++ b/crates/streams/src/test.rs @@ -45,7 +45,7 @@ fn merge_mergeable_00() -> Result<(), Error> { let fut = async { let inp0 = inmem_test_events_d0_i32_00(); let inp1 = inmem_test_events_d0_i32_01(); - let _merger = items_2::merger::Merger::new(vec![inp0, inp1], 4); + let _merger = items_2::merger::Merger::new(vec![inp0, inp1], Some(4)); Ok(()) }; runfut(fut) diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs index ba18ae2..ab297b1 100644 --- a/crates/streams/src/test/events.rs +++ b/crates/streams/src/test/events.rs @@ -21,6 +21,7 @@ use netpod::Shape; use query::api4::events::EventsSubQuery; use query::api4::events::PlainEventsQuery; use std::pin::Pin; +use std::sync::Arc; #[test] fn merged_events_cbor() { @@ -46,7 +47,7 @@ async fn merged_events_inner() -> Result<(), Error> { )); let evq = PlainEventsQuery::new(channel, range); let open_bytes = StreamOpener::new(); - let open_bytes = Box::pin(open_bytes); + let open_bytes = Arc::pin(open_bytes); let stream = plain_events_cbor_stream(&evq, ch_conf.clone().into(), &ctx, open_bytes) .await .unwrap(); diff --git a/crates/streams/src/timebin.rs b/crates/streams/src/timebin.rs index f5c6fca..551d88c 100644 --- a/crates/streams/src/timebin.rs +++ b/crates/streams/src/timebin.rs @@ -1,8 +1,10 @@ mod basic; -mod cached; +pub mod cached; mod fromlayers; mod gapfill; mod grid; pub(super) use basic::TimeBinnedStream; pub(super) use fromlayers::TimeBinnedFromLayers; + +pub use cached::reader::CacheReadProvider; diff --git a/crates/streams/src/timebin/basic.rs b/crates/streams/src/timebin/basic.rs index 4fa1ec6..763ecb9 100644 --- a/crates/streams/src/timebin/basic.rs +++ b/crates/streams/src/timebin/basic.rs @@ -252,6 +252,7 @@ where trace2!("================= POLL"); loop { break if self.complete { + error!("TimeBinnedStream poll on complete"); panic!("TimeBinnedStream poll on complete") } else if self.done { self.complete = true; diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index ec15ffd..dbb949f 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -8,6 +8,7 @@ use netpod::DtMs; use netpod::TsNano; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -23,16 +24,18 @@ impl Future for Reading { } } -pub trait CacheReadProvider: Send { +pub trait CacheReadProvider: Send + Sync { fn read(&self) -> Reading; } #[derive(Debug, ThisError)] #[cstm(name = "BinCachedReader")] -pub enum Error {} +pub enum Error { + TodoImpl, +} pub struct CachedReader { - cache_read_provider: Box, + cache_read_provider: Arc, } impl CachedReader { @@ -40,7 +43,7 @@ impl CachedReader { series: u64, bin_len: DtMs, range: BinnedRange, - cache_read_provider: Box, + cache_read_provider: Arc, ) -> Result { let ret = Self { cache_read_provider }; Ok(ret) @@ -52,6 +55,7 @@ impl Stream for CachedReader { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + // Ready(Some(Err(Error::TodoImpl))) Ready(None) } } diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index 3ed6246..a518e46 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -1,4 +1,5 @@ use super::cached::reader::CacheReadProvider; +use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::grid::find_next_finer_bin_len; use err::thiserror; use err::ThisError; @@ -14,9 +15,14 @@ use items_2::binsdim0::BinsDim0; use netpod::log::*; use netpod::BinnedRange; use netpod::BinnedRangeEnum; +use netpod::ChannelTypeConfigGen; use netpod::DtMs; +use netpod::ReqCtx; use netpod::TsNano; +use query::api4::events::EventsSubQuerySettings; +use query::transform::TransformQuery; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -30,6 +36,12 @@ pub enum Error { type BoxedInput = Pin>> + Send>>; pub struct TimeBinnedFromLayers { + ch_conf: ChannelTypeConfigGen, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, inp: BoxedInput, } @@ -39,11 +51,17 @@ impl TimeBinnedFromLayers { } pub fn new( + ch_conf: ChannelTypeConfigGen, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, series: u64, range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, - cache_read_provider: Box, + cache_read_provider: Arc, ) -> Result { info!( "{}::new {:?} {:?} {:?}", @@ -55,8 +73,28 @@ impl TimeBinnedFromLayers { let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); if bin_len_layers.contains(&bin_len) { info!("{}::new bin_len in layers", Self::type_name()); - let inp = super::gapfill::GapFill::new(series, range, do_time_weight, bin_len_layers, cache_read_provider)?; - let ret = Self { inp: Box::pin(inp) }; + let inp = super::gapfill::GapFill::new( + ch_conf.clone(), + transform_query.clone(), + sub.clone(), + log_level.clone(), + ctx.clone(), + open_bytes.clone(), + series, + range, + do_time_weight, + bin_len_layers, + cache_read_provider, + )?; + let ret = Self { + ch_conf, + transform_query, + sub, + log_level, + ctx, + open_bytes, + inp: Box::pin(inp), + }; Ok(ret) } else { match find_next_finer_bin_len(bin_len, &bin_len_layers) { @@ -64,8 +102,14 @@ impl TimeBinnedFromLayers { // TODO // produce from binned sub-stream with additional binner. let range = BinnedRange::from_nano_range(range.to_nano_range(), finer); - info!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range); + warn!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range); let inp = super::gapfill::GapFill::new( + ch_conf.clone(), + transform_query.clone(), + sub.clone(), + log_level.clone(), + ctx.clone(), + open_bytes.clone(), series, range.clone(), do_time_weight, @@ -77,11 +121,19 @@ impl TimeBinnedFromLayers { BinnedRangeEnum::Time(range), do_time_weight, ); - let ret = Self { inp: Box::pin(inp) }; + let ret = Self { + ch_conf, + transform_query, + sub, + log_level, + ctx, + open_bytes, + inp: Box::pin(inp), + }; Ok(ret) } None => { - info!("{}::new NO next finer", Self::type_name()); + warn!("{}::new NO next finer", Self::type_name()); // TODO // produce from events todo!() @@ -94,7 +146,12 @@ impl TimeBinnedFromLayers { impl Stream for TimeBinnedFromLayers { type Item = Sitemty>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - todo!() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => Ready(Some(x)), + Ready(None) => Ready(None), + Pending => Pending, + } } } diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index 4703c54..cb06e0a 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -1,14 +1,24 @@ use super::cached::reader::CacheReadProvider; +use crate::tcprawclient::OpenBoxedBytesStreamsBox; use err::thiserror; use err::ThisError; use futures_util::Stream; -use futures_util::TryStreamExt; +use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; use items_2::binsdim0::BinsDim0; +use netpod::log::*; +use netpod::range::evrange::NanoRange; use netpod::BinnedRange; +use netpod::ChannelTypeConfigGen; use netpod::DtMs; +use netpod::ReqCtx; use netpod::TsNano; +use query::api4::events::EventsSubQuerySettings; +use query::transform::TransformQuery; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -16,32 +26,201 @@ use std::task::Poll; #[cstm(name = "BinCachedGapFill")] pub enum Error { CacheReader(#[from] super::cached::reader::Error), + GapFromFiner, } -type INP = Pin, Error>> + Send>>; +type INP = Pin>> + Send>>; // Try to read from cache for the given bin len. // For gaps in the stream, construct an alternative input from finer bin len with a binner. pub struct GapFill { + ch_conf: ChannelTypeConfigGen, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, + series: u64, + range: BinnedRange, + do_time_weight: bool, + bin_len_layers: Vec, inp: INP, + inp_buf: Option>, + inp_finer: Option, + last_bin_ts2: Option, + exp_finer_range: NanoRange, + cache_read_provider: Arc, } impl GapFill { // bin_len of the given range must be a cacheable bin_len. pub fn new( + ch_conf: ChannelTypeConfigGen, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, + series: u64, range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, - cache_read_provider: Box, + cache_read_provider: Arc, ) -> Result { // super::fromlayers::TimeBinnedFromLayers::new(series, range, do_time_weight, bin_len_layers)?; - let inp = - super::cached::reader::CachedReader::new(series, range.bin_len.to_dt_ms(), range, cache_read_provider)? - .map_err(Error::from); - let ret = Self { inp: Box::pin(inp) }; + let inp = super::cached::reader::CachedReader::new( + series, + range.bin_len.to_dt_ms(), + range.clone(), + cache_read_provider.clone(), + )? + .map(|x| match x { + Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), + Err(e) => Err(::err::Error::from_string(e)), + }); + let ret = Self { + ch_conf, + transform_query, + sub, + log_level, + ctx, + open_bytes, + series, + range, + do_time_weight, + bin_len_layers, + inp: Box::pin(inp), + inp_buf: None, + inp_finer: None, + last_bin_ts2: None, + // TODO just dummy: + exp_finer_range: NanoRange { beg: 0, end: 0 }, + cache_read_provider, + }; Ok(ret) } + + fn handle_bins_finer(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result, Error> { + for (&ts1, &ts2) in bins.ts1s.iter().zip(&bins.ts2s) { + if let Some(last) = self.last_bin_ts2 { + if ts1 != last.ns() { + return Err(Error::GapFromFiner); + } + } + self.last_bin_ts2 = Some(TsNano::from_ns(ts2)); + } + + // TODO keep bins from finer source. + // Only write bins to cache if we receive another + + // TODO make sure that input does not send "made-up" empty future bins. + // On the other hand, if the request is over past range, but the channel was silent ever since? + // Then we should in principle know that from is-alive status checking. + // So, until then, allow made-up bins? + // Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way. + + Ok(bins) + } + + fn handle_bins(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result, Error> { + // TODO could use an interface to iterate over opaque bin items that only expose + // edge and count information with all remaining values opaque. + for (i, (&ts1, &ts2)) in bins.ts1s.iter().zip(&bins.ts2s).enumerate() { + if let Some(last) = self.last_bin_ts2 { + if ts1 != last.ns() { + let mut ret = as items_0::Empty>::empty(); + let mut bins = bins; + bins.drain_into(&mut ret, 0..i); + self.inp_buf = Some(bins); + let range = NanoRange { + beg: last.ns(), + end: ts1, + }; + self.setup_inp_finer(range)?; + return Ok(ret); + } + } + self.last_bin_ts2 = Some(TsNano::from_ns(ts2)); + } + Ok(bins) + } + + fn setup_inp_finer(mut self: Pin<&mut Self>, range: NanoRange) -> Result<(), Error> { + // Set up range to fill from finer. + self.exp_finer_range = range.clone(); + if let Some(bin_len_finer) = + super::grid::find_next_finer_bin_len(self.range.bin_len.to_dt_ms(), &self.bin_len_layers) + { + let range_finer = BinnedRange::from_nano_range(range, bin_len_finer); + let inp_finer = GapFill::new( + self.ch_conf.clone(), + self.transform_query.clone(), + self.sub.clone(), + self.log_level.clone(), + self.ctx.clone(), + self.open_bytes.clone(), + self.series, + range_finer.clone(), + self.do_time_weight, + self.bin_len_layers.clone(), + self.cache_read_provider.clone(), + )?; + let stream = Box::pin(inp_finer); + let do_time_weight = self.do_time_weight; + let range = BinnedRange::from_nano_range(range_finer.full_range(), self.range.bin_len.to_dt_ms()); + let stream = + super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight); + self.inp_finer = Some(Box::pin(stream)); + } else { + let do_time_weight = self.do_time_weight; + let one_before_range = true; + let range = BinnedRange::from_nano_range(range, self.range.bin_len.to_dt_ms()); + let stream = crate::timebinnedjson::TimeBinnableStream::new( + range.full_range(), + one_before_range, + self.ch_conf.clone(), + self.transform_query.clone(), + self.sub.clone(), + self.log_level.clone(), + self.ctx.clone(), + self.open_bytes.clone(), + ); + // let stream: Pin> = stream; + let stream = Box::pin(stream); + // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. + let stream = + super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight); + let stream = stream.map(|item| match item { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(mut x) => { + // TODO need a typed time binner + if let Some(x) = x.as_any_mut().downcast_mut::>() { + let y = x.clone(); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(y))) + } else { + Err(::err::Error::with_msg_no_trace( + "GapFill expects incoming BinsDim0", + )) + } + } + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + }, + StreamItem::Log(x) => Ok(StreamItem::Log(x)), + StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), + }, + Err(e) => Err(e), + }); + // let stream: Pin< + // Box>> + Send>, + // > = Box::pin(stream); + self.inp_finer = Some(Box::pin(stream)); + } + Ok(()) + } } impl Stream for GapFill { @@ -49,6 +228,101 @@ impl Stream for GapFill { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + loop { + break if let Some(inp_finer) = self.inp_finer.as_mut() { + // TODO + // detect also gaps here: if gap from finer, then error. + // on CacheUsage Use or Rereate: + // write these bins to cache because we did not find them in cache before. + match inp_finer.poll_next_unpin(cx) { + Ready(Some(Ok(x))) => match x { + StreamItem::DataItem(RangeCompletableItem::Data(x)) => { + match self.as_mut().handle_bins_finer(x) { + Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + } + } + StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } + StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))), + StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))), + }, + Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))), + Ready(None) => { + if let Some(j) = self.last_bin_ts2 { + if j.ns() != self.exp_finer_range.end() { + Ready(Some(Err(::err::Error::from_string( + "finer input didn't deliver to the end", + )))) + } else { + self.last_bin_ts2 = None; + self.exp_finer_range = NanoRange { beg: 0, end: 0 }; + self.inp_finer = None; + continue; + } + } else { + Ready(Some(Err(::err::Error::from_string("finer input delivered nothing")))) + } + } + Pending => Pending, + } + } else if let Some(x) = self.inp_buf.take() { + match self.as_mut().handle_bins_finer(x) { + Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + } + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(x))) => match x { + StreamItem::DataItem(RangeCompletableItem::Data(x)) => match self.as_mut().handle_bins(x) { + Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + }, + StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } + StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))), + StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))), + }, + Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))), + Ready(None) => { + // TODO assert that we have emitted up to the requested range. + // If not, request the remaining range from "finer" input. + if let Some(j) = self.last_bin_ts2 { + if j.ns() != self.exp_finer_range.end() { + let range = NanoRange { + beg: j.ns(), + end: self.range.full_range().end(), + }; + match self.as_mut().setup_inp_finer(range) { + Ok(()) => { + continue; + } + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + } + } else { + // self.last_bin_ts2 = None; + // self.exp_finer_range = NanoRange { beg: 0, end: 0 }; + // self.inp_finer = None; + // continue; + Ready(None) + } + } else { + warn!("----- NOTHING IN CACHE, SETUP FULL FROM FINER"); + let range = self.range.full_range(); + match self.as_mut().setup_inp_finer(range) { + Ok(()) => { + continue; + } + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + } + } + } + Pending => Pending, + } + }; + } // When do we detect a gap: // - when the current item poses a gap to the last. // - when we see EOS before the requested range is filled. @@ -59,12 +333,12 @@ impl Stream for GapFill { // When a gap is detected: // - buffer the current item, if there is one (can also be EOS). // - create a new producer of bin: - // - FromFiner(series, bin_len, range) + // - GapFillwith finer range? FromFiner(series, bin_len, range) ? + // - TimeBinnedFromLayers for a bin_len in layers would also go directly into GapFill. // what does FromFiner bring to the table? // It does not attempt to read the given bin-len from a cache, because we just did attempt that. // It still requires that bin-len is cacheable. (NO! it must work with the layering that I passed!) // Then it finds the next cacheable // Ready(None) - todo!("poll the already created cached reader, detect and fill in gaps, send off to cache-write") } } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 42bdeee..40fd6a8 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -3,11 +3,14 @@ use crate::rangefilter2::RangeFilter2; use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; +use crate::timebin::CacheReadProvider; use crate::timebin::TimeBinnedStream; use crate::transform::build_merged_event_transform; use crate::transform::EventsToTimeBinnable; use err::Error; use futures_util::future::BoxFuture; +use futures_util::Future; +use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::Collectable; @@ -15,6 +18,7 @@ use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; use items_0::transform::TimeBinnableStreamBox; use items_0::transform::TimeBinnableStreamTrait; @@ -30,8 +34,13 @@ use netpod::ChannelTypeConfigGen; use netpod::DtMs; use netpod::ReqCtx; use query::api4::binned::BinnedQuery; +use query::api4::events::EventsSubQuerySettings; +use query::transform::TransformQuery; use serde_json::Value as JsonValue; use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use std::time::Instant; @@ -40,28 +49,30 @@ fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream) -> impl stream } -async fn timebinnable_stream( - query: BinnedQuery, +// TODO factor out, it is use now also from GapFill. +pub async fn timebinnable_stream( range: NanoRange, one_before_range: bool, ch_conf: ChannelTypeConfigGen, - ctx: &ReqCtx, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + ctx: Arc, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { let subq = make_sub_query( ch_conf, range.clone().into(), one_before_range, - query.transform().clone(), - query.test_do_wasm(), - &query, - query.log_level().into(), - ctx, + transform_query, + sub.clone(), + log_level.clone(), + &ctx, ); let inmem_bufcap = subq.inmem_bufcap(); let _wasm1 = subq.wasm1().map(ToString::to_string); let mut tr = build_merged_event_transform(subq.transform())?; - let bytes_streams = open_bytes.open(subq, ctx.clone()).await?; + let bytes_streams = open_bytes.open(subq, ctx.as_ref().clone()).await?; let mut inps = Vec::new(); for s in bytes_streams { let s = container_stream_from_bytes_stream::(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?; @@ -70,7 +81,7 @@ async fn timebinnable_stream( } // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. - let stream = Merger::new(inps, query.merger_out_len_max()); + let stream = Merger::new(inps, sub.merger_out_len_max()); let stream = RangeFilter2::new(stream, range, one_before_range); @@ -221,16 +232,88 @@ async fn timebinnable_stream( Ok(TimeBinnableStreamBox(stream)) } +pub struct TimeBinnableStream { + make_stream_fut: Option> + Send>>>, + stream: Option>> + Send>>>, +} + +impl TimeBinnableStream { + pub fn new( + range: NanoRange, + one_before_range: bool, + ch_conf: ChannelTypeConfigGen, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + // TODO take by Arc ref + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, + ) -> Self { + let fut = timebinnable_stream( + range, + one_before_range, + ch_conf, + transform_query, + sub, + log_level, + ctx, + open_bytes, + ); + let fut = Box::pin(fut); + Self { + make_stream_fut: Some(fut), + stream: None, + } + } +} + +// impl WithTransformProperties + Send + +impl Stream for TimeBinnableStream { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if let Some(fut) = self.make_stream_fut.as_mut() { + match fut.poll_unpin(cx) { + Ready(x) => match x { + Ok(x) => { + self.make_stream_fut = None; + self.stream = Some(Box::pin(x)); + continue; + } + Err(e) => Ready(Some(Err(e))), + }, + Pending => Pending, + } + } else if let Some(fut) = self.stream.as_mut() { + match fut.poll_next_unpin(cx) { + Ready(Some(x)) => Ready(Some(x)), + Ready(None) => { + self.stream = None; + Ready(None) + } + Pending => Pending, + } + } else { + Ready(None) + }; + } + } +} + async fn timebinned_stream( query: BinnedQuery, binned_range: BinnedRangeEnum, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, + cache_read_provider: Option>, ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; - match query.cache_usage() { - CacheUsage::Use | CacheUsage::Recreate => { + match (query.cache_usage(), cache_read_provider) { + (CacheUsage::Use | CacheUsage::Recreate, Some(cache_read_provider)) => { let series = if let Some(x) = query.channel().series() { x } else { @@ -256,8 +339,13 @@ async fn timebinned_stream( // DtMs::from_ms_u64(1000 * 10), ] }; - let cache_read_provider = err::todoval(); let stream = crate::timebin::TimeBinnedFromLayers::new( + ch_conf, + query.transform().clone(), + EventsSubQuerySettings::from(&query), + query.log_level().into(), + Arc::new(ctx.clone()), + open_bytes.clone(), series, binned_range.binned_range_time(), do_time_weight, @@ -273,13 +361,23 @@ async fn timebinned_stream( let stream: Pin>> + Send>> = Box::pin(stream); Ok(stream) } - CacheUsage::Ignore => { + _ => { let range = binned_range.binned_range_time().to_nano_range(); let do_time_weight = true; let one_before_range = true; - let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, open_bytes).await?; + let stream = timebinnable_stream( + range, + one_before_range, + ch_conf, + query.transform().clone(), + (&query).into(), + query.log_level().into(), + Arc::new(ctx.clone()), + open_bytes, + ) + .await?; let stream: Pin> = stream.0; let stream = Box::pin(stream); // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. @@ -309,13 +407,22 @@ pub async fn timebinned_json( ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, + cache_read_provider: Option>, ) -> Result { let deadline = Instant::now() + query.timeout_content().unwrap_or(Duration::from_millis(5000)); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; // TODO derive better values, from query let collect_max = 10000; let bytes_max = 100 * collect_max; - let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, open_bytes).await?; + let stream = timebinned_stream( + query.clone(), + binned_range.clone(), + ch_conf, + ctx, + open_bytes, + cache_read_provider, + ) + .await?; let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected);