From 52ae20ab02bdde8bac30c368bde7703b0dde01b5 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 25 Oct 2024 16:31:37 +0200
Subject: [PATCH] WIP

---
 crates/items_0/src/items_0.rs                |  34 ++
 crates/items_0/src/timebin.rs                |   1 -
 crates/items_2/src/binning/container_bins.rs |  49 --
 crates/items_2/src/channelevents.rs          |   6 +
 crates/items_2/src/eventsdim0.rs             |   6 +
 crates/items_2/src/eventsdim0enum.rs         |   6 +
 crates/items_2/src/eventsdim1.rs             | 512 ++-----------------
 crates/items_2/src/eventsxbindim0.rs         |   6 +
 crates/scyllaconn/src/bincache.rs            | 197 +------
 crates/scyllaconn/src/worker.rs              |  10 +-
 crates/streams/src/timebinnedjson.rs         |  10 +-
 crates/streams/src/transform.rs              |   1 +
 12 files changed, 121 insertions(+), 717 deletions(-)

diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs
index ef5eb41..9703d34 100644
--- a/crates/items_0/src/items_0.rs
+++ b/crates/items_0/src/items_0.rs
@@ -115,6 +115,39 @@ impl fmt::Display for MergeError {
 
 impl std::error::Error for MergeError {}
 
+pub trait IntoTimeBinnable: BoxedIntoTimeBinnable {
+    fn into_time_binnable(self) -> Box<dyn TimeBinnable>;
+}
+
+pub trait BoxedIntoTimeBinnable {
+    fn boxed_into_time_binnable(self: Box<Self>) -> Box<dyn TimeBinnable>;
+}
+
+impl<T> BoxedIntoTimeBinnable for T
+where
+    T: IntoTimeBinnable,
+{
+    fn boxed_into_time_binnable(self: Box<Self>) -> Box<dyn TimeBinnable> {
+        <T as IntoTimeBinnable>::into_time_binnable(*self)
+    }
+}
+
+impl IntoTimeBinnable for Box<dyn Events> {
+    fn into_time_binnable(self) -> Box<dyn TimeBinnable> {
+        <dyn Events as BoxedIntoTimeBinnable>::boxed_into_time_binnable(self)
+    }
+}
+
+impl IntoTimeBinnable for Box<dyn TimeBinnable> {
+    fn into_time_binnable(self) -> Box<dyn TimeBinnable> {
+        // <dyn TimeBinnable as BoxedIntoTimeBinnable>::boxed_into_time_binnable(self)
+        // Box::new(*self)
+        // let a: Box<dyn TimeBinnable> = err::todoval();
+        // let b: Box<dyn TimeBinnable> = Box::new(*a);
+        todo!()
+    }
+}
+
 // TODO can I remove the Any bound?
 
 /// Container of some form of events, for use as trait object.
@@ -129,6 +162,7 @@ pub trait Events:
     + Send
     + erased_serde::Serialize
     + EventsNonObj
+    + IntoTimeBinnable
 {
     fn as_time_binnable_ref(&self) -> &dyn TimeBinnable;
     fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable;
diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs
index c6ba4d0..66c5ace 100644
--- a/crates/items_0/src/timebin.rs
+++ b/crates/items_0/src/timebin.rs
@@ -113,7 +113,6 @@ pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen
     ) -> std::iter::Zip<std::collections::vec_deque::Iter<TsNano>, std::collections::vec_deque::Iter<TsNano>>;
     fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range<usize>);
     fn fix_numerics(&mut self);
-    fn to_old_time_binned(&self) -> Box<dyn TimeBinned>;
 }
 
 pub type BinsBoxed = Box<dyn BinningggContainerBinsDyn>;
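The `IntoTimeBinnable` / `BoxedIntoTimeBinnable` pair added above is the usual object-safety workaround: `into_time_binnable(self)` takes `self` by value, which cannot be dispatched through a trait object, so a helper trait with a `self: Box<Self>` receiver forwards to the sized implementation. A minimal self-contained sketch of the same shape, with toy trait names rather than the real `items_0` types:

```rust
trait Target {
    fn describe(&self) -> String;
}

// Helper so the by-value conversion stays callable on a trait object:
// `self: Box<Self>` is an object-safe receiver, plain `self` is not
// callable on an unsized `dyn Source`.
trait BoxedSource {
    fn boxed_convert(self: Box<Self>) -> Box<dyn Target>;
}

trait Source: BoxedSource {
    fn convert(self) -> Box<dyn Target>;
}

// Every sized Source gets the boxed form for free.
impl<T: Source> BoxedSource for T {
    fn boxed_convert(self: Box<Self>) -> Box<dyn Target> {
        <T as Source>::convert(*self)
    }
}

// And the boxed trait object itself becomes a Source again.
impl Source for Box<dyn Source> {
    fn convert(self) -> Box<dyn Target> {
        <dyn Source as BoxedSource>::boxed_convert(self)
    }
}

struct Num(i64);

impl Target for Num {
    fn describe(&self) -> String {
        format!("num {}", self.0)
    }
}

impl Source for Num {
    fn convert(self) -> Box<dyn Target> {
        Box::new(self)
    }
}

fn main() {
    let s: Box<dyn Source> = Box::new(Num(7));
    // Goes through the blanket impl and the vtable, no downcast needed.
    println!("{}", s.convert().describe());
}
```

The blanket impl is what lets `Box<dyn Source>` recover the by-value conversion without downcasting; the patch applies the same trick so that `Box<dyn Events>` can be consumed into a `Box<dyn TimeBinnable>`.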
try_to_old_time_binned { - ($sty:ty, $this:expr, $lst:expr) => { - let this = $this; - let a = this as &dyn any::Any; - if let Some(src) = a.downcast_ref::>() { - use items_0::Empty; - let mut ret = crate::binsdim0::BinsDim0::<$sty>::empty(); - for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { - ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, $lst); - } - return Box::new(ret); - } - }; -} - impl BinningggContainerBinsDyn for ContainerBins where EVT: EventValueType, @@ -621,40 +606,6 @@ where fn fix_numerics(&mut self) { for ((min, max), avg) in self.mins.iter_mut().zip(self.maxs.iter_mut()).zip(self.avgs.iter_mut()) {} } - - fn to_old_time_binned(&self) -> Box { - try_to_old_time_binned!(u8, self, 0); - try_to_old_time_binned!(u16, self, 0); - try_to_old_time_binned!(u32, self, 0); - try_to_old_time_binned!(u64, self, 0); - try_to_old_time_binned!(i8, self, 0); - try_to_old_time_binned!(i16, self, 0); - try_to_old_time_binned!(i32, self, 0); - try_to_old_time_binned!(i64, self, 0); - try_to_old_time_binned!(f32, self, 0.); - try_to_old_time_binned!(f64, self, 0.); - try_to_old_time_binned!(bool, self, false); - // try_to_old_time_binned!(String, self, String::new()); - let a = self as &dyn any::Any; - if let Some(src) = a.downcast_ref::>() { - use items_0::Empty; - let mut ret = crate::binsdim0::BinsDim0::::empty(); - for ((((((&ts1, &ts2), &cnt), min), max), avg), _fnl) in src.zip_iter() { - ret.push( - ts1.ns(), - ts2.ns(), - cnt, - min.clone(), - max.clone(), - *avg as f32, - EnumVariant::new(0, ""), - ); - } - return Box::new(ret); - } - let styn = any::type_name::(); - todo!("TODO impl for {styn}"); - } } pub struct ContainerBinsTakeUpTo<'a, EVT> diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index f61bebb..33c26d3 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -166,6 +166,12 @@ impl ChannelEvents { } } +impl items_0::IntoTimeBinnable for ChannelEvents { + fn into_time_binnable(self) -> Box { + Box::new(self) + } +} + impl TypeName for ChannelEvents { fn type_name(&self) -> String { any::type_name::().into() diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 8b8d853..c211eb5 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -189,6 +189,12 @@ where } } +impl items_0::IntoTimeBinnable for EventsDim0 { + fn into_time_binnable(self) -> Box { + Box::new(self) + } +} + impl WithLen for EventsDim0 { fn len(&self) -> usize { self.tss.len() diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs index a0d0b9b..45153b6 100644 --- a/crates/items_2/src/eventsdim0enum.rs +++ b/crates/items_2/src/eventsdim0enum.rs @@ -373,6 +373,12 @@ impl TimeBinnable for EventsDim0Enum { } } +impl items_0::IntoTimeBinnable for EventsDim0Enum { + fn into_time_binnable(self) -> Box { + Box::new(self) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct EventsDim0EnumChunkOutput { tss: VecDeque, diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index e2f5d81..c488263 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -1,7 +1,5 @@ use crate::binsdim0::BinsDim0; use crate::eventsxbindim0::EventsXbinDim0; -use crate::framable::FrameType; -use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -11,7 +9,6 @@ use items_0::collect_s::CollectableDyn; use 
diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs
index e2f5d81..c488263 100644
--- a/crates/items_2/src/eventsdim1.rs
+++ b/crates/items_2/src/eventsdim1.rs
@@ -1,7 +1,5 @@
 use crate::binsdim0::BinsDim0;
 use crate::eventsxbindim0::EventsXbinDim0;
-use crate::framable::FrameType;
-use crate::ts_offs_from_abs_with_anchor;
 use crate::IsoDateTime;
 use crate::RangeOverlapInfo;
 use crate::TimeBinnableType;
@@ -11,7 +9,6 @@ use items_0::collect_s::CollectableDyn;
 use items_0::collect_s::CollectableType;
 use items_0::collect_s::CollectedDyn;
 use items_0::collect_s::CollectorTy;
-use items_0::collect_s::ToJsonBytes;
 use items_0::collect_s::ToJsonResult;
 use items_0::container::ByteEstimate;
 use items_0::overlap::HasTimestampDeque;
@@ -40,6 +37,7 @@ use std::any;
 use std::any::Any;
 use std::collections::VecDeque;
 use std::fmt;
+use std::marker::PhantomData;
 use std::mem;
 
 #[allow(unused)]
@@ -151,6 +149,12 @@ where
     }
 }
 
+impl<STY: ScalarOps> items_0::IntoTimeBinnable for EventsDim1<STY> {
+    fn into_time_binnable(self) -> Box<dyn TimeBinnable> {
+        Box::new(self)
+    }
+}
+
 impl<STY> WithLen for EventsDim1<STY> {
     fn len(&self) -> usize {
         self.tss.len()
     }
 }
@@ -189,17 +193,11 @@
 impl<STY> TimeBinnableType for EventsDim1<STY>
 where
     STY: ScalarOps,
 {
-    // TODO
     type Output = BinsDim0<STY>;
     type Aggregator = EventsDim1Aggregator<STY>;
 
     fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
-        let self_name = std::any::type_name::<Self>();
-        debug!(
-            "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}",
-            range, x_bin_count, do_time_weight
-        );
-        Self::Aggregator::new(range, do_time_weight)
+        panic!("TODO remove")
     }
 }
@@ -411,7 +409,7 @@ impl<STY: ScalarOps> CollectorTy for EventsDim1Collector<STY> {
         if let Some(range) = &range {
             match range {
                 SeriesRange::TimeRange(x) => Some(IsoDateTime::from_ns_u64(x.beg + SEC)),
-                SeriesRange::PulseRange(x) => {
+                SeriesRange::PulseRange(_) => {
                     error!("TODO emit create continueAt for pulse range");
                     Some(IsoDateTime::from_ns_u64(0))
                 }
@@ -466,17 +464,7 @@ impl<STY: ScalarOps> CollectableType for EventsDim1<STY> {
 
 #[derive(Debug)]
 pub struct EventsDim1Aggregator<STY> {
-    range: SeriesRange,
-    count: u64,
-    min: STY,
-    max: STY,
-    sumc: u64,
-    sum: f32,
-    int_ts: u64,
-    last_seen_ts: u64,
-    last_seen_val: Option<STY>,
-    did_min_max: bool,
-    do_time_weight: bool,
+    _last_seen_val: Option<STY>,
     events_taken_count: u64,
     events_ignored_count: u64,
 }
@@ -493,222 +481,8 @@ impl<STY> Drop for EventsDim1Aggregator<STY> {
 }
 
 impl<STY: ScalarOps> EventsDim1Aggregator<STY> {
-    pub fn new(range: SeriesRange, do_time_weight: bool) -> Self {
-        /*let int_ts = range.beg;
-        Self {
-            range,
-            count: 0,
-            min: NTY::zero_b(),
-            max: NTY::zero_b(),
-            sum: 0.,
-            sumc: 0,
-            int_ts,
-            last_seen_ts: 0,
-            last_seen_val: None,
-            did_min_max: false,
-            do_time_weight,
-            events_taken_count: 0,
-            events_ignored_count: 0,
-        }*/
-        todo!()
-    }
-
-    // TODO reduce clone.. optimize via more traits to factor the trade-offs?
-    fn apply_min_max(&mut self, val: STY) {
-        trace!(
-            "apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}",
-            val,
-            self.last_seen_val,
-            self.count,
-            self.sumc,
-            self.min,
-            self.max
-        );
-        if self.did_min_max == false {
-            self.did_min_max = true;
-            self.min = val.clone();
-            self.max = val.clone();
-        } else {
-            if self.min > val {
-                self.min = val.clone();
-            }
-            if self.max < val {
-                self.max = val.clone();
-            }
-        }
-    }
-
-    fn apply_event_unweight(&mut self, val: STY) {
-        trace!("TODO check again result_reset_unweight");
-        err::todo();
-        let vf = val.as_prim_f32_b();
-        self.apply_min_max(val);
-        if vf.is_nan() {
-        } else {
-            self.sum += vf;
-            self.sumc += 1;
-        }
-    }
-
-    fn apply_event_time_weight(&mut self, ts: u64) {
-        /*if let Some(v) = &self.last_seen_val {
-            let vf = v.as_prim_f32_b();
-            let v2 = v.clone();
-            if ts > self.range.beg {
-                self.apply_min_max(v2);
-            }
-            let w = if self.do_time_weight {
-                (ts - self.int_ts) as f32 * 1e-9
-            } else {
-                1.
-            };
-            if vf.is_nan() {
-            } else {
-                self.sum += vf * w;
-                self.sumc += 1;
-            }
-            self.int_ts = ts;
-        } else {
-            debug!(
-                "apply_event_time_weight NO VALUE {}",
-                ts as i64 - self.range.beg as i64
-            );
-        }*/
-        todo!()
-    }
-
-    fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
-        /*trace!("TODO check again result_reset_unweight");
-        err::todo();
-        for i1 in 0..item.tss.len() {
-            let ts = item.tss[i1];
-            let val = item.values[i1].clone();
-            if ts < self.range.beg {
-                self.events_ignored_count += 1;
-            } else if ts >= self.range.end {
-                self.events_ignored_count += 1;
-                return;
-            } else {
-                error!("TODO ingest_unweight");
-                err::todo();
-                //self.apply_event_unweight(val);
-                self.count += 1;
-                self.events_taken_count += 1;
-            }
-        }*/
-        todo!()
-    }
-
-    fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
-        /*let self_name = std::any::type_name::<Self>();
-        trace!("{self_name}::ingest_time_weight item len {}", item.len());
-        for i1 in 0..item.tss.len() {
-            let ts = item.tss[i1];
-            let val = item.values[i1].clone();
-            trace!("{self_name} ingest {:6} {:20} {:10?}", i1, ts, val);
-            if ts < self.int_ts {
-                if self.last_seen_val.is_none() {
-                    info!(
-                        "ingest_time_weight event before range, only set last ts {} val {:?}",
-                        ts, val
-                    );
-                }
-                self.events_ignored_count += 1;
-                self.last_seen_ts = ts;
-                error!("TODO ingest_time_weight");
-                err::todo();
-                //self.last_seen_val = Some(val);
-            } else if ts >= self.range.end {
-                self.events_ignored_count += 1;
-                return;
-            } else {
-                if false && self.last_seen_val.is_none() {
-                    // TODO no longer needed or?
-                    info!(
-                        "call apply_min_max without last val, use current instead {} {:?}",
-                        ts, val
-                    );
-                    // TODO: self.apply_min_max(val.clone());
-                }
-                self.apply_event_time_weight(ts);
-                self.count += 1;
-                self.last_seen_ts = ts;
-                error!("TODO ingest_time_weight");
-                err::todo();
-                //self.last_seen_val = Some(val);
-                self.events_taken_count += 1;
-            }
-        }*/
-        todo!()
-    }
-
-    fn result_reset_unweight(&mut self, range: SeriesRange, _expand: bool) -> BinsDim0<STY> {
-        /*trace!("TODO check again result_reset_unweight");
-        err::todo();
-        let (min, max, avg) = if self.sumc > 0 {
-            let avg = self.sum / self.sumc as f32;
-            (self.min.clone(), self.max.clone(), avg)
-        } else {
-            let g = match &self.last_seen_val {
-                Some(x) => x.clone(),
-                None => NTY::zero_b(),
-            };
-            (g.clone(), g.clone(), g.as_prim_f32_b())
-        };
-        let ret = BinsDim0 {
-            ts1s: [self.range.beg].into(),
-            ts2s: [self.range.end].into(),
-            counts: [self.count].into(),
-            mins: [min].into(),
-            maxs: [max].into(),
-            avgs: [avg].into(),
-        };
-        self.int_ts = range.beg;
-        self.range = range;
-        self.count = 0;
-        self.sum = 0f32;
-        self.sumc = 0;
-        self.did_min_max = false;
-        ret*/
-        todo!()
-    }
-
-    fn result_reset_time_weight(&mut self, range: SeriesRange, expand: bool) -> BinsDim0<STY> {
-        // TODO check callsite for correct expand status.
-        /*if expand {
-            debug!("result_reset_time_weight calls apply_event_time_weight");
-            self.apply_event_time_weight(self.range.end);
-        } else {
-            debug!("result_reset_time_weight NO EXPAND");
-        }
-        let (min, max, avg) = if self.sumc > 0 {
-            let avg = self.sum / (self.range.delta() as f32 * 1e-9);
-            (self.min.clone(), self.max.clone(), avg)
-        } else {
-            let g = match &self.last_seen_val {
-                Some(x) => x.clone(),
-                None => NTY::zero_b(),
-            };
-            (g.clone(), g.clone(), g.as_prim_f32_b())
-        };
-        let ret = BinsDim0 {
-            ts1s: [self.range.beg].into(),
-            ts2s: [self.range.end].into(),
-            counts: [self.count].into(),
-            mins: [min].into(),
-            maxs: [max].into(),
-            avgs: [avg].into(),
-        };
-        self.int_ts = range.beg;
-        self.range = range;
-        self.count = 0;
-        self.sum = 0.;
-        self.sumc = 0;
-        self.did_min_max = false;
-        self.min = NTY::zero_b();
-        self.max = NTY::zero_b();
-        ret*/
-        todo!()
+    pub fn new(_range: SeriesRange, _do_time_weight: bool) -> Self {
+        panic!("TODO remove")
     }
 }
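For reference, the gutted aggregator implemented time-weighted bin statistics: each value contributes in proportion to how long it stayed current, and the last seen value is extrapolated to the end of the bin, giving avg = Σ vᵢ·Δtᵢ / (end − beg). A standalone sketch of just that computation (it skips the crate's NaN filtering, min/max tracking, and one-before-range bookkeeping):

```rust
// Minimal sketch of the time-weighted average that the removed
// ingest_time_weight/result_reset_time_weight pair implemented. Assumes
// samples are sorted by timestamp and `last_before` is the latest value
// at or before `beg`.
fn time_weighted_avg(samples: &[(u64, f32)], last_before: Option<f32>, beg: u64, end: u64) -> Option<f32> {
    let mut cur = last_before;
    let mut int_ts = beg;
    let mut sum = 0.0f64;
    for &(ts, val) in samples.iter().filter(|&&(ts, _)| ts >= beg && ts < end) {
        if let Some(v) = cur {
            // Weight the previous value by how long it was current.
            sum += v as f64 * (ts - int_ts) as f64;
        }
        cur = Some(val);
        int_ts = ts;
    }
    // The last seen value is assumed constant until the end of the bin.
    let v = cur?;
    sum += v as f64 * (end - int_ts) as f64;
    Some((sum / (end - beg) as f64) as f32)
}

fn main() {
    // Value 1.0 holds for the first half of the bin, 3.0 for the second: avg 2.0.
    let avg = time_weighted_avg(&[(0, 1.0), (50, 3.0)], None, 0, 100);
    assert_eq!(avg, Some(2.0));
    println!("{avg:?}");
}
```

The removed code expressed the same thing with nanosecond weights `(ts - int_ts) as f32 * 1e-9` normalized by `range.delta() as f32 * 1e-9`; the ratio is identical.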
@@ -717,55 +491,34 @@ impl<STY: ScalarOps> TimeBinnableTypeAggregator for EventsDim1Aggregator<STY> {
     type Output = BinsDim0<STY>;
 
     fn range(&self) -> &SeriesRange {
-        &self.range
+        panic!("TODO remove")
     }
 
-    fn ingest(&mut self, item: &Self::Input) {
-        if true {
-            trace!("{} ingest {} events", any::type_name::<Self>(), item.len());
-        }
-        if false {
-            for (i, &ts) in item.tss.iter().enumerate() {
-                trace!("{} ingest {:6} {:20}", any::type_name::<Self>(), i, ts);
-            }
-        }
-        if self.do_time_weight {
-            self.ingest_time_weight(item)
-        } else {
-            self.ingest_unweight(item)
-        }
+    fn ingest(&mut self, _item: &Self::Input) {
+        panic!("TODO remove")
     }
 
-    fn result_reset(&mut self, range: SeriesRange) -> Self::Output {
-        /*trace!("result_reset {} {}", range.beg, range.end);
-        if self.do_time_weight {
-            self.result_reset_time_weight(range, expand)
-        } else {
-            self.result_reset_unweight(range, expand)
-        }*/
-        todo!()
+    fn result_reset(&mut self, _range: SeriesRange) -> Self::Output {
+        panic!("TODO remove")
     }
 }
 
 impl<STY: ScalarOps> TimeBinnable for EventsDim1<STY> {
     fn time_binner_new(
         &self,
-        binrange: BinnedRangeEnum,
-        do_time_weight: bool,
-        emit_empty_bins: bool,
+        _binrange: BinnedRangeEnum,
+        _do_time_weight: bool,
+        _emit_empty_bins: bool,
     ) -> Box<dyn TimeBinner> {
-        // TODO respect emit_empty_bins
-        let ret = EventsDim1TimeBinner::<STY>::new(binrange, do_time_weight).unwrap();
-        Box::new(ret)
+        panic!("TODO remove")
    }
 
     fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
-        let k = serde_json::to_value(self).unwrap();
-        Box::new(k) as _
+        panic!("TODO remove")
     }
 
     fn to_container_bins(&self) -> Box<dyn BinningggContainerBinsDyn> {
-        panic!("logic error must not get used on events")
+        panic!("TODO remove")
     }
 }
@@ -778,12 +531,7 @@ impl<STY> items_0::TypeName for EventsDim1<STY> {
 
 impl<STY: ScalarOps> EventsNonObj for EventsDim1<STY> {
     fn into_tss_pulses(self: Box<Self>) -> (VecDeque<u64>, VecDeque<u64>) {
-        info!(
-            "EventsDim1::into_tss_pulses len {} len {}",
-            self.tss.len(),
-            self.pulses.len()
-        );
-        (self.tss, self.pulses)
+        panic!("TODO remove")
     }
 }
@@ -1004,230 +752,42 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
 
 #[derive(Debug)]
 pub struct EventsDim1TimeBinner<STY: ScalarOps> {
-    edges: VecDeque<u64>,
-    agg: EventsDim1Aggregator<STY>,
-    ready: Option<<EventsDim1Aggregator<STY> as TimeBinnableTypeAggregator>::Output>,
-    range_complete: bool,
+    _t1: PhantomData<STY>,
 }
 
-impl<STY: ScalarOps> EventsDim1TimeBinner<STY> {
-    fn type_name() -> &'static str {
-        any::type_name::<Self>()
-    }
-
-    fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result<Self, Error> {
-        /*if edges.len() < 2 {
-            return Err(Error::with_msg_no_trace(format!("need at least 2 edges")));
-        }
-        let self_name = std::any::type_name::<Self>();
-        trace!("{self_name}::new edges {edges:?}");
-        let agg = EventsDim1Aggregator::new(
-            NanoRange {
-                beg: edges[0],
-                end: edges[1],
-            },
-            do_time_weight,
-        );
-        let ret = Self {
-            edges,
-            agg,
-            ready: None,
-            range_complete: false,
-        };
-        Ok(ret)*/
-
-        // trace!("{}::new binrange {:?}", Self::type_name(), binrange);
-        // let rng = binrange
-        //     .range_at(0)
-        //     .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?;
-        // let agg = EventsDim0Aggregator::new(rng, do_time_weight);
-        // let ret = Self {
-        //     binrange,
-        //     rix: 0,
-        //     rng: Some(agg.range().clone()),
-        //     agg,
-        //     ready: None,
-        //     range_final: false,
-        // };
-        // Ok(ret)
-
-        todo!()
-    }
-
-    fn next_bin_range(&mut self) -> Option<NanoRange> {
-        /*let self_name = std::any::type_name::<Self>();
-        if self.edges.len() >= 3 {
-            self.edges.pop_front();
-            let ret = NanoRange {
-                beg: self.edges[0],
-                end: self.edges[1],
-            };
-            trace!("{self_name} next_bin_range {} {}", ret.beg, ret.end);
-            Some(ret)
-        } else {
-            self.edges.clear();
-            trace!("{self_name} next_bin_range None");
-            None
-        }*/
-        todo!()
-    }
-}
+impl<STY: ScalarOps> EventsDim1TimeBinner<STY> {}
 
 impl<STY: ScalarOps> TimeBinner for EventsDim1TimeBinner<STY> {
     fn bins_ready_count(&self) -> usize {
-        match &self.ready {
-            Some(k) => k.len(),
-            None => 0,
-        }
+        panic!("TODO remove")
     }
 
     fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>> {
-        match self.ready.take() {
-            Some(k) => Some(Box::new(k)),
-            None => None,
-        }
+        panic!("TODO remove")
     }
 
-    fn ingest(&mut self, item: &mut dyn TimeBinnable) {
-        /*let self_name = std::any::type_name::<Self>();
-        trace2!(
-            "TimeBinner for EventsDim1TimeBinner {:?}\n{:?}\n------------------------------------",
-            self.edges.iter().take(2).collect::<Vec<_>>(),
-            item
-        );
-        if item.len() == 0 {
-            // Return already here, RangeOverlapInfo would not give much sense.
-            return;
-        }
-        if self.edges.len() < 2 {
-            warn!("{self_name} no more bin in edges A");
-            return;
-        }
-        // TODO optimize by remembering at which event array index we have arrived.
-        // That needs modified interfaces which can take and yield the start and latest index.
-        loop {
-            while item.starts_after(self.agg.range().clone()) {
-                trace!("{self_name} IGNORE ITEM AND CYCLE BECAUSE item.starts_after");
-                self.cycle();
-                if self.edges.len() < 2 {
-                    warn!("{self_name} no more bin in edges B");
-                    return;
-                }
-            }
-            if item.ends_before(self.agg.range().clone()) {
-                trace!("{self_name} IGNORE ITEM BECAUSE ends_before\n------------- -----------");
-                return;
-            } else {
-                if self.edges.len() < 2 {
-                    trace!("{self_name} edge list exhausted");
-                    return;
-                } else {
-                    if let Some(item) = item
-                        .as_any_ref()
-                        // TODO make statically sure that we attempt to cast to the correct type here:
-                        .downcast_ref::<<EventsDim1Aggregator<STY> as TimeBinnableTypeAggregator>::Input>()
-                    {
-                        // TODO collect statistics associated with this request:
-                        trace!("{self_name} FEED THE ITEM...");
-                        self.agg.ingest(item);
-                        if item.ends_after(self.agg.range().clone()) {
-                            trace!("{self_name} FED ITEM, ENDS AFTER.");
-                            self.cycle();
-                            if self.edges.len() < 2 {
-                                warn!("{self_name} no more bin in edges C");
-                                return;
-                            } else {
-                                trace!("{self_name} FED ITEM, CYCLED, CONTINUE.");
-                            }
-                        } else {
-                            trace!("{self_name} FED ITEM.");
-                            break;
-                        }
-                    } else {
-                        panic!("{self_name} not correct item type");
-                    };
-                }
-            }
-        }*/
-        todo!()
+    fn ingest(&mut self, _item: &mut dyn TimeBinnable) {
+        panic!("TODO remove")
     }
 
-    fn push_in_progress(&mut self, push_empty: bool) {
-        /*let self_name = std::any::type_name::<Self>();
-        trace!("{self_name}::push_in_progress");
-        // TODO expand should be derived from AggKind. Is it still required after all?
-        // TODO here, the expand means that agg will assume that the current value is kept constant during
-        // the rest of the time range.
-        if self.edges.len() >= 2 {
-            let expand = true;
-            let range_next = if let Some(x) = self.next_bin_range() {
-                Some(x)
-            } else {
-                None
-            };
-            let mut bins = if let Some(range_next) = range_next {
-                self.agg.result_reset(range_next, expand)
-            } else {
-                let range_next = NanoRange {
-                    beg: u64::MAX - 1,
-                    end: u64::MAX,
-                };
-                self.agg.result_reset(range_next, expand)
-            };
-            assert_eq!(bins.len(), 1);
-            if push_empty || bins.counts[0] != 0 {
-                match self.ready.as_mut() {
-                    Some(ready) => {
-                        ready.append_all_from(&mut bins);
-                    }
-                    None => {
-                        self.ready = Some(bins);
-                    }
-                }
-            }
-        }*/
-        todo!()
+    fn push_in_progress(&mut self, _push_empty: bool) {
+        panic!("TODO remove")
     }
 
-    fn cycle(&mut self) {
-        /*let self_name = std::any::type_name::<Self>();
-        trace!("{self_name}::cycle");
-        // TODO refactor this logic.
-        let n = self.bins_ready_count();
-        self.push_in_progress(true);
-        if self.bins_ready_count() == n {
-            if let Some(range) = self.next_bin_range() {
-                let mut bins = BinsDim0::<STY>::empty();
-                bins.append_zero(range.beg, range.end);
-                match self.ready.as_mut() {
-                    Some(ready) => {
-                        ready.append_all_from(&mut bins);
-                    }
-                    None => {
-                        self.ready = Some(bins);
-                    }
-                }
-                if self.bins_ready_count() <= n {
-                    error!("failed to push a zero bin");
-                }
-            } else {
-                warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin");
-            }
-        }*/
-        todo!()
+    fn cycle(&mut self) {
+        panic!("TODO remove")
     }
 
     fn set_range_complete(&mut self) {
-        self.range_complete = true;
+        panic!("TODO remove")
     }
 
     fn empty(&self) -> Box<dyn TimeBinned> {
-        let ret = <EventsDim1Aggregator<STY> as TimeBinnableTypeAggregator>::Output::empty();
-        Box::new(ret)
+        panic!("TODO remove")
     }
 
     fn append_empty_until_end(&mut self) {
-        // nothing to do for events
+        panic!("TODO remove")
     }
 }
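The `TimeBinner` methods stubbed out above follow a small protocol: `ingest` feeds events into the bin whose edges currently apply, `cycle` closes the in-progress bin (emitting an empty one if nothing was pushed), and `bins_ready` hands finished bins to the caller. A toy binner illustrating that protocol with plain tuples instead of the real `TimeBinnable` machinery (names and types here are illustrative only):

```rust
// Toy illustration of the binner protocol that EventsDim1TimeBinner stubs out.
struct ToyBinner {
    edges: Vec<u64>,             // bin boundaries, at least 2
    cur: usize,                  // index of the current bin's left edge
    count: u64,                  // events in the current bin
    ready: Vec<(u64, u64, u64)>, // (beg, end, count) of closed bins
}

impl ToyBinner {
    fn new(edges: Vec<u64>) -> Self {
        assert!(edges.len() >= 2);
        Self { edges, cur: 0, count: 0, ready: Vec::new() }
    }

    fn ingest(&mut self, ts: u64) {
        // Close bins (possibly emitting empty ones) until ts fits the current bin.
        while self.cur + 1 < self.edges.len() && ts >= self.edges[self.cur + 1] {
            self.cycle();
        }
        if self.cur + 1 < self.edges.len() && ts >= self.edges[self.cur] {
            self.count += 1;
        }
    }

    fn cycle(&mut self) {
        let (beg, end) = (self.edges[self.cur], self.edges[self.cur + 1]);
        self.ready.push((beg, end, self.count));
        self.count = 0;
        self.cur += 1;
    }

    fn bins_ready(&mut self) -> Vec<(u64, u64, u64)> {
        std::mem::take(&mut self.ready)
    }
}

fn main() {
    let mut b = ToyBinner::new(vec![0, 10, 20, 30]);
    for ts in [1, 2, 12, 25] {
        b.ingest(ts);
    }
    // Bins [0,10) and [10,20) are closed; [20,30) is still in progress.
    assert_eq!(b.bins_ready(), vec![(0, 10, 2), (10, 20, 1)]);
}
```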
diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs
index a1b17bb..d13c7c6 100644
--- a/crates/items_2/src/eventsxbindim0.rs
+++ b/crates/items_2/src/eventsxbindim0.rs
@@ -157,6 +157,12 @@ where
     }
 }
 
+impl<STY: ScalarOps> items_0::IntoTimeBinnable for EventsXbinDim0<STY> {
+    fn into_time_binnable(self) -> Box<dyn TimeBinnable> {
+        Box::new(self)
+    }
+}
+
 impl<STY> WithLen for EventsXbinDim0<STY> {
     fn len(&self) -> usize {
         self.tss.len()
     }
 }
diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs
index b7a2120..64ea43e 100644
--- a/crates/scyllaconn/src/bincache.rs
+++ b/crates/scyllaconn/src/bincache.rs
@@ -1,42 +1,21 @@
-#![allow(unused)]
-
-use crate::errconv::ErrConv;
 use crate::events2::prepare::StmtsCache;
 use crate::worker::ScyllaQueue;
 use err::Error;
 use futures_util::Future;
-use futures_util::Stream;
 use futures_util::StreamExt;
 use items_0::timebin::BinsBoxed;
 use items_0::timebin::TimeBinned;
-use items_0::Empty;
-use items_2::binsdim0::BinsDim0;
-use items_2::channelevents::ChannelEvents;
+use items_2::binning::container_bins::ContainerBins;
 use netpod::log::*;
-use netpod::query::CacheUsage;
-use netpod::timeunits::*;
-use netpod::AggKind;
-use netpod::BinnedRange;
 use netpod::ChannelTyped;
-use netpod::Dim0Kind;
 use netpod::DtMs;
-use netpod::PreBinnedPatchCoord;
 use netpod::PreBinnedPatchCoordEnum;
-use netpod::PreBinnedPatchRange;
-use netpod::PreBinnedPatchRangeEnum;
-use netpod::ScalarType;
-use netpod::Shape;
 use netpod::TsNano;
-use query::transform::TransformQuery;
 use scylla::Session as ScySession;
-use std::collections::VecDeque;
 use std::ops::Range;
 use std::pin::Pin;
-use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
-use std::time::Duration;
-use std::time::Instant;
 
 #[allow(unused)]
 struct WriteFut<'a> {
@@ -67,148 +46,6 @@ impl<'a> Future for WriteFut<'a> {
     }
 }
 
-pub fn write_cached_scylla<'a>(
-    series: u64,
-    _chn: &'a ChannelTyped,
-    coord: &'a PreBinnedPatchCoordEnum,
-    //data: &'a dyn TimeBinned,
-    data: Box<dyn TimeBinned>,
-    //scy: &'a ScySession,
-    _scy: Arc<ScySession>,
-) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
-    //let _chn = unsafe { &*(chn as *const ChannelTyped) };
-    //let data_ptr = data as *const dyn TimeBinned as usize;
-    //let scy_ptr = scy as *const ScySession as usize;
-    let fut = async move {
-        //let data = unsafe { &*(data_ptr as *const dyn TimeBinned) };
-        //let scy = unsafe { &*(scy_ptr as *const ScySession) };
-        todo!();
-        /*let bin_len_sec = (coord.bin_t_len() / SEC) as i32;
-        let patch_len_sec = (coord.patch_t_len() / SEC) as i32;
-        let offset = coord.ix();*/
-        let bin_len_sec = 0i32;
-        let patch_len_sec = 0i32;
-        let offset = 0i32;
-        warn!(
-            "write_cached_scylla len {} where series = {} and bin_len_sec = {} and patch_len_sec = {} and agg_kind = 'dummy-agg-kind' and offset = {}",
-            data.counts().len(),
-            series,
-            bin_len_sec,
-            patch_len_sec,
-            offset,
-        );
-        let _data2 = data.counts().iter().map(|x| *x as i64).collect::<Vec<_>>();
-        /*
-        let stmt = scy.prepare("insert into binned_scalar_f32 (series, bin_len_sec, patch_len_sec, agg_kind, offset, counts, avgs, mins, maxs) values (?, ?, ?, 'dummy-agg-kind', ?, ?, ?, ?, ?)").await.err_conv()?;
-        scy.execute(
-            &stmt,
-            (
-                series as i64,
-                bin_len_sec,
-                patch_len_sec,
-                offset as i64,
-                data2,
-                data.avgs(),
-                data.mins(),
-                data.maxs(),
-            ),
-        )
-        .await
-        .err_conv()
-        .map_err(|e| {
-            error!("can not write to cache");
-            e
-        })?;
-        */
-        Ok(())
-    };
-    Box::pin(fut)
-}
-
-pub async fn fetch_uncached_data(
-    series: u64,
-    chn: ChannelTyped,
-    coord: PreBinnedPatchCoordEnum,
-    one_before_range: bool,
-    transform: TransformQuery,
-    cache_usage: CacheUsage,
-    scy: Arc<ScySession>,
-) -> Result<Option<(Box<dyn TimeBinned>, bool)>, Error> {
-    /*info!("fetch_uncached_data {coord:?}");
-    // Try to find a higher resolution pre-binned grid which covers the requested patch.
-    let (bin, complete) = match PreBinnedPatchRange::covering_range(coord.patch_range(), coord.bin_count() + 1) {
-        Ok(Some(range)) => {
-            if coord.patch_range() != range.range() {
-                error!(
-                    "The chosen covering range does not exactly cover the requested patch {:?} vs {:?}",
-                    coord.patch_range(),
-                    range.range()
-                );
-            }
-            fetch_uncached_higher_res_prebinned(
-                series,
-                &chn,
-                coord.clone(),
-                range,
-                one_before_range,
-                transform,
-                cache_usage.clone(),
-                scy.clone(),
-            )
-            .await
-        }
-        Ok(None) => {
-            fetch_uncached_binned_events(series, &chn, coord.clone(), one_before_range, transform, scy.clone()).await
-        }
-        Err(e) => Err(e),
-    }?;
-    if true || complete {
-        let edges = coord.edges();
-        if edges.len() < bin.len() + 1 {
-            error!(
-                "attempt to write overfull bin to cache edges {} bin {}",
-                edges.len(),
-                bin.len()
-            );
-            return Err(Error::with_msg_no_trace(format!(
-                "attempt to write overfull bin to cache"
-            )));
-        } else if edges.len() > bin.len() + 1 {
-            let missing = edges.len() - bin.len() - 1;
-            error!("attempt to write incomplete bin to cache missing {missing}");
-        }
-        if let CacheUsage::Use | CacheUsage::Recreate = &cache_usage {
-            // TODO pass data in safe way.
-            let _data = bin.as_ref();
-            //let fut = WriteFut::new(&chn, &coord, err::todoval(), &scy);
-            //fut.await?;
-            //write_cached_scylla(series, &chn, &coord, bin.as_ref(), &scy).await?;
-        }
-    }
-    Ok(Some((bin, complete)))*/
-    todo!()
-}
-
-pub fn fetch_uncached_data_box(
-    series: u64,
-    chn: &ChannelTyped,
-    coord: &PreBinnedPatchCoordEnum,
-    one_before_range: bool,
-    transform: TransformQuery,
-    cache_usage: CacheUsage,
-    scy: Arc<ScySession>,
-) -> Pin<Box<dyn Future<Output = Result<Option<(Box<dyn TimeBinned>, bool)>, Error>> + Send>> {
-    Box::pin(fetch_uncached_data(
-        series,
-        chn.clone(),
-        coord.clone(),
-        one_before_range,
-        transform,
-        cache_usage,
-        scy,
-    ))
-}
-
 pub struct ScyllaCacheReadProvider {
     scyqueue: ScyllaQueue,
 }
@@ -243,25 +80,15 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider {
 
 pub async fn worker_write(
     series: u64,
-    bins: BinsDim0<f32>,
+    bins: ContainerBins<f32>,
     stmts_cache: &StmtsCache,
     scy: &ScySession,
 ) -> Result<(), streams::timebin::cached::reader::Error> {
-    let mut msp_last = u64::MAX;
-    for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst) in bins
-        .ts1s
-        .iter()
-        .zip(bins.ts2s.iter())
-        .zip(bins.cnts.iter())
-        .zip(bins.mins.iter())
-        .zip(bins.maxs.iter())
-        .zip(bins.avgs.iter())
-        .zip(bins.lsts.iter())
-    {
-        let bin_len = DtMs::from_ms_u64((ts2 - ts1) / 1000000);
+    for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst) in bins.zip_iter() {
+        let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000);
         let div = streams::timebin::cached::reader::part_len(bin_len).ns();
-        let msp = ts1 / div;
-        let off = (ts1 - msp * div) / bin_len.ns();
+        let msp = ts1.ns() / div;
+        let off = (ts1.ns() - msp * div) / bin_len.ns();
         let params = (
             series as i64,
             bin_len.ms() as i32,
@@ -288,7 +115,7 @@ pub async fn worker_read(
     offs: core::ops::Range<u64>,
     stmts_cache: &StmtsCache,
     scy: &ScySession,
-) -> Result<BinsDim0<f32>, streams::timebin::cached::reader::Error> {
+) -> Result<ContainerBins<f32>, streams::timebin::cached::reader::Error> {
     let div = streams::timebin::cached::reader::part_len(bin_len).ns();
     let params = (
         series as i64,
@@ -302,7 +129,7 @@ pub async fn worker_read(
         .await
         .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
     let mut it = res.into_typed::<(i32, i64, f32, f32, f32, f32)>();
-    let mut bins = BinsDim0::empty();
+    let mut bins = ContainerBins::new();
     while let Some(x) = it.next().await {
         let row = x.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
         let off = row.0 as u64;
         let cnt = row.1 as u64;
         let min = row.2;
         let max = row.3;
         let avg = row.4;
         let lst = row.5;
-        let ts1 = bin_len.ns() * off + div * msp;
-        let ts2 = ts1 + bin_len.ns();
-        bins.push(ts1, ts2, cnt, min, max, avg, lst);
+        let ts1 = TsNano::from_ns(bin_len.ns() * off + div * msp);
+        let ts2 = TsNano::from_ns(ts1.ns() + bin_len.ns());
+        // By assumption, bins which got written to storage are considered final
+        let fnl = true;
+        bins.push_back(ts1, ts2, cnt, min, max, avg, lst, fnl);
     }
     Ok(bins)
 }
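`worker_write` and `worker_read` address a cached bin by splitting its start timestamp into a partition key (`msp`) and an offset (`off`) inside that partition. A small sketch of that arithmetic; `PART_LEN_NS` stands in for `streams::timebin::cached::reader::part_len`, whose actual partition length is not assumed here, and the bin length is likewise an arbitrary example:

```rust
// How worker_write maps a bin's start timestamp to the cache key, and how
// worker_read inverts it. The two constants are illustrative assumptions.
const PART_LEN_NS: u64 = 60 * 60 * 1_000_000_000; // assumed 1 h partitions
const BIN_LEN_NS: u64 = 10 * 1_000_000_000; // assumed 10 s bins

fn key_for(ts1: u64) -> (u64, u64) {
    let msp = ts1 / PART_LEN_NS; // partition ("most significant part")
    let off = (ts1 - msp * PART_LEN_NS) / BIN_LEN_NS; // bin offset inside it
    (msp, off)
}

fn ts_for(msp: u64, off: u64) -> u64 {
    // Inverse used on the read path: ts1 = bin_len * off + part_len * msp.
    BIN_LEN_NS * off + PART_LEN_NS * msp
}

fn main() {
    let ts1 = 7 * PART_LEN_NS + 42 * BIN_LEN_NS;
    let (msp, off) = key_for(ts1);
    assert_eq!((msp, off), (7, 42));
    assert_eq!(ts_for(msp, off), ts1);
}
```

Keying by `(msp, off)` keeps all bins of one partition in one storage row range, which is why the read path queries a `Range` of offsets under a single `msp`.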
diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs
index 84d6be0..83c242d 100644
--- a/crates/scyllaconn/src/worker.rs
+++ b/crates/scyllaconn/src/worker.rs
@@ -8,7 +8,7 @@ use err::thiserror;
 use err::ThisError;
 use futures_util::Future;
 use items_0::Events;
-use items_2::binsdim0::BinsDim0;
+use items_2::binning::container_bins::ContainerBins;
 use netpod::log::*;
 use netpod::ttl::RetentionTime;
 use netpod::DtMs;
@@ -42,7 +42,7 @@ struct ReadCacheF32 {
     bin_len: DtMs,
     msp: u64,
     offs: core::ops::Range<u64>,
-    tx: Sender<Result<BinsDim0<f32>, streams::timebin::cached::reader::Error>>,
+    tx: Sender<Result<ContainerBins<f32>, streams::timebin::cached::reader::Error>>,
 }
 
 #[derive(Debug)]
@@ -63,7 +63,7 @@ enum Job {
     ),
     WriteCacheF32(
         u64,
-        BinsDim0<f32>,
+        ContainerBins<f32>,
         Sender<Result<(), streams::timebin::cached::reader::Error>>,
     ),
     ReadCacheF32(ReadCacheF32),
@@ -141,7 +141,7 @@ impl ScyllaQueue {
     pub async fn write_cache_f32(
         &self,
         series: u64,
-        bins: BinsDim0<f32>,
+        bins: ContainerBins<f32>,
     ) -> Result<(), streams::timebin::cached::reader::Error> {
         let (tx, rx) = async_channel::bounded(1);
         let job = Job::WriteCacheF32(series, bins, tx);
@@ -162,7 +162,7 @@ impl ScyllaQueue {
         bin_len: DtMs,
         msp: u64,
         offs: core::ops::Range<u64>,
-    ) -> Result<BinsDim0<f32>, streams::timebin::cached::reader::Error> {
+    ) -> Result<ContainerBins<f32>, streams::timebin::cached::reader::Error> {
         let (tx, rx) = async_channel::bounded(1);
         let job = Job::ReadCacheF32(ReadCacheF32 {
             series,
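`ScyllaQueue::write_cache_f32` and `read_cache_f32` both follow a request/reply shape: enqueue a `Job` carrying a one-shot reply channel (`async_channel::bounded(1)`) and await the worker's answer. A runnable sketch of that pattern with a toy job type (error and shutdown handling omitted; the `futures` executor is used only to make the example self-contained):

```rust
use async_channel::{bounded, Receiver, Sender};

// Toy job type; the real queue carries scylla read/write jobs.
enum Job {
    Square(u64, Sender<u64>),
}

async fn worker(jobs: Receiver<Job>) {
    // The loop ends once every job sender has been dropped.
    while let Ok(job) = jobs.recv().await {
        match job {
            Job::Square(x, reply) => {
                // A real worker would run the database query here.
                let _ = reply.send(x * x).await;
            }
        }
    }
}

async fn request(jobs: &Sender<Job>, x: u64) -> u64 {
    // One-shot reply channel, same shape as write_cache_f32/read_cache_f32.
    let (tx, rx) = bounded(1);
    jobs.send(Job::Square(x, tx))
        .await
        .unwrap_or_else(|_| panic!("queue closed"));
    rx.recv().await.expect("worker dropped the reply")
}

fn main() {
    futures::executor::block_on(async {
        let (jobs_tx, jobs_rx) = bounded(64);
        let worker_task = worker(jobs_rx);
        let client = async move {
            let r = request(&jobs_tx, 12).await;
            assert_eq!(r, 144);
            // jobs_tx drops here, letting the worker loop end.
        };
        futures::future::join(worker_task, client).await;
    });
}
```

The bounded(1) reply channel acts as a oneshot: the worker sends exactly one result, and the caller awaits it without needing any shared state.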
diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs
index b883faf..4f4c919 100644
--- a/crates/streams/src/timebinnedjson.rs
+++ b/crates/streams/src/timebinnedjson.rs
@@ -89,9 +89,9 @@ pub async fn timebinnable_stream_sf_databuffer_box_events(
     let stream = RangeFilter2::new(stream, range, one_before_range);
     let stream = stream.map(move |k| {
-        on_sitemty_data!(k, |k| {
+        on_sitemty_data!(k, |k: Box<dyn Events>| {
+            TODO;
             let k: Box<dyn Events> = Box::new(k);
-            // trace!("got len {}", k.len());
             let k = k.to_dim0_f32_for_binning();
             let k = tr.0.transform(k);
             Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
@@ -247,6 +247,12 @@ async fn timebinnable_stream_sf_databuffer_binnable_box(
     )
     .await?;
     let stream = PlainEventStream::new(stream);
+    // let stream = stream.map(|x| {
+    //     on_sitemty_data!(x, |x| {
+    //         let ret = x.binnable;
+    //         ret
+    //     })
+    // });
     let stream = EventsToTimeBinnable::new(stream);
     let stream = Box::pin(stream);
     Ok(TimeBinnableStreamBox(stream))
diff --git a/crates/streams/src/transform.rs b/crates/streams/src/transform.rs
index 4447c21..da23b86 100644
--- a/crates/streams/src/transform.rs
+++ b/crates/streams/src/transform.rs
@@ -51,6 +51,7 @@ pub fn build_merged_event_transform(tr: &TransformQuery) -> Result>,
 }