From 6429ef5631456ddee4270a1f58a0a433add565db Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 23 Oct 2024 19:06:36 +0200 Subject: [PATCH] WIP typechecks --- crates/httpret/src/api4/binned.rs | 55 ++++++-- crates/items_0/src/timebin.rs | 15 ++- crates/items_2/src/binning/aggregator.rs | 59 ++++++++- crates/items_2/src/binning/container_bins.rs | 87 +++++++++++++ .../items_2/src/binning/container_events.rs | 69 ++++++---- .../binning/timeweight/timeweight_events.rs | 53 ++++---- .../timeweight/timeweight_events_dyn.rs | 113 +++++++++++----- crates/items_2/src/binning/valuetype.rs | 6 - crates/items_2/src/eventsdim0.rs | 47 +++++-- crates/items_2/src/items_2.rs | 1 - crates/items_2/src/vecpreview.rs | 1 - crates/nodenet/src/scylla.rs | 19 ++- crates/query/src/api4/events.rs | 4 + crates/streams/src/eventsplainreader.rs | 121 ++++++++++++++++++ crates/streams/src/lib.rs | 1 + crates/streams/src/timebin/cached/reader.rs | 16 ++- crates/streams/src/timebin/fromevents.rs | 3 +- crates/streams/src/timebin/fromlayers.rs | 45 ++----- crates/streams/src/timebin/gapfill.rs | 25 +--- crates/streams/src/timebinnedjson.rs | 95 ++++++++++---- 20 files changed, 622 insertions(+), 213 deletions(-) delete mode 100644 crates/items_2/src/vecpreview.rs create mode 100644 crates/streams/src/eventsplainreader.rs diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index a787245..6b3c164 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -34,8 +34,11 @@ use nodenet::scylla::ScyllaEventReadProvider; use query::api4::binned::BinnedQuery; use scyllaconn::bincache::ScyllaCacheReadProvider; use scyllaconn::worker::ScyllaQueue; +use std::pin::Pin; use std::sync::Arc; use streams::collect::CollectResult; +use streams::eventsplainreader::DummyCacheReadProvider; +use streams::eventsplainreader::SfDatabufferEventReadProvider; use streams::timebin::cached::reader::EventsReadProvider; use streams::timebin::CacheReadProvider; use tracing::Instrument; @@ -136,6 +139,40 @@ async fn binned( } } +fn make_read_provider( + scyqueue: Option, + open_bytes: Pin>, + ctx: &ReqCtx, + ncc: &NodeConfigCached, +) -> (Arc, Arc) { + let events_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { + scyqueue + .clone() + .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc) + .expect("expect scylla queue") + } else if ncc.node.sf_databuffer.is_some() { + // TODO do not clone the request. Pass an Arc up to here. + let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes); + Arc::new(x) + } else { + panic!() + }; + let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { + scyqueue + .clone() + .map(|qu| ScyllaCacheReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc) + .expect("expect scylla queue") + } else if ncc.node.sf_databuffer.is_some() { + let x = DummyCacheReadProvider::new(); + Arc::new(x) + } else { + panic!() + }; + (events_read_provider, cache_read_provider) +} + async fn binned_json_single( url: Url, req: Requ, @@ -169,13 +206,8 @@ async fn binned_json_single( }); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); - let cache_read_provider = scyqueue - .clone() - .map(|qu| ScyllaCacheReadProvider::new(qu)) - .map(|x| Arc::new(x) as Arc); - let events_read_provider = scyqueue - .map(|qu| ScyllaEventReadProvider::new(qu)) - .map(|x| Arc::new(x) as Arc); + let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); + let (events_read_provider, cache_read_provider) = make_read_provider(scyqueue, open_bytes2, ctx, ncc); let item = streams::timebinnedjson::timebinned_json( query, ch_conf, @@ -238,13 +270,8 @@ async fn binned_json_framed( }); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); - let cache_read_provider = scyqueue - .clone() - .map(|qu| ScyllaCacheReadProvider::new(qu)) - .map(|x| Arc::new(x) as Arc); - let events_read_provider = scyqueue - .map(|qu| ScyllaEventReadProvider::new(qu)) - .map(|x| Arc::new(x) as Arc); + let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); + let (events_read_provider, cache_read_provider) = make_read_provider(scyqueue, open_bytes2, ctx, ncc); let stream = streams::timebinnedjson::timebinned_json_framed( query, ch_conf, diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index ef3a562..7e24674 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -74,12 +74,16 @@ pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized { // #[cstm(name = "Binninggg")] pub enum BinningggError { Dyn(Box), + TypeMismatch { have: String, expect: String }, } impl fmt::Display for BinningggError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match self { BinningggError::Dyn(e) => write!(fmt, "{e}"), + BinningggError::TypeMismatch { have, expect } => { + write!(fmt, "TypeMismatch(have: {have}, expect: {expect})") + } } } } @@ -94,10 +98,13 @@ where } pub trait BinningggContainerEventsDyn: fmt::Debug + Send { - fn binned_events_timeweight_traitobj(&self) -> Box; + fn type_name(&self) -> &'static str; + fn binned_events_timeweight_traitobj(&self, range: BinnedRange) -> Box; + fn to_anybox(&mut self) -> Box; } -pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen { +pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut { + fn type_name(&self) -> &'static str; fn empty(&self) -> BinsBoxed; fn clone(&self) -> BinsBoxed; fn edges_iter( @@ -133,10 +140,10 @@ pub trait BinningggBinnerDyn: fmt::Debug + Send { } pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send { - fn ingest(&mut self, evs_all: Box) -> Result<(), BinningggError>; + fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError>; fn input_done_range_final(&mut self) -> Result<(), BinningggError>; fn input_done_range_open(&mut self) -> Result<(), BinningggError>; - fn output(&mut self) -> Result, BinningggError>; + fn output(&mut self) -> Result, BinningggError>; } /// Data in time-binned form. diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index ec3279f..6f7640e 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -2,16 +2,28 @@ use super::container_events::EventValueType; use core::fmt; use netpod::log::*; use netpod::DtNano; +use netpod::EnumVariant; use serde::Deserialize; use serde::Serialize; +#[allow(unused)] +macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Send + Serialize + for<'a> Deserialize<'a> {} +impl AggTimeWeightOutputAvg for u8 {} +impl AggTimeWeightOutputAvg for u16 {} +impl AggTimeWeightOutputAvg for u32 {} impl AggTimeWeightOutputAvg for u64 {} - +impl AggTimeWeightOutputAvg for i8 {} +impl AggTimeWeightOutputAvg for i16 {} +impl AggTimeWeightOutputAvg for i32 {} +impl AggTimeWeightOutputAvg for i64 {} impl AggTimeWeightOutputAvg for f32 {} - impl AggTimeWeightOutputAvg for f64 {} +impl AggTimeWeightOutputAvg for EnumVariant {} +impl AggTimeWeightOutputAvg for String {} +impl AggTimeWeightOutputAvg for bool {} pub trait AggregatorTimeWeight: fmt::Debug + Send where @@ -48,7 +60,7 @@ where fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) { let f = dt.ns() as f64 / bl.ns() as f64; - trace!("INGEST {} {:?}", f, val); + trace_event!("INGEST {} {:?}", f, val); self.sum += f * val.as_f64(); } @@ -71,7 +83,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) { let f = dt.ns() as f64 / bl.ns() as f64; - trace!("INGEST {} {}", f, val); + trace_event!("INGEST {} {}", f, val); self.sum += f * val as f64; } @@ -87,6 +99,45 @@ impl AggregatorTimeWeight for AggregatorNumeric { } } +macro_rules! impl_agg_tw_for_agg_num { + ($evt:ty) => { + impl AggregatorTimeWeight<$evt> for AggregatorNumeric { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: $evt) { + let f = dt.ns() as f64 / bl.ns() as f64; + trace!("INGEST {} {}", f, val); + self.sum += f * val as f64; + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } + + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { + let sum = self.sum.clone(); + trace!( + "result_and_reset_for_new_bin sum {} {}", + sum, + filled_width_fraction + ); + self.sum = 0.; + sum / filled_width_fraction as f64 + } + } + }; +} + +impl_agg_tw_for_agg_num!(u8); +impl_agg_tw_for_agg_num!(u16); +impl_agg_tw_for_agg_num!(u32); +impl_agg_tw_for_agg_num!(i8); +impl_agg_tw_for_agg_num!(i16); +impl_agg_tw_for_agg_num!(i32); +impl_agg_tw_for_agg_num!(i64); + impl AggregatorTimeWeight for AggregatorNumeric { fn new() -> Self { Self { sum: 0. } diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index f1b2e0b..2629210 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -5,7 +5,11 @@ use super::___; use core::fmt; use err::thiserror; use err::ThisError; +use items_0::timebin::BinningggContainerBinsDyn; +use items_0::timebin::BinsBoxed; use items_0::vecpreview::VecPreview; +use items_0::AsAnyMut; +use items_0::WithLen; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -245,6 +249,7 @@ where } pub fn pop_front(&mut self) -> Option> { + todo!("pop_front"); let ts1 = if let Some(x) = self.ts1s.pop_front() { x } else { @@ -307,6 +312,88 @@ where } } +impl fmt::Display for ContainerBins +where + EVT: EventValueType, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self, fmt) + } +} + +impl AsAnyMut for ContainerBins +where + EVT: EventValueType, +{ + fn as_any_mut(&mut self) -> &mut dyn any::Any { + self + } +} + +impl WithLen for ContainerBins +where + EVT: EventValueType, +{ + fn len(&self) -> usize { + Self::len(self) + } +} + +impl BinningggContainerBinsDyn for ContainerBins +where + EVT: EventValueType, +{ + fn type_name(&self) -> &'static str { + any::type_name::() + } + + fn empty(&self) -> BinsBoxed { + Box::new(Self::new()) + } + + fn clone(&self) -> BinsBoxed { + Box::new(::clone(self)) + } + + fn edges_iter( + &self, + ) -> std::iter::Zip, std::collections::vec_deque::Iter> { + self.ts1s.iter().zip(self.ts2s.iter()) + } + + fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: std::ops::Range) { + let obj = dst.as_any_mut(); + if let Some(dst) = obj.downcast_mut::() { + dst.ts1s.extend(self.ts1s.drain(range.clone())); + } else { + let styn = any::type_name::(); + panic!("unexpected drain EVT {} dst {}", styn, dst.type_name()); + } + } + + fn to_old_time_binned(&self) -> Box { + let a = self as &dyn any::Any; + if let Some(src) = a.downcast_ref::>() { + use items_0::Empty; + let mut ret = crate::binsdim0::BinsDim0::::empty(); + for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { + ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); + } + Box::new(ret) + } else if let Some(src) = a.downcast_ref::>() { + use items_0::Empty; + let mut ret = crate::binsdim0::BinsDim0::::empty(); + for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { + ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); + } + Box::new(ret) + } else { + let styn = any::type_name::(); + todo!("TODO impl for {styn}") + } + } +} + pub struct ContainerBinsTakeUpTo<'a, EVT> where EVT: EventValueType, diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 9763359..ac44f85 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -1,6 +1,7 @@ use super::aggregator::AggTimeWeightOutputAvg; use super::aggregator::AggregatorNumeric; use super::aggregator::AggregatorTimeWeight; +use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox; use super::___; use core::fmt; use err::thiserror; @@ -8,6 +9,7 @@ use err::ThisError; use items_0::timebin::BinningggContainerEventsDyn; use items_0::vecpreview::PreviewRange; use items_0::vecpreview::VecPreview; +use netpod::BinnedRange; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -28,13 +30,13 @@ pub trait Container: fmt::Debug + Send + Clone + PreviewRange + Serialize + fn pop_front(&mut self) -> Option; } -pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send { +pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + 'static { type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; fn identity_sum() -> Self; - fn add_weighted(&self, add: &Self, f: f32) -> Self; + // fn add_weighted(&self, add: &Self, f: f32) -> Self; } impl Container for VecDeque @@ -54,6 +56,31 @@ where } } +macro_rules! impl_event_value_type { + ($evt:ty, $zero:expr) => { + impl EventValueType for $evt { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = f64; + + fn identity_sum() -> Self { + $zero + } + } + }; +} + +impl_event_value_type!(u8, 0); +impl_event_value_type!(u16, 0); +impl_event_value_type!(u32, 0); +impl_event_value_type!(u64, 0); +impl_event_value_type!(i8, 0); +impl_event_value_type!(i16, 0); +impl_event_value_type!(i32, 0); +impl_event_value_type!(i64, 0); +// impl_event_value_type!(f32, 0.); +// impl_event_value_type!(f64, 0.); + impl EventValueType for f32 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; @@ -62,10 +89,6 @@ impl EventValueType for f32 { fn identity_sum() -> Self { 0. } - - fn add_weighted(&self, add: &Self, f: f32) -> Self { - todo!() - } } impl EventValueType for f64 { @@ -76,24 +99,6 @@ impl EventValueType for f64 { fn identity_sum() -> Self { 0. } - - fn add_weighted(&self, add: &Self, f: f32) -> Self { - todo!() - } -} - -impl EventValueType for u64 { - type Container = VecDeque; - type AggregatorTimeWeight = AggregatorNumeric; - type AggTimeWeightOutputAvg = f64; - - fn identity_sum() -> Self { - 0 - } - - fn add_weighted(&self, add: &Self, f: f32) -> Self { - todo!() - } } #[derive(Debug, Clone)] @@ -243,7 +248,19 @@ impl BinningggContainerEventsDyn for ContainerEvents where EVT: EventValueType, { - fn binned_events_timeweight_traitobj(&self) -> Box { - todo!() + fn type_name(&self) -> &'static str { + std::any::type_name::() + } + + fn binned_events_timeweight_traitobj( + &self, + range: BinnedRange, + ) -> Box { + BinnedEventsTimeweightDynbox::::new(range) + } + + fn to_anybox(&mut self) -> Box { + let ret = core::mem::replace(self, Self::new()); + Box::new(ret) } } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index b0601b6..4f2d757 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -5,24 +5,14 @@ use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::ContainerEventsTakeUpTo; use crate::binning::container_events::EventSingle; -use crate::channelevents::ChannelEvents; use core::fmt; use err::thiserror; use err::ThisError; -use futures_util::Stream; -use items_0::streamitem::Sitemty; -use items_0::timebin::BinningggBinnerDyn; -use items_0::timebin::BinningggBinnerTy; use netpod::log::*; use netpod::BinnedRange; use netpod::DtNano; use netpod::TsNano; -use std::collections::VecDeque; -use std::marker::PhantomData; use std::mem; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; #[allow(unused)] macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } @@ -34,7 +24,7 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_event_next { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } #[allow(unused)] macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } @@ -43,7 +33,7 @@ macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } #[allow(unused)] macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } @@ -103,7 +93,8 @@ where { // NOTE that this is also used during bin-cycle. fn ingest_event_with_lst_gt_range_beg_agg(&mut self, ev: EventSingle, lst: LstRef) { - trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg {:?}", ev); + let selfname = "ingest_event_with_lst_gt_range_beg_agg"; + trace_ingest_event!("{selfname} {:?}", ev); if DEBUG_CHECKS { if ev.ts <= self.active_beg { panic!("should never get here"); @@ -113,7 +104,7 @@ where } } let dt = ev.ts.delta(self.filled_until); - trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg dt {:?} ev {:?}", dt, ev); + trace_ingest_event!("{selfname} dt {:?} ev {:?}", dt, ev); // TODO can the caller already take the value and replace it afterwards with the current value? // This fn could swap the value in lst and directly use it. // This would require that any call path does not mess with lst. @@ -153,7 +144,8 @@ where lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { - trace_ingest_event!("ingest_event_with_lst_eq_range_beg"); + let selfname = "ingest_event_with_lst_eq_range_beg"; + trace_ingest_event!("{selfname}"); // TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet // and I must initialize the min/max with the current event. InnerA::apply_min_max(&ev, minmax); @@ -167,9 +159,10 @@ where lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { - trace_ingest_event!("ingest_with_lst_gt_range_beg"); + let selfname = "ingest_with_lst_gt_range_beg"; + trace_ingest_event!("{selfname}"); while let Some(ev) = evs.pop_front() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_gt_range_beg"); + trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); if ev.ts <= self.active_beg { panic!("should never get here"); } @@ -188,9 +181,10 @@ where lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { - trace_ingest_event!("ingest_with_lst_ge_range_beg"); + let selfname = "ingest_with_lst_ge_range_beg"; + trace_ingest_event!("{selfname}"); while let Some(ev) = evs.pop_front() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_ge_range_beg"); + trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); if ev.ts < self.active_beg { panic!("should never get here"); } @@ -203,7 +197,7 @@ where } else { self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?; self.cnt += 1; - trace_ingest_firsts!("ingest_with_lst_ge_range_beg now calling ingest_with_lst_gt_range_beg"); + trace_ingest_firsts!("{selfname} now calling ingest_with_lst_gt_range_beg"); return self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax); } } @@ -216,11 +210,12 @@ where lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { - trace_ingest_event!("ingest_with_lst_minmax"); + let selfname = "ingest_with_lst_minmax"; + trace_ingest_event!("{selfname}"); // TODO how to handle the min max? I don't take event data yet out of the container. if let Some(ts0) = evs.ts_first() { - trace_ingest_event!("EVENT POP FRONT ingest_with_lst_minmax"); - trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest_with_lst_minmax", ts0); + trace_ingest_event!("EVENT POP FRONT {selfname}"); + trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} {selfname}", ts0); if ts0 < self.active_beg { panic!("should never get here"); } else { @@ -435,9 +430,10 @@ where } fn ingest_event_without_lst(&mut self, ev: EventSingle) -> Result<(), Error> { + let selfname = "ingest_event_without_lst"; let b = &self.inner_a.inner_b; if ev.ts >= b.active_end { - panic!("should never get here"); + panic!("{selfname} should never get here"); } else { trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev); self.lst = Some(ev.clone()); @@ -453,10 +449,11 @@ where } fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo) -> Result<(), Error> { + let selfname = "ingest_without_lst"; if let Some(ev) = evs.pop_front() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_without_lst"); + trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname); if ev.ts >= self.inner_a.inner_b.active_end { - panic!("should never get here"); + panic!("{selfname} should never get here"); } else { self.ingest_event_without_lst(ev)?; if let Some(lst) = self.lst.as_mut() { @@ -633,6 +630,10 @@ where Ok(()) } + pub fn output_len(&self) -> usize { + self.out.len() + } + pub fn output(&mut self) -> ContainerBins { mem::replace(&mut self.out, ContainerBins::new()) } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs index 508930a..4be8de5 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs @@ -1,11 +1,13 @@ use super::timeweight_events::BinnedEventsTimeweight; use crate::binning::container_bins::ContainerBins; +use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::EventValueType; use crate::channelevents::ChannelEvents; use err::thiserror; use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::LogItem; use items_0::streamitem::Sitemty; use items_0::timebin::BinnedEventsTimeweightTrait; use items_0::timebin::BinningggBinnerDyn; @@ -17,6 +19,7 @@ use items_0::timebin::EventsBoxed; use netpod::log::*; use netpod::BinnedRange; use netpod::TsNano; +use std::any; use std::arch::x86_64; use std::ops::ControlFlow; use std::pin::Pin; @@ -55,23 +58,40 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox where EVT: EventValueType, { - fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError> { - todo!() + fn ingest(&mut self, mut evs: EventsBoxed) -> Result<(), BinningggError> { + // let a = (&evs as &dyn any::Any).downcast_ref::(); + // evs.downcast::(); + // evs.as_anybox().downcast::>(); + match evs.to_anybox().downcast::>() { + Ok(evs) => { + let evs = { + let a = evs; + *a + }; + Ok(self.binner.ingest(evs)?) + } + Err(_) => Err(BinningggError::TypeMismatch { + have: evs.type_name().into(), + expect: std::any::type_name::>().into(), + }), + } } fn input_done_range_final(&mut self) -> Result<(), BinningggError> { - // self.binner.input_done_range_final() - todo!() + Ok(self.binner.input_done_range_final()?) } fn input_done_range_open(&mut self) -> Result<(), BinningggError> { - // self.binner.input_done_range_open() - todo!() + Ok(self.binner.input_done_range_open()?) } - fn output(&mut self) -> Result { - // self.binner.output() - todo!() + fn output(&mut self) -> Result, BinningggError> { + if self.binner.output_len() == 0 { + Ok(None) + } else { + let c = self.binner.output(); + Ok(Some(Box::new(c))) + } } } @@ -93,36 +113,36 @@ impl BinnedEventsTimeweightLazy { impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError> { self.binned_events - .get_or_insert_with(|| evs_all.binned_events_timeweight_traitobj()) + .get_or_insert_with(|| evs_all.binned_events_timeweight_traitobj(self.range.clone())) .ingest(evs_all) } fn input_done_range_final(&mut self) -> Result<(), BinningggError> { - debug!("TODO something to do if we miss the binner here?"); self.binned_events .as_mut() .map(|x| x.input_done_range_final()) - .unwrap_or(Ok(())) + .unwrap_or_else(|| { + debug!("TODO something to do if we miss the binner here?"); + Ok(()) + }) } fn input_done_range_open(&mut self) -> Result<(), BinningggError> { - debug!("TODO something to do if we miss the binner here?"); self.binned_events .as_mut() .map(|x| x.input_done_range_open()) .unwrap_or(Ok(())) } - fn output(&mut self) -> Result { - debug!("TODO something to do if we miss the binner here?"); - // TODO change trait because without binner we can not produce any container here - todo!() + fn output(&mut self) -> Result, BinningggError> { + self.binned_events.as_mut().map(|x| x.output()).unwrap_or(Ok(None)) } } enum StreamState { Reading, Done, + Invalid, } pub struct BinnedEventsTimeweightStream { @@ -158,13 +178,14 @@ impl BinnedEventsTimeweightStream { ChannelEvents::Events(evs) => match self.binned_events.ingest(evs.to_container_events()) { Ok(()) => { match self.binned_events.output() { - Ok(x) => { + Ok(Some(x)) => { if x.len() == 0 { Continue(()) } else { Break(Ready(Some(Ok(DataItem(Data(x)))))) } } + Ok(None) => Continue(()), Err(e) => Break(Ready(Some(Err(::err::Error::from_string(e))))), } // Continue(()) @@ -192,19 +213,54 @@ impl BinnedEventsTimeweightStream { } fn handle_eos(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { + debug!("handle_eos"); use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::StreamItem::*; use Poll::*; + self.state = StreamState::Done; if self.range_complete { - self.binned_events.input_done_range_final(); + self.binned_events + .input_done_range_final() + .map_err(::err::Error::from_string)?; } else { - self.binned_events.input_done_range_open(); + self.binned_events + .input_done_range_open() + .map_err(::err::Error::from_string)?; } - match self.binned_events.output() { - Ok(x) => Ready(Some(Ok(DataItem(Data(x))))), - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + match self.binned_events.output().map_err(::err::Error::from_string)? { + Some(x) => { + debug!("seeing ready bins {:?}", x); + Ready(Some(Ok(DataItem(Data(x))))) + } + None => { + let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos")); + Ready(Some(Ok(Log(item)))) + } } } + + fn handle_main(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow::Item>>> { + use ControlFlow::*; + use Poll::*; + let ret = match &self.state { + StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) { + Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx), + Ready(None) => Break(self.as_mut().handle_eos(cx)), + Pending => Break(Pending), + }, + StreamState::Done => { + self.state = StreamState::Invalid; + Break(Ready(None)) + } + StreamState::Invalid => { + panic!("StreamState::Invalid") + } + }; + if let Break(Ready(Some(Err(_)))) = ret { + self.state = StreamState::Done; + } + ret + } } impl Stream for BinnedEventsTimeweightStream { @@ -212,15 +268,10 @@ impl Stream for BinnedEventsTimeweightStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use ControlFlow::*; - use Poll::*; loop { - break match self.as_mut().inp.poll_next_unpin(cx) { - Ready(Some(x)) => match self.as_mut().handle_sitemty(x, cx) { - Continue(()) => continue, - Break(x) => x, - }, - Ready(None) => self.handle_eos(cx), - Pending => Pending, + break match self.as_mut().handle_main(cx) { + Break(x) => x, + Continue(()) => continue, }; } } diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs index 5e8bf91..1493b55 100644 --- a/crates/items_2/src/binning/valuetype.rs +++ b/crates/items_2/src/binning/valuetype.rs @@ -1,5 +1,4 @@ use super::aggregator::AggregatorTimeWeight; -use super::binnedvaluetype::BinnedNumericValue; use super::container_events::Container; use super::container_events::EventValueType; use core::fmt; @@ -88,9 +87,4 @@ impl EventValueType for EnumVariant { fn identity_sum() -> Self { todo!() } - - // TODO also remove from trait, push it to a more specialized trait for the plain numeric cases. - fn add_weighted(&self, add: &Self, f: f32) -> Self { - todo!() - } } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 4afdb91..3166458 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -882,6 +882,18 @@ impl EventsNonObj for EventsDim0 { } } +macro_rules! try_to_container_events { + ($sty:ty, $this:expr) => { + if let Some(evs) = $this.as_any_ref().downcast_ref::>() { + use crate::binning::container_events::ContainerEvents; + let tss = $this.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); + let vals = evs.values.clone(); + let ret = ContainerEvents::<$sty>::from_constituents(tss, vals); + return Box::new(ret); + } + }; +} + impl Events for EventsDim0 { fn as_time_binnable_ref(&self) -> &dyn TimeBinnable { self @@ -1105,15 +1117,32 @@ impl Events for EventsDim0 { } fn to_container_events(&self) -> Box { - use crate::binning::container_events::ContainerEvents; - let tss = self.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); - if let Some(evs) = self.as_any_ref().downcast_ref::>() { - let vals = evs.values.clone(); - let ret = ContainerEvents::::from_constituents(tss, vals); - Box::new(ret) - } else { - todo!() - } + try_to_container_events!(u8, self); + try_to_container_events!(u16, self); + try_to_container_events!(u32, self); + try_to_container_events!(u64, self); + try_to_container_events!(f32, self); + try_to_container_events!(f64, self); + let styn = any::type_name::(); + todo!("TODO for {styn}") + } +} + +fn try_to_container_events_fn( + this: &EventsDim0, +) -> Option> +where + STY: ScalarOps, + EVT: crate::binning::container_events::EventValueType>, +{ + use crate::binning::container_events::ContainerEvents; + if let Some(evs) = this.as_any_ref().downcast_ref::>() { + let tss = this.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); + let vals = evs.values.clone(); + let ret = ContainerEvents::::from_constituents(tss, vals); + Some(Box::new(ret)) + } else { + None } } diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 4f19365..5978d32 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -20,7 +20,6 @@ pub mod test; pub mod testgen; pub mod timebin; pub mod transform; -pub mod vecpreview; use channelevents::ChannelEvents; use futures_util::Stream; diff --git a/crates/items_2/src/vecpreview.rs b/crates/items_2/src/vecpreview.rs deleted file mode 100644 index 8b13789..0000000 --- a/crates/items_2/src/vecpreview.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index e9ddd8d..df4a50d 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -193,13 +193,18 @@ impl ScyllaEventReadProvider { } impl EventsReadProvider for ScyllaEventReadProvider { - fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> streams::timebin::cached::reader::EventsReading { + fn read(&self, evq: EventsSubQuery) -> streams::timebin::cached::reader::EventsReading { let scyqueue = self.scyqueue.clone(); - let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, chconf, &scyqueue).await }; - let stream = ScyllaEventsReadStream { - fut1: Some(Box::pin(fut1)), - stream: None, - }; - streams::timebin::cached::reader::EventsReading::new(Box::pin(stream)) + match evq.ch_conf().clone() { + netpod::ChannelTypeConfigGen::Scylla(ch_conf) => { + let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, ch_conf, &scyqueue).await }; + let stream = ScyllaEventsReadStream { + fut1: Some(Box::pin(fut1)), + stream: None, + }; + streams::timebin::cached::reader::EventsReading::new(Box::pin(stream)) + } + netpod::ChannelTypeConfigGen::SfDatabuffer(_) => panic!("not a scylla reader"), + } } } diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 17977cc..bb9d878 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -610,6 +610,10 @@ impl EventsSubQuery { pub fn merger_out_len_max(&self) -> Option { self.settings.merger_out_len_max() } + + pub fn settings(&self) -> &EventsSubQuerySettings { + &self.settings + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/streams/src/eventsplainreader.rs b/crates/streams/src/eventsplainreader.rs new file mode 100644 index 0000000..3d99e30 --- /dev/null +++ b/crates/streams/src/eventsplainreader.rs @@ -0,0 +1,121 @@ +use crate::tcprawclient::OpenBoxedBytesStreamsBox; +use crate::timebin::cached::reader::EventsReadProvider; +use crate::timebin::cached::reader::EventsReading; +use crate::timebin::CacheReadProvider; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::Sitemty; +use items_2::channelevents::ChannelEvents; +use netpod::ReqCtx; +use query::api4::events::EventsSubQuery; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +enum StreamState { + Opening( + Pin< + Box< + dyn Future> + Send>>, ::err::Error>> + + Send, + >, + >, + ), + Reading(Pin> + Send>>), +} + +struct InnerStream { + state: StreamState, +} + +impl Stream for InnerStream { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match &mut self.state { + StreamState::Opening(fut) => match fut.poll_unpin(cx) { + Ready(Ok(x)) => { + self.state = StreamState::Reading(x); + continue; + } + Ready(Err(e)) => Ready(Some(Err(e))), + Pending => Pending, + }, + StreamState::Reading(fut) => match fut.poll_next_unpin(cx) { + Ready(Some(x)) => Ready(Some(x)), + Ready(None) => Ready(None), + Pending => Pending, + }, + }; + } + } +} + +pub struct SfDatabufferEventReadProvider { + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, +} + +impl SfDatabufferEventReadProvider { + pub fn new(ctx: Arc, open_bytes: OpenBoxedBytesStreamsBox) -> Self { + Self { ctx, open_bytes } + } +} + +impl EventsReadProvider for SfDatabufferEventReadProvider { + fn read(&self, evq: EventsSubQuery) -> EventsReading { + let range = match evq.range() { + netpod::range::evrange::SeriesRange::TimeRange(x) => x.clone(), + netpod::range::evrange::SeriesRange::PulseRange(_) => panic!("not available for pulse range"), + }; + let ctx = self.ctx.clone(); + let open_bytes = self.open_bytes.clone(); + let state = StreamState::Opening(Box::pin(async move { + let ret = crate::timebinnedjson::timebinnable_stream_sf_databuffer_channelevents( + range, + evq.need_one_before_range(), + evq.ch_conf().clone(), + evq.transform().clone(), + evq.settings().clone(), + evq.log_level().into(), + ctx, + open_bytes, + ) + .await; + ret.map(|x| Box::pin(x) as _) + })); + let stream = InnerStream { state }; + EventsReading::new(Box::pin(stream)) + } +} + +pub struct DummyCacheReadProvider {} + +impl DummyCacheReadProvider { + pub fn new() -> Self { + Self {} + } +} + +impl CacheReadProvider for DummyCacheReadProvider { + fn read( + &self, + series: u64, + bin_len: netpod::DtMs, + msp: u64, + offs: std::ops::Range, + ) -> crate::timebin::cached::reader::CacheReading { + let stream = futures_util::future::ready(Ok(None)); + crate::timebin::cached::reader::CacheReading::new(Box::pin(stream)) + } + + fn write(&self, series: u64, bins: items_0::timebin::BinsBoxed) -> crate::timebin::cached::reader::CacheWriting { + let fut = futures_util::future::ready(Ok(())); + crate::timebin::cached::reader::CacheWriting::new(Box::pin(fut)) + } +} diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index 5dc067d..a4233df 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -2,6 +2,7 @@ pub mod boxed; pub mod cbor_stream; pub mod collect; pub mod dtflags; +pub mod eventsplainreader; pub mod filechunkread; pub mod firsterr; pub mod framed_bytes; diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index c0fb17c..099169b 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -9,7 +9,6 @@ use items_0::timebin::BinsBoxed; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::BinnedRange; -use netpod::ChConf; use netpod::DtMs; use netpod::TsNano; use query::api4::events::EventsSubQuery; @@ -50,23 +49,23 @@ impl Stream for EventsReading { } pub trait EventsReadProvider: Send + Sync { - fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> EventsReading; + fn read(&self, evq: EventsSubQuery) -> EventsReading; } pub struct CacheReading { - fut: Pin> + Send>>, + fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, } impl CacheReading { pub fn new( - fut: Pin> + Send>>, + fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, ) -> Self { Self { fut } } } impl Future for CacheReading { - type Output = Result; + type Output = Result, streams::timebin::cached::reader::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.fut.poll_unpin(cx) @@ -111,7 +110,7 @@ pub struct CachedReader { ts1next: TsNano, bin_len: DtMs, cache_read_provider: Arc, - reading: Option> + Send>>>, + reading: Option, Error>> + Send>>>, } impl CachedReader { @@ -149,7 +148,7 @@ impl Stream for CachedReader { Ready(x) => { self.reading = None; match x { - Ok(bins) => { + Ok(Some(bins)) => { trace_emit!( "- - - - - - - - - - - - emit cached bins {} bin_len {}", bins.len(), @@ -157,6 +156,9 @@ impl Stream for CachedReader { ); Ready(Some(Ok(bins))) } + Ok(None) => { + continue; + } Err(e) => Ready(Some(Err(e))), } } diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs index 3dc81c1..f4e43da 100644 --- a/crates/streams/src/timebin/fromevents.rs +++ b/crates/streams/src/timebin/fromevents.rs @@ -32,14 +32,13 @@ impl BinnedFromEvents { pub fn new( range: BinnedRange, evq: EventsSubQuery, - chconf: ChConf, do_time_weight: bool, read_provider: Arc, ) -> Result { if !evq.range().is_time() { panic!(); } - let stream = read_provider.read(evq, chconf); + let stream = read_provider.read(evq); // let stream = stream.map(|x| { // let x = items_0::try_map_sitemty_data!(x, |x| match x { // ChannelEvents::Events(x) => { diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index f952787..96c3d68 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -56,7 +56,6 @@ pub struct TimeBinnedFromLayers { sub: EventsSubQuerySettings, log_level: String, ctx: Arc, - open_bytes: OpenBoxedBytesStreamsBox, inp: BoxedInput, } @@ -72,8 +71,6 @@ impl TimeBinnedFromLayers { sub: EventsSubQuerySettings, log_level: String, ctx: Arc, - open_bytes: OpenBoxedBytesStreamsBox, - series: u64, range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, @@ -83,7 +80,7 @@ impl TimeBinnedFromLayers { debug!( "{}::new {:?} {:?} {:?}", Self::type_name(), - series, + ch_conf.series(), range, bin_len_layers ); @@ -98,7 +95,6 @@ impl TimeBinnedFromLayers { sub.clone(), log_level.clone(), ctx.clone(), - series, range, do_time_weight, bin_len_layers, @@ -112,7 +108,6 @@ impl TimeBinnedFromLayers { sub, log_level, ctx, - open_bytes, inp: Box::pin(inp), }; Ok(ret) @@ -137,7 +132,6 @@ impl TimeBinnedFromLayers { sub.clone(), log_level.clone(), ctx.clone(), - series, range_finer.clone(), do_time_weight, bin_len_layers, @@ -152,7 +146,6 @@ impl TimeBinnedFromLayers { sub, log_level, ctx, - open_bytes, inp: Box::pin(inp), }; Ok(ret) @@ -168,30 +161,18 @@ impl TimeBinnedFromLayers { transform_query.clone(), ); let evq = EventsSubQuery::from_parts(select, sub.clone(), ctx.reqid().into(), log_level.clone()); - match &ch_conf { - ChannelTypeConfigGen::Scylla(chconf) => { - let inp = BinnedFromEvents::new( - range, - evq, - chconf.clone(), - do_time_weight, - events_read_provider, - )?; - let ret = Self { - ch_conf, - cache_usage, - transform_query, - sub, - log_level, - ctx, - open_bytes, - inp: Box::pin(inp), - }; - debug!("{}::new setup from events", Self::type_name()); - Ok(ret) - } - ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported), - } + let inp = BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?; + let ret = Self { + ch_conf, + cache_usage, + transform_query, + sub, + log_level, + ctx, + inp: Box::pin(inp), + }; + debug!("{}::new setup from events", Self::type_name()); + Ok(ret) } } } diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index d2c7d32..53334d5 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -66,7 +66,6 @@ pub struct GapFill { sub: EventsSubQuerySettings, log_level: String, ctx: Arc, - series: u64, range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, @@ -97,7 +96,6 @@ impl GapFill { sub: EventsSubQuerySettings, log_level: String, ctx: Arc, - series: u64, range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, @@ -107,6 +105,7 @@ impl GapFill { let dbgname = format!("{}--[{}]", dbgname_parent, range); debug_init!("new dbgname {}", dbgname); let inp = if cache_usage.is_cache_read() { + let series = ch_conf.series().expect("series id for cache read"); let stream = super::cached::reader::CachedReader::new(series, range.clone(), cache_read_provider.clone())? .map(|x| match x { Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), @@ -125,7 +124,6 @@ impl GapFill { sub, log_level, ctx, - series, range, do_time_weight, bin_len_layers, @@ -257,7 +255,6 @@ impl GapFill { self.sub.clone(), self.log_level.clone(), self.ctx.clone(), - self.series, range_finer_one_before_bin, self.do_time_weight, self.bin_len_layers.clone(), @@ -288,26 +285,16 @@ impl GapFill { self.ctx.reqid().into(), self.log_level.clone(), ); - match &self.ch_conf { - ChannelTypeConfigGen::Scylla(chconf) => { - let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms()); - let inp = BinnedFromEvents::new( - range, - evq, - chconf.clone(), - self.do_time_weight, - self.events_read_provider.clone(), - )?; - self.inp_finer = Some(Box::pin(inp)); - } - ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported), - } + let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms()); + let inp = BinnedFromEvents::new(range, evq, self.do_time_weight, self.events_read_provider.clone())?; + self.inp_finer = Some(Box::pin(inp)); } Ok(()) } fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> { - self.cache_writing = Some(self.cache_read_provider.write(self.series, bins)); + let series = ::err::todoval(); + self.cache_writing = Some(self.cache_read_provider.write(series, bins)); Ok(()) } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 6ba5478..1d883f1 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -53,7 +53,7 @@ fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream) -> impl stream } -pub async fn timebinnable_stream( +pub async fn timebinnable_stream_sf_databuffer_box_events( range: NanoRange, one_before_range: bool, ch_conf: ChannelTypeConfigGen, @@ -62,7 +62,7 @@ pub async fn timebinnable_stream( log_level: String, ctx: Arc, open_bytes: OpenBoxedBytesStreamsBox, -) -> Result { +) -> Result>>, Error> { let subq = make_sub_query( ch_conf, range.clone().into(), @@ -222,6 +222,32 @@ pub async fn timebinnable_stream( let stream = stream.map(|x| x); Box::pin(stream) }; + Ok(stream) +} + +async fn timebinnable_stream_sf_databuffer_binnable_box( + range: NanoRange, + one_before_range: bool, + ch_conf: ChannelTypeConfigGen, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, +) -> Result { + let stream = timebinnable_stream_sf_databuffer_box_events( + range, + one_before_range, + ch_conf, + transform_query, + sub, + log_level, + ctx, + open_bytes, + ) + .await?; + // let stream = stream.map(|x| x); + // let stream = stream.map(|x| ChannelEvents::Events(x)); // let stream = stream.map(move |k| { // on_sitemty_data!(k, |k| { @@ -236,6 +262,39 @@ pub async fn timebinnable_stream( Ok(TimeBinnableStreamBox(stream)) } +pub async fn timebinnable_stream_sf_databuffer_channelevents( + range: NanoRange, + one_before_range: bool, + ch_conf: ChannelTypeConfigGen, + transform_query: TransformQuery, + sub: EventsSubQuerySettings, + log_level: String, + ctx: Arc, + open_bytes: OpenBoxedBytesStreamsBox, +) -> Result>, Error> { + let stream = timebinnable_stream_sf_databuffer_box_events( + range, + one_before_range, + ch_conf, + transform_query, + sub, + log_level, + ctx, + open_bytes, + ) + .await?; + // let stream = stream.map(|x| x); + let stream = stream.map(move |k| { + on_sitemty_data!(k, |k| { + // let k: Box = Box::new(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( + k, + )))) + }) + }); + Ok(stream) +} + pub struct TimeBinnableStream { make_stream_fut: Option> + Send>>>, stream: Option>> + Send>>>, @@ -253,7 +312,7 @@ impl TimeBinnableStream { ctx: Arc, open_bytes: OpenBoxedBytesStreamsBox, ) -> Self { - let fut = timebinnable_stream( + let fut = timebinnable_stream_sf_databuffer_binnable_box( range, one_before_range, ch_conf, @@ -313,23 +372,13 @@ async fn timebinned_stream( ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, - cache_read_provider: Option>, - events_read_provider: Option>, + cache_read_provider: Arc, + events_read_provider: Arc, ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; let cache_usage = query.cache_usage().unwrap_or(CacheUsage::V0NoCache); - match ( - ch_conf.series(), - cache_usage.clone(), - cache_read_provider, - events_read_provider, - ) { - ( - Some(series), - CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore, - Some(cache_read_provider), - Some(events_read_provider), - ) => { + match cache_usage.clone() { + CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore => { debug!( "timebinned_stream caching {:?} subgrids {:?}", query, @@ -351,8 +400,6 @@ async fn timebinned_stream( EventsSubQuerySettings::from(&query), query.log_level().into(), Arc::new(ctx.clone()), - open_bytes.clone(), - series, binned_range.binned_range_time(), do_time_weight, bin_len_layers, @@ -372,7 +419,7 @@ async fn timebinned_stream( let range = binned_range.binned_range_time().to_nano_range(); let do_time_weight = true; let one_before_range = true; - let stream = timebinnable_stream( + let stream = timebinnable_stream_sf_databuffer_binnable_box( range, one_before_range, ch_conf, @@ -412,8 +459,8 @@ pub async fn timebinned_json( ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, - cache_read_provider: Option>, - events_read_provider: Option>, + cache_read_provider: Arc, + events_read_provider: Arc, ) -> Result, Error> { let deadline = Instant::now() + query @@ -486,8 +533,8 @@ pub async fn timebinned_json_framed( ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, - cache_read_provider: Option>, - events_read_provider: Option>, + cache_read_provider: Arc, + events_read_provider: Arc, ) -> Result { trace!("timebinned_json_framed"); let binned_range = query.covering_range()?;