From cf3c705823af3d424da005481d7b4b8d1ffa7f3b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 28 Oct 2024 11:04:55 +0100 Subject: [PATCH] Remove unused --- crates/httpret/src/api4/accounting.rs | 3 - crates/httpret/src/api4/binned.rs | 43 +-- crates/items_0/src/overlap.rs | 143 ---------- crates/items_0/src/timebin.rs | 62 +---- crates/items_0/src/timebin/timebinimpl.rs | 199 -------------- crates/items_2/src/binsdim0.rs | 115 +------- crates/items_2/src/binsxbindim0.rs | 16 -- crates/items_2/src/channelevents.rs | 35 +-- crates/items_2/src/eventsdim0.rs | 46 +--- crates/items_2/src/eventsdim0enum.rs | 15 -- crates/items_2/src/eventsdim1.rs | 5 +- crates/items_2/src/eventsxbindim0.rs | 69 +---- crates/items_2/src/items_2.rs | 3 +- crates/items_2/src/timebin.rs | 89 +----- crates/streams/src/timebinnedjson.rs | 312 +++++----------------- 15 files changed, 102 insertions(+), 1053 deletions(-) delete mode 100644 crates/items_0/src/timebin/timebinimpl.rs diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs index 5c7d311..ec37dbd 100644 --- a/crates/httpret/src/api4/accounting.rs +++ b/crates/httpret/src/api4/accounting.rs @@ -5,7 +5,6 @@ use crate::ReqCtx; use crate::ServiceSharedResources; use dbconn::worker::PgQueue; use err::ToPublicError; -use futures_util::StreamExt; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -14,8 +13,6 @@ use httpclient::IntoBody; use httpclient::Requ; use httpclient::StreamResponse; use httpclient::ToJsonBody; -use items_0::Empty; -use items_0::Extendable; use items_2::accounting::AccountingEvents; use netpod::log::*; use netpod::req_uri_to_url; diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 444ed69..b1c4728 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -207,22 +207,13 @@ async fn binned_json_single( span1.in_scope(|| { debug!("begin"); }); - let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); - let open_bytes = Arc::pin(open_bytes); - let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); + let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = - make_read_provider(ch_conf.name(), scyqueue, open_bytes2, ctx, ncc); - let item = streams::timebinnedjson::timebinned_json( - query, - ch_conf, - ctx, - open_bytes, - cache_read_provider, - events_read_provider, - ) - .instrument(span1) - .await - .map_err(|e| Error::BinnedStream(e))?; + make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc); + let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, cache_read_provider, events_read_provider) + .instrument(span1) + .await + .map_err(|e| Error::BinnedStream(e))?; match item { CollectResult::Some(item) => { let ret = response(StatusCode::OK) @@ -272,22 +263,14 @@ async fn binned_json_framed( span1.in_scope(|| { debug!("begin"); }); - let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); - let open_bytes = Arc::pin(open_bytes); - let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); + let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = - make_read_provider(ch_conf.name(), scyqueue, open_bytes2, ctx, ncc); - let stream = streams::timebinnedjson::timebinned_json_framed( - query, - ch_conf, - ctx, - open_bytes, - cache_read_provider, - events_read_provider, - ) - .instrument(span1) - .await - .map_err(|e| Error::BinnedStream(e))?; + make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc); + let stream = + streams::timebinnedjson::timebinned_json_framed(query, ch_conf, ctx, cache_read_provider, events_read_provider) + .instrument(span1) + .await + .map_err(|e| Error::BinnedStream(e))?; let stream = bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON_FRAMED) diff --git a/crates/items_0/src/overlap.rs b/crates/items_0/src/overlap.rs index 34f0116..2943395 100644 --- a/crates/items_0/src/overlap.rs +++ b/crates/items_0/src/overlap.rs @@ -1,5 +1,3 @@ -use netpod::range::evrange::SeriesRange; - // TODO rename, no more deque involved pub trait HasTimestampDeque { fn timestamp_min(&self) -> Option; @@ -7,144 +5,3 @@ pub trait HasTimestampDeque { fn pulse_min(&self) -> Option; fn pulse_max(&self) -> Option; } - -pub trait RangeOverlapInfo { - fn ends_before(&self, range: &SeriesRange) -> bool; - fn ends_after(&self, range: &SeriesRange) -> bool; - fn starts_after(&self, range: &SeriesRange) -> bool; -} - -#[macro_export] -macro_rules! impl_range_overlap_info_events { - ($ty:ident) => { - impl RangeOverlapInfo for $ty - where - STY: ScalarOps, - { - fn ends_before(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(max) = HasTimestampDeque::timestamp_max(self) { - max < range.beg_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(max) = HasTimestampDeque::pulse_max(self) { - max < range.beg_u64() - } else { - true - } - } else { - error!("unexpected"); - true - } - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(max) = HasTimestampDeque::timestamp_max(self) { - max >= range.end_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(max) = HasTimestampDeque::pulse_max(self) { - max >= range.end_u64() - } else { - true - } - } else { - error!("unexpected"); - false - } - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(min) = HasTimestampDeque::timestamp_min(self) { - min >= range.end_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(min) = HasTimestampDeque::pulse_min(self) { - min >= range.end_u64() - } else { - true - } - } else { - error!("unexpected"); - true - } - } - } - }; -} - -#[macro_export] -macro_rules! impl_range_overlap_info_bins { - ($ty:ident) => { - impl RangeOverlapInfo for $ty - where - STY: ScalarOps, - { - fn ends_before(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(max) = HasTimestampDeque::timestamp_max(self) { - max <= range.beg_u64() - } else { - true - } - } else if range.is_pulse() { - // TODO for the time being, the ts represent either ts or pulse - if let Some(max) = HasTimestampDeque::timestamp_max(self) { - max <= range.beg_u64() - } else { - true - } - } else { - error!("unexpected"); - true - } - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(max) = HasTimestampDeque::timestamp_max(self) { - max > range.end_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(max) = HasTimestampDeque::timestamp_max(self) { - max > range.end_u64() - } else { - true - } - } else { - error!("unexpected"); - false - } - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - if range.is_time() { - if let Some(min) = HasTimestampDeque::timestamp_min(self) { - min >= range.end_u64() - } else { - true - } - } else if range.is_pulse() { - if let Some(min) = HasTimestampDeque::timestamp_min(self) { - min >= range.end_u64() - } else { - true - } - } else { - error!("unexpected"); - true - } - } - } - }; -} diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index 66c5ace..e0f428a 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -1,28 +1,17 @@ -pub mod timebinimpl; - use crate::collect_s::CollectableDyn; use crate::collect_s::CollectorDyn; use crate::collect_s::ToJsonResult; -use crate::container::ByteEstimate; -use crate::overlap::RangeOverlapInfo; -use crate::vecpreview::PreviewRange; use crate::AsAnyMut; use crate::AsAnyRef; -use crate::Empty; use crate::Events; use crate::Resettable; use crate::TypeName; use crate::WithLen; -use err::thiserror; use err::Error; -use err::ThisError; use netpod::log::*; -use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::TsNano; -use serde::Deserialize; -use serde::Serialize; use std::any::Any; use std::fmt; use std::ops::Range; @@ -170,20 +159,6 @@ impl Clone for Box { } } -impl RangeOverlapInfo for Box { - fn ends_before(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - todo!() - } -} - impl TimeBinnable for Box { fn time_binner_new( &self, @@ -230,9 +205,7 @@ pub trait TimeBinner: fmt::Debug + Send { /// Provides a time-binned representation of the implementing type. /// In contrast to `TimeBinnableType` this is meant for trait objects. -pub trait TimeBinnable: - fmt::Debug + WithLen + RangeOverlapInfo + CollectableDyn + Any + AsAnyRef + AsAnyMut + Send -{ +pub trait TimeBinnable: fmt::Debug + WithLen + CollectableDyn + Any + AsAnyRef + AsAnyMut + Send { // TODO implementors may fail if edges contain not at least 2 entries. fn time_binner_new( &self, @@ -252,21 +225,6 @@ impl WithLen for Box { } } -#[allow(unused)] -impl RangeOverlapInfo for Box { - fn ends_before(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - todo!() - } -} - impl TimeBinnable for Box { fn time_binner_new( &self, @@ -286,20 +244,6 @@ impl TimeBinnable for Box { } } -impl RangeOverlapInfo for Box { - fn ends_before(&self, range: &SeriesRange) -> bool { - RangeOverlapInfo::ends_before(self.as_ref(), range) - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - RangeOverlapInfo::ends_after(self.as_ref(), range) - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - RangeOverlapInfo::starts_after(self.as_ref(), range) - } -} - impl TimeBinnable for Box { fn time_binner_new( &self, @@ -425,7 +369,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct { } impl TimeBinner for TimeBinnerDynStruct { - fn ingest(&mut self, item: &mut dyn TimeBinnable) { + fn ingest(&mut self, _item: &mut dyn TimeBinnable) { todo!() } @@ -437,7 +381,7 @@ impl TimeBinner for TimeBinnerDynStruct { todo!() } - fn push_in_progress(&mut self, push_empty: bool) { + fn push_in_progress(&mut self, _push_empty: bool) { todo!() } diff --git a/crates/items_0/src/timebin/timebinimpl.rs b/crates/items_0/src/timebin/timebinimpl.rs deleted file mode 100644 index e6baf7e..0000000 --- a/crates/items_0/src/timebin/timebinimpl.rs +++ /dev/null @@ -1,199 +0,0 @@ -#![allow(unused)] -use crate::timebin::TimeBinnable; -use crate::timebin::TimeBinned; -use crate::timebin::TimeBinner; -use crate::timebin::TimeBinnerIngest; -use crate::TypeName; -use netpod::log::*; -use netpod::range::evrange::NanoRange; - -#[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => { trace!($($arg)*) }; -} - -#[allow(unused)] -macro_rules! trace_ingest { - ($($arg:tt)*) => { trace!($($arg)*) }; -} - -#[cfg(target_abi = "x32")] -impl TimeBinner for T -where - T: TimeBinnerIngest, -{ - fn bins_ready_count(&self) -> usize { - match &self.ready { - Some(k) => k.len(), - None => 0, - } - } - - fn bins_ready(&mut self) -> Option> { - match self.ready.take() { - Some(k) => Some(Box::new(k)), - None => None, - } - } - - fn ingest(&mut self, item: &mut dyn TimeBinnable) { - trace2!( - "TimeBinner for {} ingest agg.range {:?} item {:?}", - self.type_name(), - self.agg.range(), - item - ); - if item.len() == 0 { - // Return already here, RangeOverlapInfo would not give much sense. - return; - } - // TODO optimize by remembering at which event array index we have arrived. - // That needs modified interfaces which can take and yield the start and latest index. - loop { - while item.starts_after(self.agg.range()) { - trace!( - "{} IGNORE ITEM AND CYCLE BECAUSE item.starts_after", - self.type_name() - ); - self.cycle(); - if self.rng.is_none() { - warn!("{} no more bin in edges B", self.type_name()); - return; - } - } - if item.ends_before(self.agg.range()) { - trace!("{} IGNORE ITEM BECAUSE ends_before", self.type_name()); - return; - } else { - if self.rng.is_none() { - trace!("{} no more bin in edges D", self.type_name()); - return; - } else { - match TimeBinnerIngest::ingest_inrange(self, item) { - Ok(()) => { - if item.ends_after(self.agg.range()) { - trace_ingest!("{} FED ITEM, ENDS AFTER.", self.type_name()); - self.cycle(); - if self.rng.is_none() { - warn!("{} no more bin in edges C", self.type_name()); - return; - } else { - trace_ingest!("{} FED ITEM, CYCLED, CONTINUE.", self.type_name()); - } - } else { - trace_ingest!("{} FED ITEM.", self.type_name()); - break; - } - } - Err(e) => { - error!("{}::ingest {}", self.type_name(), e); - } - } - /* - // Move to TimeBinnerIngest - if let Some(item) = item - .as_any_ref() - // TODO make statically sure that we attempt to cast to the correct type here: - .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() - { - // TODO collect statistics associated with this request: - trace_ingest!("{self_name} FEED THE ITEM..."); - self.agg.ingest(item); - } else { - error!("{self_name}::ingest unexpected item type"); - }; - */ - } - } - } - } - - fn push_in_progress(&mut self, push_empty: bool) { - trace!("{}::push_in_progress push_empty {push_empty}", self.type_name()); - // TODO expand should be derived from AggKind. Is it still required after all? - // TODO here, the expand means that agg will assume that the current value is kept constant during - // the rest of the time range. - if self.rng.is_none() { - } else { - let expand = true; - let range_next = self.next_bin_range(); - self.rng = range_next.clone(); - let mut bins = if let Some(range_next) = range_next { - self.agg.result_reset(range_next, expand) - } else { - // Acts as placeholder - let range_next = NanoRange { - beg: u64::MAX - 1, - end: u64::MAX, - }; - self.agg.result_reset(range_next.into(), expand) - }; - if bins.len() != 1 { - error!("{}::push_in_progress bins.len() {}", self.type_name(), bins.len()); - return; - } else { - if push_empty || bins.counts[0] != 0 { - match self.ready.as_mut() { - Some(ready) => { - ready.append_all_from(&mut bins); - } - None => { - self.ready = Some(bins); - } - } - } - } - } - } - - fn cycle(&mut self) { - trace!("{}::cycle", self.type_name()); - // TODO refactor this logic. - let n = self.bins_ready_count(); - self.push_in_progress(true); - if self.bins_ready_count() == n { - let range_next = self.next_bin_range(); - self.rng = range_next.clone(); - if let Some(range) = range_next { - /* - TODO Move out to trait. - let mut bins = BinsDim0::empty(); - if range.is_time() { - bins.append_zero(range.beg_u64(), range.end_u64()); - } else { - error!("TODO {self_name}::cycle is_pulse"); - } - match self.ready.as_mut() { - Some(ready) => { - ready.append_all_from(&mut bins); - } - None => { - self.ready = Some(bins); - } - } - */ - if self.bins_ready_count() <= n { - error!("{}::cycle failed to push a zero bin", self.type_name()); - } - } else { - warn!( - "{}::cycle no in-progress bin pushed, but also no more bin to add as zero-bin", - self.type_name() - ); - } - } - } - - fn set_range_complete(&mut self) { - self.range_final = true; - } - - fn empty(&self) -> Box { - /* - TODO factor out to trait. - let ret = as TimeBinnableTypeAggregator>::Output::empty(); - */ - let ret = todo!(); - Box::new(ret) - } -} diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index e825118..b986242 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -3,11 +3,8 @@ use crate::timebin::TimeBinnerCommonV0Trait; use crate::ts_offs_from_abs; use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; -use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; -use chrono::TimeZone; -use chrono::Utc; use err::Error; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectableType; @@ -285,8 +282,6 @@ impl HasTimestampDeque for BinsDim0 { } } -items_0::impl_range_overlap_info_bins!(BinsDim0); - impl AppendEmptyBin for BinsDim0 { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { debug!("AppendEmptyBin::append_empty_bin should not get used"); @@ -1230,121 +1225,17 @@ impl TimeBinner for BinsDim0TimeBinner { } fn ingest(&mut self, item: &mut dyn TimeBinnable) { - /*let self_name = any::type_name::(); - if item.len() == 0 { - // Return already here, RangeOverlapInfo would not give much sense. - return; - } - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item); - return; - } - // TODO optimize by remembering at which event array index we have arrived. - // That needs modified interfaces which can take and yield the start and latest index. - loop { - while item.starts_after(NanoRange { - beg: 0, - end: self.edges[1], - }) { - self.cycle(); - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item); - return; - } - } - if item.ends_before(NanoRange { - beg: self.edges[0], - end: u64::MAX, - }) { - return; - } else { - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {self_name} edge list exhausted"); - return; - } else { - let agg = if let Some(agg) = self.agg.as_mut() { - agg - } else { - self.agg = Some(BinsDim0Aggregator::new( - // We know here that we have enough edges for another bin. - // and `next_bin_range` will pop the first edge. - self.next_bin_range().unwrap(), - self.do_time_weight, - )); - self.agg.as_mut().unwrap() - }; - if let Some(item) = item - .as_any_ref() - // TODO make statically sure that we attempt to cast to the correct type here: - .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() - { - agg.ingest(item); - } else { - let tyid_item = std::any::Any::type_id(item.as_any_ref()); - error!("not correct item type {:?}", tyid_item); - }; - if item.ends_after(agg.range().clone()) { - self.cycle(); - if self.edges.len() < 2 { - warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item); - return; - } - } else { - break; - } - } - } - }*/ - TimeBinnerCommonV0Func::ingest(self, item) + panic!("TODO do not use") } // TODO there is too much common code between implementors: fn push_in_progress(&mut self, push_empty: bool) { - // TODO expand should be derived from AggKind. Is it still required after all? - /*let expand = true; - if let Some(agg) = self.agg.as_mut() { - let dummy_range = NanoRange { beg: 4, end: 5 }; - let mut bins = agg.result_reset(dummy_range, expand); - self.agg = None; - assert_eq!(bins.len(), 1); - if push_empty || bins.counts[0] != 0 { - match self.ready.as_mut() { - Some(ready) => { - ready.append_all_from(&mut bins); - } - None => { - self.ready = Some(bins); - } - } - } - }*/ - TimeBinnerCommonV0Func::push_in_progress(self, push_empty) + panic!("TODO do not use") } // TODO there is too much common code between implementors: fn cycle(&mut self) { - /*let n = self.bins_ready_count(); - self.push_in_progress(true); - if self.bins_ready_count() == n { - if let Some(range) = self.next_bin_range() { - let mut bins = BinsDim0::::empty(); - bins.append_zero(range.beg, range.end); - match self.ready.as_mut() { - Some(ready) => { - ready.append_all_from(&mut bins); - } - None => { - self.ready = Some(bins); - } - } - if self.bins_ready_count() <= n { - error!("failed to push a zero bin"); - } - } else { - warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); - } - }*/ - TimeBinnerCommonV0Func::cycle(self) + panic!("TODO do not use") } fn set_range_complete(&mut self) { diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index 3835406..da7b0d4 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -1,10 +1,8 @@ use crate::ts_offs_from_abs; use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; -use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; -use chrono::{TimeZone, Utc}; use err::Error; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectableType; @@ -235,20 +233,6 @@ impl Resettable for BinsXbinDim0 { } } -impl RangeOverlapInfo for BinsXbinDim0 { - fn ends_before(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - todo!() - } -} - impl AppendEmptyBin for BinsXbinDim0 { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { self.ts1s.push_back(ts1); diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index 33c26d3..5c1eefa 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -7,7 +7,6 @@ use items_0::collect_s::CollectorDyn; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::isodate::IsoDateTime; -use items_0::overlap::RangeOverlapInfo; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinnableTy; @@ -827,20 +826,6 @@ impl Mergeable for ChannelEvents { } } -impl RangeOverlapInfo for ChannelEvents { - fn ends_before(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - todo!() - } -} - impl TimeBinnable for ChannelEvents { fn time_binner_new( &self, @@ -865,7 +850,7 @@ impl EventsNonObj for ChannelEvents { fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { match *self { ChannelEvents::Events(k) => k.into_tss_pulses(), - ChannelEvents::Status(k) => (VecDeque::new(), VecDeque::new()), + ChannelEvents::Status(_) => (VecDeque::new(), VecDeque::new()), } } } @@ -910,7 +895,7 @@ impl Events for ChannelEvents { todo!() } - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box { todo!() } @@ -937,15 +922,15 @@ impl Events for ChannelEvents { } } - fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { + fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option { todo!() } - fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { + fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option { todo!() } - fn find_highest_index_lt_evs(&self, ts: u64) -> Option { + fn find_highest_index_lt_evs(&self, _ts: u64) -> Option { todo!() } @@ -953,7 +938,7 @@ impl Events for ChannelEvents { todo!() } - fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + fn partial_eq_dyn(&self, _other: &dyn Events) -> bool { todo!() } @@ -990,7 +975,7 @@ impl Events for ChannelEvents { fn to_json_string(&self) -> String { match self { ChannelEvents::Events(item) => item.to_json_string(), - ChannelEvents::Status(item) => { + ChannelEvents::Status(_item) => { error!("TODO convert status to json"); String::new() } @@ -1000,7 +985,7 @@ impl Events for ChannelEvents { fn to_json_vec_u8(&self) -> Vec { match self { ChannelEvents::Events(item) => item.to_json_vec_u8(), - ChannelEvents::Status(item) => { + ChannelEvents::Status(_item) => { error!("TODO convert status to json"); Vec::new() } @@ -1010,7 +995,7 @@ impl Events for ChannelEvents { fn to_cbor_vec_u8(&self) -> Vec { match self { ChannelEvents::Events(item) => item.to_cbor_vec_u8(), - ChannelEvents::Status(item) => { + ChannelEvents::Status(_item) => { error!("TODO convert status to cbor"); Vec::new() } @@ -1030,7 +1015,7 @@ impl Events for ChannelEvents { use ChannelEvents::*; match self { Events(x) => x.to_dim0_f32_for_binning(), - Status(x) => panic!("ChannelEvents::to_dim0_f32_for_binning"), + Status(_x) => panic!("ChannelEvents::to_dim0_f32_for_binning"), } } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index c211eb5..fa106b0 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -1,6 +1,4 @@ use crate::binsdim0::BinsDim0; -use crate::framable::FrameType; -use crate::framable::FrameTypeStatic; use crate::timebin::ChooseIndicesForTimeBin; use crate::timebin::ChooseIndicesForTimeBinEvents; use crate::timebin::TimeAggregatorCommonV0Func; @@ -8,44 +6,35 @@ use crate::timebin::TimeAggregatorCommonV0Trait; use crate::timebin::TimeBinnerCommonV0Func; use crate::timebin::TimeBinnerCommonV0Trait; use crate::IsoDateTime; -use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use err::Error; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectedDyn; -use items_0::collect_s::CollectorDyn; use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; -use items_0::framable::FrameTypeInnerStatic; use items_0::overlap::HasTimestampDeque; use items_0::scalar_ops::ScalarOps; -use items_0::test::f32_iter_cmp_near; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; use items_0::timebin::TimeBinner; use items_0::AppendAllFrom; -use items_0::AppendEmptyBin; use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; -use items_0::HasNonemptyFirstBin; use items_0::MergeError; use items_0::Resettable; use items_0::TypeName; use items_0::WithLen; use netpod::is_false; use netpod::log::*; -use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; use netpod::timeunits::SEC; -use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::TsNano; use serde::Deserialize; @@ -244,8 +233,6 @@ impl HasTimestampDeque for EventsDim0 { } } -items_0::impl_range_overlap_info_events!(EventsDim0); - impl ChooseIndicesForTimeBin for EventsDim0 { fn choose_indices_unweight(&self, beg: u64, end: u64) -> (Option, usize, usize) { ChooseIndicesForTimeBinEvents::choose_unweight(beg, end, &self.tss) @@ -1431,30 +1418,6 @@ mod test_serde_opt { } } -#[test] -fn overlap_info_00() { - let mut ev1 = EventsDim0::empty(); - ev1.push(MS * 1200, 3, 1.2f32); - ev1.push(MS * 3200, 3, 3.2f32); - let range = SeriesRange::TimeRange(NanoRange { - beg: MS * 1000, - end: MS * 2000, - }); - assert_eq!(ev1.ends_after(&range), true); -} - -#[test] -fn overlap_info_01() { - let mut ev1 = EventsDim0::empty(); - ev1.push(MS * 1200, 3, 1.2f32); - ev1.push(MS * 1400, 3, 3.2f32); - let range = SeriesRange::TimeRange(NanoRange { - beg: MS * 1000, - end: MS * 2000, - }); - assert_eq!(ev1.ends_after(&range), false); -} - #[test] fn binner_00() { let mut ev1 = EventsDim0::empty(); @@ -1553,7 +1516,7 @@ fn bin_binned_02() { #[test] fn events_timebin_ingest_continuous_00() { - let binrange = BinnedRangeEnum::Time(BinnedRange { + let binrange = BinnedRangeEnum::Time(netpod::BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 20, @@ -1573,5 +1536,10 @@ fn events_timebin_ingest_continuous_00() { let mut exp = BinsDim0::empty(); // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); exp.push(SEC * 20, SEC * 22, 1, 20, 20, 20., 20); - assert!(f32_iter_cmp_near(got.avgs.clone(), exp.avgs.clone(), 0.0001, 0.0001)); + assert!(items_0::test::f32_iter_cmp_near( + got.avgs.clone(), + exp.avgs.clone(), + 0.0001, + 0.0001 + )); } diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs index 45153b6..7e748ad 100644 --- a/crates/items_2/src/eventsdim0enum.rs +++ b/crates/items_2/src/eventsdim0enum.rs @@ -7,7 +7,6 @@ use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::isodate::IsoDateTime; -use items_0::overlap::RangeOverlapInfo; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinnableTy; @@ -284,20 +283,6 @@ impl EventsNonObj for EventsDim0Enum { } } -impl RangeOverlapInfo for EventsDim0Enum { - fn ends_before(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn ends_after(&self, range: &SeriesRange) -> bool { - todo!() - } - - fn starts_after(&self, range: &SeriesRange) -> bool { - todo!() - } -} - // NOTE just a dummy because currently we don't use this for time binning #[derive(Debug)] pub struct EventsDim0EnumTimeBinner; diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index c488263..ba38285 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -1,7 +1,6 @@ use crate::binsdim0::BinsDim0; use crate::eventsxbindim0::EventsXbinDim0; use crate::IsoDateTime; -use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use err::Error; @@ -187,8 +186,6 @@ impl HasTimestampDeque for EventsDim1 { } } -items_0::impl_range_overlap_info_events!(EventsDim1); - impl TimeBinnableType for EventsDim1 where STY: ScalarOps, @@ -196,7 +193,7 @@ where type Output = BinsDim0; type Aggregator = EventsDim1Aggregator; - fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + fn aggregator(_range: SeriesRange, _x_bin_count: usize, _do_time_weight: bool) -> Self::Aggregator { panic!("TODO remove") } } diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index d13c7c6..2e002a4 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -1,6 +1,5 @@ use crate::binsxbindim0::BinsXbinDim0; use crate::IsoDateTime; -use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use err::Error; @@ -8,7 +7,6 @@ use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectableType; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::overlap::HasTimestampDeque; @@ -187,8 +185,6 @@ impl HasTimestampDeque for EventsXbinDim0 { } } -items_0::impl_range_overlap_info_events!(EventsXbinDim0); - impl EventsNonObj for EventsXbinDim0 { fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { info!( @@ -455,70 +451,7 @@ impl TimeBinner for EventsXbinDim0TimeBinner { } fn ingest(&mut self, item: &mut dyn TimeBinnable) { - trace2!( - "TimeBinner for {} ingest agg.range {:?} item {:?}", - Self::type_name(), - self.agg.range(), - item - ); - if item.len() == 0 { - // Return already here, RangeOverlapInfo would not give much sense. - return; - } - // TODO optimize by remembering at which event array index we have arrived. - // That needs modified interfaces which can take and yield the start and latest index. - loop { - while item.starts_after(self.agg.range()) { - trace!( - "{} IGNORE ITEM AND CYCLE BECAUSE item.starts_after", - Self::type_name() - ); - self.cycle(); - if self.rng.is_none() { - warn!("{} no more bin in edges B", Self::type_name()); - return; - } - } - if item.ends_before(self.agg.range()) { - trace!( - "{} IGNORE ITEM BECAUSE ends_before {:?} {:?}", - Self::type_name(), - self.agg.range(), - item - ); - return; - } else { - if self.rng.is_none() { - trace!("{} no more bin in edges D", Self::type_name()); - return; - } else { - if let Some(item) = item - .as_any_ref() - // TODO make statically sure that we attempt to cast to the correct type here: - .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() - { - // TODO collect statistics associated with this request: - trace_ingest!("{} FEED THE ITEM...", Self::type_name()); - self.agg.ingest(item); - if item.ends_after(self.agg.range()) { - trace_ingest!("{} FED ITEM, ENDS AFTER.", Self::type_name()); - self.cycle(); - if self.rng.is_none() { - warn!("{} no more bin in edges C", Self::type_name()); - return; - } else { - trace_ingest!("{} FED ITEM, CYCLED, CONTINUE.", Self::type_name()); - } - } else { - trace_ingest!("{} FED ITEM.", Self::type_name()); - break; - } - } else { - error!("{}::ingest unexpected item type", Self::type_name()); - }; - } - } - } + panic!("TODO remove TimeBinner for EventsXbinDim0TimeBinner ingest") } fn push_in_progress(&mut self, push_empty: bool) { diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 5978d32..2398659 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -24,7 +24,6 @@ pub mod transform; use channelevents::ChannelEvents; use futures_util::Stream; use items_0::isodate::IsoDateTime; -use items_0::overlap::RangeOverlapInfo; use items_0::streamitem::Sitemty; use items_0::transform::EventTransform; use items_0::Empty; @@ -175,7 +174,7 @@ impl Mergeable for Box { } // TODO rename to `Typed` -pub trait TimeBinnableType: Send + Unpin + RangeOverlapInfo + Empty { +pub trait TimeBinnableType: Send + Unpin + Empty { type Output: TimeBinnableType; type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; fn aggregator(range: SeriesRange, bin_count: usize, do_time_weight: bool) -> Self::Aggregator; diff --git a/crates/items_2/src/timebin.rs b/crates/items_2/src/timebin.rs index 5a6b961..c9b3cab 100644 --- a/crates/items_2/src/timebin.rs +++ b/crates/items_2/src/timebin.rs @@ -1,4 +1,3 @@ -use items_0::overlap::RangeOverlapInfo; use items_0::timebin::TimeBinnable; use items_0::AppendEmptyBin; use items_0::Empty; @@ -23,7 +22,7 @@ macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace!($($arg)*) macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } pub trait TimeBinnerCommonV0Trait { - type Input: RangeOverlapInfo + 'static; + type Input: 'static; type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static; fn type_name() -> &'static str; fn common_bins_ready_count(&self) -> usize; @@ -45,89 +44,7 @@ impl TimeBinnerCommonV0Func { where B: TimeBinnerCommonV0Trait, { - let self_name = B::type_name(); - trace_ingest_item!( - "TimeBinner for {} ingest common_range_current {:?} item {:?}", - self_name, - binner.common_range_current(), - item - ); - if item.len() == 0 { - // Return already here, RangeOverlapInfo would not give much sense. - return; - } - // TODO optimize by remembering at which event array index we have arrived. - // That needs modified interfaces which can take and yield the start and latest index. - // Or consume the input data. - if B::common_has_lst(binner) == false { - if let Some(item) = item - .as_any_mut() - // TODO make statically sure that we attempt to cast to the correct type here: - .downcast_mut::() - { - B::common_feed_lst(binner, item); - } else { - error!( - "{self_name}::ingest unexpected item type {} expected {}", - item.type_name(), - any::type_name::() - ); - return; - } - } - loop { - while item.starts_after(B::common_range_current(binner)) { - trace_ingest_item!("{self_name} ignore item and cycle starts_after"); - TimeBinnerCommonV0Func::cycle(binner); - if !B::common_has_more_range(binner) { - debug!("{self_name} no more bin in edges after starts_after"); - return; - } - } - if item.ends_before(B::common_range_current(binner)) { - trace_ingest_item!("{self_name} ignore item ends_before"); - return; - } - { - if !B::common_has_more_range(binner) { - trace_ingest_item!("{self_name} no more bin in edges"); - return; - } else { - if let Some(item) = item - .as_any_mut() - // TODO make statically sure that we attempt to cast to the correct type here: - .downcast_mut::() - { - // TODO collect statistics associated with this request: - trace_ingest_item!("{self_name} FEED THE ITEM..."); - TimeBinnerCommonV0Func::agg_ingest(binner, item); - if item.ends_after(B::common_range_current(binner)) { - trace_ingest_item!( - "{self_name} FED ITEM, ENDS AFTER agg-range {:?}", - B::common_range_current(binner) - ); - TimeBinnerCommonV0Func::cycle(binner); - if !B::common_has_more_range(binner) { - warn!("{self_name} no more bin in edges after ingest and cycle"); - return; - } else { - trace_ingest_item!("{self_name} item fed, cycled, continue"); - } - } else { - trace_ingest_item!("{self_name} item fed, break"); - break; - } - } else { - error!( - "{self_name}::ingest unexpected item type {} expected {}", - item.type_name(), - any::type_name::() - ); - return; - }; - } - } - } + panic!("TimeBinnerCommonV0Func::ingest") } fn agg_ingest(binner: &mut B, item: &mut ::Input) @@ -243,7 +160,7 @@ impl ChooseIndicesForTimeBinEvents { } pub trait TimeAggregatorCommonV0Trait { - type Input: WithLen + RangeOverlapInfo + ChooseIndicesForTimeBin + 'static; + type Input: WithLen + ChooseIndicesForTimeBin + 'static; type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static; fn type_name() -> &'static str; fn common_range_current(&self) -> &SeriesRange; diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 4f4c919..72f008e 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -8,13 +8,9 @@ use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::cached::reader::EventsReadProvider; use crate::timebin::CacheReadProvider; -use crate::timebin::TimeBinnedStream; use crate::transform::build_merged_event_transform; -use crate::transform::EventsToTimeBinnable; use err::Error; use futures_util::future::BoxFuture; -use futures_util::Future; -use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; @@ -22,15 +18,9 @@ use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::timebin::TimeBinnable; -use items_0::timebin::TimeBinned; -use items_0::transform::TimeBinnableStreamBox; -use items_0::transform::TimeBinnableStreamTrait; use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; -use items_2::streams::PlainEventStream; -use netpod::log::*; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRangeEnum; @@ -43,8 +33,6 @@ use query::transform::TransformQuery; use serde_json::Value as JsonValue; use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use std::time::Duration; use std::time::Instant; @@ -53,7 +41,7 @@ fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream) -> impl stream } -pub async fn timebinnable_stream_sf_databuffer_box_events( +pub async fn timebinnable_stream_sf_databuffer_channelevents( range: NanoRange, one_before_range: bool, ch_conf: ChannelTypeConfigGen, @@ -62,7 +50,7 @@ pub async fn timebinnable_stream_sf_databuffer_box_events( log_level: String, ctx: Arc, open_bytes: OpenBoxedBytesStreamsBox, -) -> Result>>, Error> { +) -> Result>, Error> { let subq = make_sub_query( ch_conf, range.clone().into(), @@ -85,21 +73,26 @@ pub async fn timebinnable_stream_sf_databuffer_box_events( // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, sub.merger_out_len_max()); - let stream = RangeFilter2::new(stream, range, one_before_range); - - let stream = stream.map(move |k| { - on_sitemty_data!(k, |k: Box| { - TODO; - let k: Box = Box::new(k); - let k = k.to_dim0_f32_for_binning(); - let k = tr.0.transform(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - }) + let stream = stream.map(move |k: Sitemty| { + use ChannelEvents; + use RangeCompletableItem::*; + use StreamItem::*; + match k { + Ok(DataItem(Data(ChannelEvents::Events(k)))) => { + // let k = k; + // let k: Box = Box::new(k); + let k = k.to_dim0_f32_for_binning(); + let k = tr.0.transform(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( + k, + )))) + } + _ => k, + } }); - #[cfg(target_abi = "")] - #[cfg(wasm_transform)] + #[cfg(feature = "wasm_transform")] let stream = if let Some(wasmname) = wasm1 { debug!("make wasm transform"); use httpclient::url::Url; @@ -225,242 +218,61 @@ pub async fn timebinnable_stream_sf_databuffer_box_events( Ok(stream) } -async fn timebinnable_stream_sf_databuffer_binnable_box( - range: NanoRange, - one_before_range: bool, - ch_conf: ChannelTypeConfigGen, - transform_query: TransformQuery, - sub: EventsSubQuerySettings, - log_level: String, - ctx: Arc, - open_bytes: OpenBoxedBytesStreamsBox, -) -> Result { - let stream = timebinnable_stream_sf_databuffer_box_events( - range, - one_before_range, - ch_conf, - transform_query, - sub, - log_level, - ctx, - open_bytes, - ) - .await?; - let stream = PlainEventStream::new(stream); - // let stream = stream.map(|x| { - // on_sitemty_data!(x, |x| { - // let ret = x.binnable; - // ret - // }) - // }); - let stream = EventsToTimeBinnable::new(stream); - let stream = Box::pin(stream); - Ok(TimeBinnableStreamBox(stream)) -} - -pub async fn timebinnable_stream_sf_databuffer_channelevents( - range: NanoRange, - one_before_range: bool, - ch_conf: ChannelTypeConfigGen, - transform_query: TransformQuery, - sub: EventsSubQuerySettings, - log_level: String, - ctx: Arc, - open_bytes: OpenBoxedBytesStreamsBox, -) -> Result>, Error> { - let stream = timebinnable_stream_sf_databuffer_box_events( - range, - one_before_range, - ch_conf, - transform_query, - sub, - log_level, - ctx, - open_bytes, - ) - .await?; - // let stream = stream.map(|x| x); - let stream = stream.map(move |k| { - on_sitemty_data!(k, |k| { - // let k: Box = Box::new(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( - k, - )))) - }) - }); - Ok(stream) -} - -pub struct TimeBinnableStream { - make_stream_fut: Option> + Send>>>, - stream: Option>> + Send>>>, -} - -impl TimeBinnableStream { - pub fn new( - range: NanoRange, - one_before_range: bool, - ch_conf: ChannelTypeConfigGen, - transform_query: TransformQuery, - sub: EventsSubQuerySettings, - log_level: String, - // TODO take by Arc ref - ctx: Arc, - open_bytes: OpenBoxedBytesStreamsBox, - ) -> Self { - let fut = timebinnable_stream_sf_databuffer_binnable_box( - range, - one_before_range, - ch_conf, - transform_query, - sub, - log_level, - ctx, - open_bytes, - ); - let fut = Box::pin(fut); - Self { - make_stream_fut: Some(fut), - stream: None, - } - } -} - -// impl WithTransformProperties + Send - -impl Stream for TimeBinnableStream { - type Item = Sitemty>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - loop { - break if let Some(fut) = self.make_stream_fut.as_mut() { - match fut.poll_unpin(cx) { - Ready(x) => match x { - Ok(x) => { - self.make_stream_fut = None; - self.stream = Some(Box::pin(x)); - continue; - } - Err(e) => Ready(Some(Err(e))), - }, - Pending => Pending, - } - } else if let Some(fut) = self.stream.as_mut() { - match fut.poll_next_unpin(cx) { - Ready(Some(x)) => Ready(Some(x)), - Ready(None) => { - self.stream = None; - Ready(None) - } - Pending => Pending, - } - } else { - Ready(None) - }; - } - } -} - async fn timebinned_stream( query: BinnedQuery, binned_range: BinnedRangeEnum, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - open_bytes: OpenBoxedBytesStreamsBox, cache_read_provider: Arc, events_read_provider: Arc, ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore); - match cache_usage.clone() { - CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore => { - debug!("BINNING NEW METHOD"); - debug!( - "timebinned_stream caching {:?} subgrids {:?}", - query, - query.subgrids() - ); - let do_time_weight = true; - let bin_len_layers = if let Some(subgrids) = query.subgrids() { - subgrids - .iter() - .map(|&x| DtMs::from_ms_u64(1000 * x.as_secs())) - .collect() - } else { - netpod::time_bin_len_cache_opts().to_vec() - }; - let stream = crate::timebin::TimeBinnedFromLayers::new( - ch_conf, - cache_usage, - query.transform().clone(), - EventsSubQuerySettings::from(&query), - query.log_level().into(), - Arc::new(ctx.clone()), - binned_range.binned_range_time(), - do_time_weight, - bin_len_layers, - cache_read_provider, - events_read_provider, - ) - .map_err(Error::from_string)?; - let stream = stream.map(|item| { - use items_0::timebin::BinningggContainerBinsDyn; - on_sitemty_data!(item, |mut x: Box| { - x.fix_numerics(); - let ret = Box::new(x) as Box; - Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) - }) - }); - let stream = Box::pin(stream); - Ok(stream) - } - _ => { - debug!("BINNING OLD METHOD"); - let range = binned_range.binned_range_time().to_nano_range(); - let do_time_weight = true; - let one_before_range = true; - let stream = timebinnable_stream_sf_databuffer_binnable_box( - range, - one_before_range, - ch_conf, - query.transform().clone(), - (&query).into(), - query.log_level().into(), - Arc::new(ctx.clone()), - open_bytes, - ) - .await?; - let stream: Pin> = stream.0; - let stream = Box::pin(stream); - // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. - let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight); - if false { - let stream = stream.map(|x| { - on_sitemty_data!(x, |x: Box| Ok(StreamItem::DataItem( - RangeCompletableItem::Data(x.to_container_bins()) - ))) - }); - todo!(); - } - let stream = stream.map(|x| { - on_sitemty_data!(x, |x| { - let ret = Box::new(x) as Box; - Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) - }) - }); - // let stream: Pin>> + Send>> = Box::pin(stream); - let stream = Box::pin(stream); - Ok(stream) - } - } + debug!("BINNING NEW METHOD"); + debug!( + "timebinned_stream caching {:?} subgrids {:?}", + query, + query.subgrids() + ); + let do_time_weight = true; + let bin_len_layers = if let Some(subgrids) = query.subgrids() { + subgrids + .iter() + .map(|&x| DtMs::from_ms_u64(1000 * x.as_secs())) + .collect() + } else { + netpod::time_bin_len_cache_opts().to_vec() + }; + let stream = crate::timebin::TimeBinnedFromLayers::new( + ch_conf, + cache_usage, + query.transform().clone(), + EventsSubQuerySettings::from(&query), + query.log_level().into(), + Arc::new(ctx.clone()), + binned_range.binned_range_time(), + do_time_weight, + bin_len_layers, + cache_read_provider, + events_read_provider, + ) + .map_err(Error::from_string)?; + let stream = stream.map(|item| { + use items_0::timebin::BinningggContainerBinsDyn; + on_sitemty_data!(item, |mut x: Box| { + x.fix_numerics(); + let ret = Box::new(x) as Box; + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) + }) + }); + let stream = Box::pin(stream); + Ok(stream) } pub async fn timebinned_json( query: BinnedQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - open_bytes: OpenBoxedBytesStreamsBox, cache_read_provider: Arc, events_read_provider: Arc, ) -> Result, Error> { @@ -479,18 +291,16 @@ pub async fn timebinned_json( binned_range.clone(), ch_conf, ctx, - open_bytes, cache_read_provider, events_read_provider, ) .await?; - // let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); let collres = collected.await?; match collres { CollectResult::Some(collres) => { - let collres = if let Some(bins) = collres + let collres = if let Some(_bins) = collres .as_any_ref() .downcast_ref::>() { @@ -511,7 +321,7 @@ pub async fn timebinned_json( fn take_collector_result(coll: &mut Box) -> Option { match coll.result(None, None) { Ok(collres) => { - let collres = if let Some(bins) = collres + let collres = if let Some(_bins) = collres .as_any_ref() .downcast_ref::>() { @@ -534,7 +344,6 @@ pub async fn timebinned_json_framed( query: BinnedQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - open_bytes: OpenBoxedBytesStreamsBox, cache_read_provider: Arc, events_read_provider: Arc, ) -> Result { @@ -546,7 +355,6 @@ pub async fn timebinned_json_framed( binned_range.clone(), ch_conf, ctx, - open_bytes, cache_read_provider, events_read_provider, )