diff --git a/crates/items_0/src/collect_s.rs b/crates/items_0/src/collect_s.rs index 72ab41e..8247f6f 100644 --- a/crates/items_0/src/collect_s.rs +++ b/crates/items_0/src/collect_s.rs @@ -88,7 +88,7 @@ pub trait CollectorType: fmt::Debug + Send + Unpin + WithLen + ByteEstimate { fn result(&mut self, range: Option, binrange: Option) -> Result; } -pub trait Collector: fmt::Debug + Send + Unpin + WithLen + ByteEstimate { +pub trait Collector: fmt::Debug + Send + WithLen + ByteEstimate { fn ingest(&mut self, src: &mut dyn Collectable); fn set_range_complete(&mut self); fn set_timed_out(&mut self); diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index 7e24674..c667d30 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -103,7 +103,7 @@ pub trait BinningggContainerEventsDyn: fmt::Debug + Send { fn to_anybox(&mut self) -> Box; } -pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut { +pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut + Collectable { fn type_name(&self) -> &'static str; fn empty(&self) -> BinsBoxed; fn clone(&self) -> BinsBoxed; @@ -196,6 +196,10 @@ impl TimeBinnable for Box { fn to_box_to_json_result(&self) -> Box { self.as_ref().to_box_to_json_result() } + + fn to_container_bins(&self) -> Box { + self.as_ref().to_container_bins() + } } pub trait TimeBinner: fmt::Debug + Send { @@ -237,6 +241,8 @@ pub trait TimeBinnable: ) -> Box; // TODO just a helper for the empty result. fn to_box_to_json_result(&self) -> Box; + // TODO temporary converter + fn to_container_bins(&self) -> Box; } impl WithLen for Box { @@ -273,6 +279,10 @@ impl TimeBinnable for Box { fn to_box_to_json_result(&self) -> Box { todo!() } + + fn to_container_bins(&self) -> Box { + self.as_ref().to_container_bins() + } } impl RangeOverlapInfo for Box { @@ -302,6 +312,10 @@ impl TimeBinnable for Box { fn to_box_to_json_result(&self) -> Box { TimeBinnable::to_box_to_json_result(self.as_ref()) } + + fn to_container_bins(&self) -> Box { + panic!("logic error this converter must not get used on events") + } } impl TypeName for Box { diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index d60a60d..44e4cd6 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -5,10 +5,13 @@ use super::___; use core::fmt; use err::thiserror; use err::ThisError; +use items_0::collect_s::Collectable; use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinsBoxed; use items_0::vecpreview::VecPreview; use items_0::AsAnyMut; +use items_0::AsAnyRef; +use items_0::TypeName; use items_0::WithLen; use netpod::EnumVariant; use netpod::TsNano; @@ -340,6 +343,91 @@ where } } +impl TypeName for ContainerBins +where + EVT: EventValueType, +{ + fn type_name(&self) -> String { + BinningggContainerBinsDyn::type_name(self).into() + } +} + +impl AsAnyRef for ContainerBins +where + EVT: EventValueType, +{ + fn as_any_ref(&self) -> &dyn any::Any { + self + } +} + +#[derive(Debug)] +pub struct ContainerBinsCollector +where + EVT: EventValueType, +{ + bins: ContainerBins, +} + +impl ContainerBinsCollector where EVT: EventValueType {} + +impl WithLen for ContainerBinsCollector +where + EVT: EventValueType, +{ + fn len(&self) -> usize { + self.bins.len() + } +} + +impl items_0::container::ByteEstimate for ContainerBinsCollector +where + EVT: EventValueType, +{ + fn byte_estimate(&self) -> u64 { + // TODO need better estimate + self.bins.len() as u64 * 200 + } +} + +impl items_0::collect_s::Collector for ContainerBinsCollector +where + EVT: EventValueType, +{ + fn ingest(&mut self, src: &mut dyn Collectable) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn set_timed_out(&mut self) { + todo!() + } + + fn set_continue_at_here(&mut self) { + todo!() + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result, err::Error> { + todo!() + } +} + +impl Collectable for ContainerBins +where + EVT: EventValueType, +{ + fn new_collector(&self) -> Box { + todo!() + } +} + macro_rules! try_to_old_time_binned { ($sty:ty, $this:expr, $lst:expr) => { let this = $this; @@ -383,7 +471,7 @@ where dst.ts1s.extend(self.ts1s.drain(range.clone())); } else { let styn = any::type_name::(); - panic!("unexpected drain EVT {} dst {}", styn, dst.type_name()); + panic!("unexpected drain EVT {} dst {}", styn, Self::type_name()); } } @@ -419,25 +507,6 @@ where } let styn = any::type_name::(); todo!("TODO impl for {styn}"); - // let a = self as &dyn any::Any; - // if let Some(src) = a.downcast_ref::>() { - // use items_0::Empty; - // let mut ret = crate::binsdim0::BinsDim0::::empty(); - // for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { - // ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); - // } - // Box::new(ret) - // } else if let Some(src) = a.downcast_ref::>() { - // use items_0::Empty; - // let mut ret = crate::binsdim0::BinsDim0::::empty(); - // for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { - // ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); - // } - // Box::new(ret) - // } else { - // let styn = any::type_name::(); - // todo!("TODO impl for {styn}") - // } } } diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 46f1a5e..d6979c0 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -1008,6 +1008,26 @@ impl TimeBinnableTypeAggregator for BinsDim0Aggregator { } } +macro_rules! try_to_container_bins { + ($sty:ty, $avgty:ty, $this:expr) => { + let this = $this; + if let Some(bins) = this.as_any_ref().downcast_ref::>() { + let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let cnts = bins.cnts.iter().map(|&x| x).collect(); + let mins = bins.mins.iter().map(|&x| x).collect(); + let maxs = bins.maxs.iter().map(|&x| x).collect(); + let avgs = bins.avgs.iter().map(|&x| x as $avgty).collect(); + let lsts = bins.lsts.iter().map(|&x| x).collect(); + let fnls = bins.ts1s.iter().map(|_| true).collect(); + let dst = crate::binning::container_bins::ContainerBins::<$sty>::from_constituents( + ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, + ); + return Box::new(dst); + } + }; +} + impl TimeBinnable for BinsDim0 { fn time_binner_new( &self, @@ -1025,6 +1045,69 @@ impl TimeBinnable for BinsDim0 { let k = serde_json::to_value(self).unwrap(); Box::new(k) as _ } + + fn to_container_bins(&self) -> Box { + let this = self; + + try_to_container_bins!(u8, f64, self); + try_to_container_bins!(u16, f64, self); + try_to_container_bins!(u32, f64, self); + try_to_container_bins!(u64, f64, self); + try_to_container_bins!(i8, f64, self); + try_to_container_bins!(i16, f64, self); + try_to_container_bins!(i32, f64, self); + try_to_container_bins!(i64, f64, self); + try_to_container_bins!(f32, f32, self); + try_to_container_bins!(f64, f64, self); + + if let Some(bins) = this.as_any_ref().downcast_ref::>() { + let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let cnts = bins.cnts.iter().map(|&x| x).collect(); + let mins = bins.mins.iter().map(|x| x.clone()).collect(); + let maxs = bins.maxs.iter().map(|x| x.clone()).collect(); + let avgs = bins.avgs.iter().map(|&x| x as f64).collect(); + let lsts = bins.lsts.iter().map(|&x| x).collect(); + let fnls = bins.ts1s.iter().map(|_| true).collect(); + let dst = crate::binning::container_bins::ContainerBins::::from_constituents( + ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, + ); + return Box::new(dst); + } + + if let Some(bins) = this.as_any_ref().downcast_ref::>() { + let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let cnts = bins.cnts.iter().map(|&x| x).collect(); + let mins = bins.mins.iter().map(|x| x.clone()).collect(); + let maxs = bins.maxs.iter().map(|x| x.clone()).collect(); + let avgs = bins.avgs.iter().map(|&x| x as f64).collect(); + let lsts = bins.lsts.iter().map(|x| x.clone()).collect(); + let fnls = bins.ts1s.iter().map(|_| true).collect(); + let dst = crate::binning::container_bins::ContainerBins::::from_constituents( + ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, + ); + return Box::new(dst); + } + + if let Some(bins) = this.as_any_ref().downcast_ref::>() { + let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); + let cnts = bins.cnts.iter().map(|&x| x).collect(); + let mins = bins.mins.iter().map(|x| x.clone()).collect(); + let maxs = bins.maxs.iter().map(|x| x.clone()).collect(); + let avgs = bins.avgs.iter().map(|&x| x).collect(); + let lsts = bins.lsts.iter().map(|x| x.clone()).collect(); + let fnls = bins.ts1s.iter().map(|_| true).collect(); + let dst = crate::binning::container_bins::ContainerBins::::from_constituents( + ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, + ); + return Box::new(dst); + } + + let styn = any::type_name::(); + todo!("TODO impl for {styn}"); + } } #[derive(Debug)] diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index a58543b..0de8771 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -628,6 +628,10 @@ impl TimeBinnable for BinsXbinDim0 { let k = serde_json::to_value(self).unwrap(); Box::new(k) } + + fn to_container_bins(&self) -> Box { + panic!("not supported, remove") + } } #[derive(Debug)] diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index b48b57a..8ecc6a2 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -849,6 +849,10 @@ impl TimeBinnable for ChannelEvents { fn to_box_to_json_result(&self) -> Box { todo!() } + + fn to_container_bins(&self) -> Box { + panic!("logic error must not get used on ChannelEvents") + } } impl EventsNonObj for ChannelEvents { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 80894ac..3b0ae5c 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -861,6 +861,10 @@ impl TimeBinnable for EventsDim0 { let k = serde_json::to_value(self).unwrap(); Box::new(k) as _ } + + fn to_container_bins(&self) -> Box { + panic!("logic error must not get used on events") + } } impl TypeName for EventsDim0 { diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs index f3c59cc..4e798cb 100644 --- a/crates/items_2/src/eventsdim0enum.rs +++ b/crates/items_2/src/eventsdim0enum.rs @@ -367,6 +367,10 @@ impl TimeBinnable for EventsDim0Enum { fn to_box_to_json_result(&self) -> Box { todo!() } + + fn to_container_bins(&self) -> Box { + panic!("logic error must not get used on events") + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 11ec1fa..40fbdd1 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -764,6 +764,10 @@ impl TimeBinnable for EventsDim1 { let k = serde_json::to_value(self).unwrap(); Box::new(k) as _ } + + fn to_container_bins(&self) -> Box { + panic!("logic error must not get used on events") + } } impl items_0::TypeName for EventsDim1 { diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index accfaea..f6127c4 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -636,6 +636,10 @@ where let k = serde_json::to_value(self).unwrap(); Box::new(k) as _ } + + fn to_container_bins(&self) -> Box { + panic!("logic error must not get used on events") + } } #[derive(Debug)] diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 1d883f1..1bfeb3b 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -374,7 +374,7 @@ async fn timebinned_stream( open_bytes: OpenBoxedBytesStreamsBox, cache_read_provider: Arc, events_read_provider: Arc, -) -> Result>> + Send>>, Error> { +) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; let cache_usage = query.cache_usage().unwrap_or(CacheUsage::V0NoCache); match cache_usage.clone() { @@ -408,11 +408,19 @@ async fn timebinned_stream( ) .map_err(Error::from_string)?; let stream = stream.map(|item| { - on_sitemty_data!(item, |k: items_0::timebin::BinsBoxed| Ok(StreamItem::DataItem( - RangeCompletableItem::Data(k.to_old_time_binned()) - ))) + on_sitemty_data!(item, |k: items_0::timebin::BinsBoxed| { + let ret = k.to_old_time_binned(); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) + }) }); - let stream: Pin>> + Send>> = Box::pin(stream); + let stream = stream.map(|item| { + on_sitemty_data!(item, |x| { + let ret = Box::new(x) as Box; + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) + }) + }); + // let stream: Pin>> + Send>> = Box::pin(stream); + let stream = Box::pin(stream); Ok(stream) } _ => { @@ -434,26 +442,27 @@ async fn timebinned_stream( let stream = Box::pin(stream); // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight); - let stream: Pin>> + Send>> = Box::pin(stream); + if false { + let stream = stream.map(|x| { + on_sitemty_data!(x, |x: Box| Ok(StreamItem::DataItem( + RangeCompletableItem::Data(x.to_container_bins()) + ))) + }); + todo!(); + } + let stream = stream.map(|x| { + on_sitemty_data!(x, |x| { + let ret = Box::new(x) as Box; + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) + }) + }); + // let stream: Pin>> + Send>> = Box::pin(stream); + let stream = Box::pin(stream); Ok(stream) } } } -fn timebinned_to_collectable( - stream: Pin>> + Send>>, -) -> Pin>> + Send>> { - let stream = stream.map(|k| { - on_sitemty_data!(k, |k| { - let k: Box = Box::new(k); - trace!("got len {}", k.len()); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - }) - }); - let stream: Pin>> + Send>> = Box::pin(stream); - stream -} - pub async fn timebinned_json( query: BinnedQuery, ch_conf: ChannelTypeConfigGen, @@ -482,7 +491,7 @@ pub async fn timebinned_json( events_read_provider, ) .await?; - let stream = timebinned_to_collectable(stream); + // let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); let collres = collected.await?; @@ -549,7 +558,7 @@ pub async fn timebinned_json_framed( events_read_provider, ) .await?; - let stream = timebinned_to_collectable(stream); + // let stream = timebinned_to_collectable(stream); // TODO create a custom Stream adapter. // Want to timeout only on data items: the user wants to wait for bins only a maximum time. // But also, I want to coalesce.