WIP typechecks

This commit is contained in:
Dominik Werder
2024-10-24 11:13:59 +02:00
parent 83842e04fc
commit e6ece07137
11 changed files with 243 additions and 44 deletions

View File

@@ -88,7 +88,7 @@ pub trait CollectorType: fmt::Debug + Send + Unpin + WithLen + ByteEstimate {
fn result(&mut self, range: Option<SeriesRange>, binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error>;
}
pub trait Collector: fmt::Debug + Send + Unpin + WithLen + ByteEstimate {
pub trait Collector: fmt::Debug + Send + WithLen + ByteEstimate {
fn ingest(&mut self, src: &mut dyn Collectable);
fn set_range_complete(&mut self);
fn set_timed_out(&mut self);

View File

@@ -103,7 +103,7 @@ pub trait BinningggContainerEventsDyn: fmt::Debug + Send {
fn to_anybox(&mut self) -> Box<dyn std::any::Any>;
}
pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut {
pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut + Collectable {
fn type_name(&self) -> &'static str;
fn empty(&self) -> BinsBoxed;
fn clone(&self) -> BinsBoxed;
@@ -196,6 +196,10 @@ impl TimeBinnable for Box<dyn TimeBinned> {
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
self.as_ref().to_box_to_json_result()
}
fn to_container_bins(&self) -> Box<dyn BinningggContainerBinsDyn> {
self.as_ref().to_container_bins()
}
}
pub trait TimeBinner: fmt::Debug + Send {
@@ -237,6 +241,8 @@ pub trait TimeBinnable:
) -> Box<dyn TimeBinner>;
// TODO just a helper for the empty result.
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult>;
// TODO temporary converter
fn to_container_bins(&self) -> Box<dyn BinningggContainerBinsDyn>;
}
impl WithLen for Box<dyn TimeBinnable> {
@@ -273,6 +279,10 @@ impl TimeBinnable for Box<dyn TimeBinnable> {
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
todo!()
}
fn to_container_bins(&self) -> Box<dyn BinningggContainerBinsDyn> {
self.as_ref().to_container_bins()
}
}
impl RangeOverlapInfo for Box<dyn Events> {
@@ -302,6 +312,10 @@ impl TimeBinnable for Box<dyn Events> {
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
TimeBinnable::to_box_to_json_result(self.as_ref())
}
fn to_container_bins(&self) -> Box<dyn BinningggContainerBinsDyn> {
panic!("logic error this converter must not get used on events")
}
}
impl TypeName for Box<dyn TimeBinnable> {

View File

@@ -5,10 +5,13 @@ use super::___;
use core::fmt;
use err::thiserror;
use err::ThisError;
use items_0::collect_s::Collectable;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinsBoxed;
use items_0::vecpreview::VecPreview;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::TypeName;
use items_0::WithLen;
use netpod::EnumVariant;
use netpod::TsNano;
@@ -340,6 +343,91 @@ where
}
}
impl<EVT> TypeName for ContainerBins<EVT>
where
EVT: EventValueType,
{
fn type_name(&self) -> String {
BinningggContainerBinsDyn::type_name(self).into()
}
}
impl<EVT> AsAnyRef for ContainerBins<EVT>
where
EVT: EventValueType,
{
fn as_any_ref(&self) -> &dyn any::Any {
self
}
}
#[derive(Debug)]
pub struct ContainerBinsCollector<EVT>
where
EVT: EventValueType,
{
bins: ContainerBins<EVT>,
}
impl<EVT> ContainerBinsCollector<EVT> where EVT: EventValueType {}
impl<EVT> WithLen for ContainerBinsCollector<EVT>
where
EVT: EventValueType,
{
fn len(&self) -> usize {
self.bins.len()
}
}
impl<EVT> items_0::container::ByteEstimate for ContainerBinsCollector<EVT>
where
EVT: EventValueType,
{
fn byte_estimate(&self) -> u64 {
// TODO need better estimate
self.bins.len() as u64 * 200
}
}
impl<EVT> items_0::collect_s::Collector for ContainerBinsCollector<EVT>
where
EVT: EventValueType,
{
fn ingest(&mut self, src: &mut dyn Collectable) {
todo!()
}
fn set_range_complete(&mut self) {
todo!()
}
fn set_timed_out(&mut self) {
todo!()
}
fn set_continue_at_here(&mut self) {
todo!()
}
fn result(
&mut self,
range: Option<netpod::range::evrange::SeriesRange>,
binrange: Option<netpod::BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_s::Collected>, err::Error> {
todo!()
}
}
impl<EVT> Collectable for ContainerBins<EVT>
where
EVT: EventValueType,
{
fn new_collector(&self) -> Box<dyn items_0::collect_s::Collector> {
todo!()
}
}
macro_rules! try_to_old_time_binned {
($sty:ty, $this:expr, $lst:expr) => {
let this = $this;
@@ -383,7 +471,7 @@ where
dst.ts1s.extend(self.ts1s.drain(range.clone()));
} else {
let styn = any::type_name::<EVT>();
panic!("unexpected drain EVT {} dst {}", styn, dst.type_name());
panic!("unexpected drain EVT {} dst {}", styn, Self::type_name());
}
}
@@ -419,25 +507,6 @@ where
}
let styn = any::type_name::<EVT>();
todo!("TODO impl for {styn}");
// let a = self as &dyn any::Any;
// if let Some(src) = a.downcast_ref::<ContainerBins<f64>>() {
// use items_0::Empty;
// let mut ret = crate::binsdim0::BinsDim0::<f64>::empty();
// for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() {
// ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.);
// }
// Box::new(ret)
// } else if let Some(src) = a.downcast_ref::<ContainerBins<f32>>() {
// use items_0::Empty;
// let mut ret = crate::binsdim0::BinsDim0::<f32>::empty();
// for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() {
// ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.);
// }
// Box::new(ret)
// } else {
// let styn = any::type_name::<EVT>();
// todo!("TODO impl for {styn}")
// }
}
}

View File

@@ -1008,6 +1008,26 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for BinsDim0Aggregator<NTY> {
}
}
macro_rules! try_to_container_bins {
($sty:ty, $avgty:ty, $this:expr) => {
let this = $this;
if let Some(bins) = this.as_any_ref().downcast_ref::<BinsDim0<$sty>>() {
let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect();
let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect();
let cnts = bins.cnts.iter().map(|&x| x).collect();
let mins = bins.mins.iter().map(|&x| x).collect();
let maxs = bins.maxs.iter().map(|&x| x).collect();
let avgs = bins.avgs.iter().map(|&x| x as $avgty).collect();
let lsts = bins.lsts.iter().map(|&x| x).collect();
let fnls = bins.ts1s.iter().map(|_| true).collect();
let dst = crate::binning::container_bins::ContainerBins::<$sty>::from_constituents(
ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls,
);
return Box::new(dst);
}
};
}
impl<NTY: ScalarOps> TimeBinnable for BinsDim0<NTY> {
fn time_binner_new(
&self,
@@ -1025,6 +1045,69 @@ impl<NTY: ScalarOps> TimeBinnable for BinsDim0<NTY> {
let k = serde_json::to_value(self).unwrap();
Box::new(k) as _
}
fn to_container_bins(&self) -> Box<dyn items_0::timebin::BinningggContainerBinsDyn> {
let this = self;
try_to_container_bins!(u8, f64, self);
try_to_container_bins!(u16, f64, self);
try_to_container_bins!(u32, f64, self);
try_to_container_bins!(u64, f64, self);
try_to_container_bins!(i8, f64, self);
try_to_container_bins!(i16, f64, self);
try_to_container_bins!(i32, f64, self);
try_to_container_bins!(i64, f64, self);
try_to_container_bins!(f32, f32, self);
try_to_container_bins!(f64, f64, self);
if let Some(bins) = this.as_any_ref().downcast_ref::<BinsDim0<bool>>() {
let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect();
let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect();
let cnts = bins.cnts.iter().map(|&x| x).collect();
let mins = bins.mins.iter().map(|x| x.clone()).collect();
let maxs = bins.maxs.iter().map(|x| x.clone()).collect();
let avgs = bins.avgs.iter().map(|&x| x as f64).collect();
let lsts = bins.lsts.iter().map(|&x| x).collect();
let fnls = bins.ts1s.iter().map(|_| true).collect();
let dst = crate::binning::container_bins::ContainerBins::<bool>::from_constituents(
ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls,
);
return Box::new(dst);
}
if let Some(bins) = this.as_any_ref().downcast_ref::<BinsDim0<String>>() {
let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect();
let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect();
let cnts = bins.cnts.iter().map(|&x| x).collect();
let mins = bins.mins.iter().map(|x| x.clone()).collect();
let maxs = bins.maxs.iter().map(|x| x.clone()).collect();
let avgs = bins.avgs.iter().map(|&x| x as f64).collect();
let lsts = bins.lsts.iter().map(|x| x.clone()).collect();
let fnls = bins.ts1s.iter().map(|_| true).collect();
let dst = crate::binning::container_bins::ContainerBins::<String>::from_constituents(
ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls,
);
return Box::new(dst);
}
if let Some(bins) = this.as_any_ref().downcast_ref::<BinsDim0<netpod::EnumVariant>>() {
let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect();
let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect();
let cnts = bins.cnts.iter().map(|&x| x).collect();
let mins = bins.mins.iter().map(|x| x.clone()).collect();
let maxs = bins.maxs.iter().map(|x| x.clone()).collect();
let avgs = bins.avgs.iter().map(|&x| x).collect();
let lsts = bins.lsts.iter().map(|x| x.clone()).collect();
let fnls = bins.ts1s.iter().map(|_| true).collect();
let dst = crate::binning::container_bins::ContainerBins::<netpod::EnumVariant>::from_constituents(
ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls,
);
return Box::new(dst);
}
let styn = any::type_name::<NTY>();
todo!("TODO impl for {styn}");
}
}
#[derive(Debug)]

View File

@@ -628,6 +628,10 @@ impl<NTY: ScalarOps> TimeBinnable for BinsXbinDim0<NTY> {
let k = serde_json::to_value(self).unwrap();
Box::new(k)
}
fn to_container_bins(&self) -> Box<dyn items_0::timebin::BinningggContainerBinsDyn> {
panic!("not supported, remove")
}
}
#[derive(Debug)]

View File

@@ -849,6 +849,10 @@ impl TimeBinnable for ChannelEvents {
fn to_box_to_json_result(&self) -> Box<dyn items_0::collect_s::ToJsonResult> {
todo!()
}
fn to_container_bins(&self) -> Box<dyn items_0::timebin::BinningggContainerBinsDyn> {
panic!("logic error must not get used on ChannelEvents")
}
}
impl EventsNonObj for ChannelEvents {

View File

@@ -861,6 +861,10 @@ impl<STY: ScalarOps> TimeBinnable for EventsDim0<STY> {
let k = serde_json::to_value(self).unwrap();
Box::new(k) as _
}
fn to_container_bins(&self) -> Box<dyn items_0::timebin::BinningggContainerBinsDyn> {
panic!("logic error must not get used on events")
}
}
impl<STY> TypeName for EventsDim0<STY> {

View File

@@ -367,6 +367,10 @@ impl TimeBinnable for EventsDim0Enum {
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
todo!()
}
fn to_container_bins(&self) -> Box<dyn items_0::timebin::BinningggContainerBinsDyn> {
panic!("logic error must not get used on events")
}
}
#[derive(Debug, Serialize, Deserialize)]

View File

@@ -764,6 +764,10 @@ impl<STY: ScalarOps> TimeBinnable for EventsDim1<STY> {
let k = serde_json::to_value(self).unwrap();
Box::new(k) as _
}
fn to_container_bins(&self) -> Box<dyn items_0::timebin::BinningggContainerBinsDyn> {
panic!("logic error must not get used on events")
}
}
impl<STY> items_0::TypeName for EventsDim1<STY> {

View File

@@ -636,6 +636,10 @@ where
let k = serde_json::to_value(self).unwrap();
Box::new(k) as _
}
fn to_container_bins(&self) -> Box<dyn items_0::timebin::BinningggContainerBinsDyn> {
panic!("logic error must not get used on events")
}
}
#[derive(Debug)]

View File

@@ -374,7 +374,7 @@ async fn timebinned_stream(
open_bytes: OpenBoxedBytesStreamsBox,
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>, Error> {
use netpod::query::CacheUsage;
let cache_usage = query.cache_usage().unwrap_or(CacheUsage::V0NoCache);
match cache_usage.clone() {
@@ -408,11 +408,19 @@ async fn timebinned_stream(
)
.map_err(Error::from_string)?;
let stream = stream.map(|item| {
on_sitemty_data!(item, |k: items_0::timebin::BinsBoxed| Ok(StreamItem::DataItem(
RangeCompletableItem::Data(k.to_old_time_binned())
)))
on_sitemty_data!(item, |k: items_0::timebin::BinsBoxed| {
let ret = k.to_old_time_binned();
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
})
});
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
let stream = stream.map(|item| {
on_sitemty_data!(item, |x| {
let ret = Box::new(x) as Box<dyn Collectable>;
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
})
});
// let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
let stream = Box::pin(stream);
Ok(stream)
}
_ => {
@@ -434,26 +442,27 @@ async fn timebinned_stream(
let stream = Box::pin(stream);
// TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning.
let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight);
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
if false {
let stream = stream.map(|x| {
on_sitemty_data!(x, |x: Box<dyn TimeBinned>| Ok(StreamItem::DataItem(
RangeCompletableItem::Data(x.to_container_bins())
)))
});
todo!();
}
let stream = stream.map(|x| {
on_sitemty_data!(x, |x| {
let ret = Box::new(x) as Box<dyn Collectable>;
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
})
});
// let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
let stream = Box::pin(stream);
Ok(stream)
}
}
}
fn timebinned_to_collectable(
stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>,
) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> {
let stream = stream.map(|k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn Collectable> = Box::new(k);
trace!("got len {}", k.len());
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
});
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> = Box::pin(stream);
stream
}
pub async fn timebinned_json(
query: BinnedQuery,
ch_conf: ChannelTypeConfigGen,
@@ -482,7 +491,7 @@ pub async fn timebinned_json(
events_read_provider,
)
.await?;
let stream = timebinned_to_collectable(stream);
// let stream = timebinned_to_collectable(stream);
let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range));
let collected: BoxFuture<_> = Box::pin(collected);
let collres = collected.await?;
@@ -549,7 +558,7 @@ pub async fn timebinned_json_framed(
events_read_provider,
)
.await?;
let stream = timebinned_to_collectable(stream);
// let stream = timebinned_to_collectable(stream);
// TODO create a custom Stream adapter.
// Want to timeout only on data items: the user wants to wait for bins only a maximum time.
// But also, I want to coalesce.