WIP but checks

This commit is contained in:
Dominik Werder
2023-04-05 15:51:10 +02:00
parent 81298b16df
commit 32efc693f5
12 changed files with 188 additions and 28 deletions

View File

@@ -99,7 +99,7 @@ pub trait Events:
+ erased_serde::Serialize
+ EventsNonObj
{
fn as_time_binnable(&self) -> &dyn TimeBinnable;
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable;
fn verify(&self) -> bool;
fn output_info(&self);
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;

View File

@@ -5,6 +5,7 @@ use crate::AsAnyRef;
use crate::RangeOverlapInfo;
use crate::TypeName;
use crate::WithLen;
use netpod::log::*;
use netpod::BinnedRangeEnum;
use std::any::Any;
use std::fmt;
@@ -58,8 +59,8 @@ pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde
fn validate(&self) -> Result<(), String>;
}
pub trait TimeBinner: Send {
fn ingest(&mut self, item: &dyn TimeBinnable);
pub trait TimeBinner: fmt::Debug + Send {
fn ingest(&mut self, item: &mut dyn TimeBinnable);
fn bins_ready_count(&self) -> usize;
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>>;
@@ -89,17 +90,81 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef
}
#[derive(Debug)]
pub struct TimeBinnerDyn {}
pub struct TimeBinnerDynStruct {
binrange: BinnedRangeEnum,
do_time_weight: bool,
binner: Option<Box<dyn TimeBinner>>,
}
impl TimeBinnerTy for TimeBinnerDyn {
impl TimeBinnerDynStruct {
pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool, binner: Box<dyn TimeBinner>) -> Self {
Self {
binrange,
do_time_weight,
binner: Some(binner),
}
}
}
impl TimeBinnerTy for TimeBinnerDynStruct {
type Input = Box<dyn TimeBinnable>;
type Output = Box<dyn TimeBinned>;
fn ingest(&mut self, item: &mut Self::Input) {
todo!()
if self.binner.is_none() {
self.binner = Some(Box::new(
item.time_binner_new(self.binrange.clone(), self.do_time_weight),
));
}
self.binner.as_mut().unwrap().as_mut().ingest(item.as_mut())
}
fn set_range_complete(&mut self) {
if let Some(k) = self.binner.as_mut() {
k.set_range_complete()
}
}
fn bins_ready_count(&self) -> usize {
if let Some(k) = self.binner.as_ref() {
k.bins_ready_count()
} else {
0
}
}
fn bins_ready(&mut self) -> Option<Self::Output> {
if let Some(k) = self.binner.as_mut() {
k.bins_ready()
} else {
None
}
}
fn push_in_progress(&mut self, push_empty: bool) {
if let Some(k) = self.binner.as_mut() {
k.push_in_progress(push_empty)
}
}
fn cycle(&mut self) {
if let Some(k) = self.binner.as_mut() {
k.cycle()
}
}
fn empty(&self) -> Option<Self::Output> {
if let Some(k) = self.binner.as_ref() {
Some(k.empty())
} else {
warn!("TimeBinnerDynStruct::empty called with binner None");
None
}
}
}
impl TimeBinner for TimeBinnerDynStruct {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
todo!()
}
@@ -107,7 +172,7 @@ impl TimeBinnerTy for TimeBinnerDyn {
todo!()
}
fn bins_ready(&mut self) -> Option<Self::Output> {
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>> {
todo!()
}
@@ -119,15 +184,20 @@ impl TimeBinnerTy for TimeBinnerDyn {
todo!()
}
fn empty(&self) -> Option<Self::Output> {
fn set_range_complete(&mut self) {
todo!()
}
fn empty(&self) -> Box<dyn TimeBinned> {
todo!()
}
}
impl TimeBinnableTy for Box<dyn TimeBinnable> {
type TimeBinner = TimeBinnerDyn;
type TimeBinner = TimeBinnerDynStruct;
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner {
todo!()
let binner = self.as_ref().time_binner_new(binrange.clone(), do_time_weight);
TimeBinnerDynStruct::new(binrange, do_time_weight, binner)
}
}

View File

@@ -141,16 +141,16 @@ impl BinnedCollected {
self.range_final = true;
}
RangeCompletableItem::Data(k) => match k {
ChannelEvents::Events(events) => {
ChannelEvents::Events(mut events) => {
if self.binner.is_none() {
let bb = events
.as_time_binnable()
.as_time_binnable_mut()
.time_binner_new(self.binrange.clone(), self.do_time_weight);
self.binner = Some(bb);
}
let binner = self.binner.as_mut().unwrap();
trace!("handle_item call binner.ingest");
binner.ingest(events.as_time_binnable());
binner.ingest(events.as_time_binnable_mut());
flush_binned(binner, &mut self.coll, false)?;
}
ChannelEvents::Status(item) => {

View File

@@ -10,6 +10,7 @@ use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonResult;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinned;
use items_0::timebin::TimeBinner;
use items_0::timebin::TimeBins;
use items_0::AppendEmptyBin;
@@ -31,7 +32,6 @@ use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use items_0::timebin::TimeBinned;
#[allow(unused)]
macro_rules! trace4 {
@@ -520,6 +520,7 @@ impl<NTY: ScalarOps> CollectableType for BinsDim0<NTY> {
}
}
#[derive(Debug)]
pub struct BinsDim0Aggregator<NTY> {
range: SeriesRange,
count: u64,
@@ -611,6 +612,7 @@ impl<NTY: ScalarOps> TimeBinnable for BinsDim0<NTY> {
}
}
#[derive(Debug)]
pub struct BinsDim0TimeBinner<NTY: ScalarOps> {
binrange: BinnedRangeEnum,
do_time_weight: bool,
@@ -646,7 +648,7 @@ impl<NTY: ScalarOps> BinsDim0TimeBinner<NTY> {
}
impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnable) {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
/*let self_name = any::type_name::<Self>();
if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense.

View File

@@ -476,6 +476,7 @@ impl<NTY: ScalarOps> CollectableType for BinsXbinDim0<NTY> {
}
}
#[derive(Debug)]
pub struct BinsXbinDim0Aggregator<NTY> {
range: SeriesRange,
count: u64,
@@ -567,6 +568,7 @@ impl<NTY: ScalarOps> TimeBinnable for BinsXbinDim0<NTY> {
}
}
#[derive(Debug)]
pub struct BinsXbinDim0TimeBinner<NTY: ScalarOps> {
binrange: BinnedRangeEnum,
do_time_weight: bool,
@@ -601,7 +603,7 @@ impl<NTY: ScalarOps> BinsXbinDim0TimeBinner<NTY> {
}
impl<NTY: ScalarOps> TimeBinner for BinsXbinDim0TimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnable) {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
/*let self_name = std::any::type_name::<Self>();
if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense.

View File

@@ -656,7 +656,8 @@ impl RangeOverlapInfo for ChannelEvents {
impl TimeBinnable for ChannelEvents {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
todo!()
let ret = <ChannelEvents as TimeBinnableTy>::time_binner_new(&self, binrange, do_time_weight);
Box::new(ret)
}
fn to_box_to_json_result(&self) -> Box<dyn items_0::collect_s::ToJsonResult> {
@@ -671,7 +672,7 @@ impl EventsNonObj for ChannelEvents {
}
impl Events for ChannelEvents {
fn as_time_binnable(&self) -> &dyn TimeBinnable {
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
todo!()
}
@@ -787,7 +788,7 @@ impl TimeBinnerTy for ChannelEventsTimeBinner {
self.binner = Some(binner);
}
match self.binner.as_mut() {
Some(binner) => binner.ingest(item.as_time_binnable()),
Some(binner) => binner.ingest(item.as_time_binnable_mut()),
None => {
error!("ingest without active binner item {item:?}");
()
@@ -842,6 +843,40 @@ impl TimeBinnerTy for ChannelEventsTimeBinner {
}
}
impl TimeBinner for ChannelEventsTimeBinner {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
if let Some(item) = item.as_any_mut().downcast_mut::<ChannelEvents>() {
TimeBinnerTy::ingest(self, item)
} else {
panic!()
}
}
fn bins_ready_count(&self) -> usize {
todo!()
}
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>> {
todo!()
}
fn push_in_progress(&mut self, push_empty: bool) {
todo!()
}
fn cycle(&mut self) {
todo!()
}
fn set_range_complete(&mut self) {
todo!()
}
fn empty(&self) -> Box<dyn TimeBinned> {
todo!()
}
}
impl TimeBinnableTy for ChannelEvents {
type TimeBinner = ChannelEventsTimeBinner;

View File

@@ -444,6 +444,7 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectableType for EventsDim0<NTY> {
}
}
#[derive(Debug)]
pub struct EventsDim0Aggregator<NTY> {
range: SeriesRange,
count: u64,
@@ -768,8 +769,8 @@ impl<STY: ScalarOps> EventsNonObj for EventsDim0<STY> {
}
impl<STY: ScalarOps> Events for EventsDim0<STY> {
fn as_time_binnable(&self) -> &dyn TimeBinnable {
self as &dyn TimeBinnable
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
self as &mut dyn TimeBinnable
}
fn verify(&self) -> bool {
@@ -913,6 +914,7 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
}
}
#[derive(Debug)]
pub struct EventsDim0TimeBinner<NTY: ScalarOps> {
binrange: BinnedRangeEnum,
rix: usize,
@@ -972,7 +974,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
}
}
fn ingest(&mut self, item: &dyn TimeBinnable) {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
let self_name = any::type_name::<Self>();
trace2!(
"TimeBinner for {self_name} ingest agg.range {:?} item {:?}",

View File

@@ -354,6 +354,7 @@ impl<NTY: ScalarOps> CollectableType for EventsDim1<NTY> {
}
}
#[derive(Debug)]
pub struct EventsDim1Aggregator<NTY> {
range: SeriesRange,
count: u64,
@@ -667,8 +668,8 @@ impl<STY: ScalarOps> EventsNonObj for EventsDim1<STY> {
}
impl<STY: ScalarOps> Events for EventsDim1<STY> {
fn as_time_binnable(&self) -> &dyn TimeBinnable {
self as &dyn TimeBinnable
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
self as &mut dyn TimeBinnable
}
fn verify(&self) -> bool {
@@ -812,6 +813,7 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
}
}
#[derive(Debug)]
pub struct EventsDim1TimeBinner<NTY: ScalarOps> {
edges: VecDeque<u64>,
agg: EventsDim1Aggregator<NTY>,
@@ -877,7 +879,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim1TimeBinner<NTY> {
}
}
fn ingest(&mut self, item: &dyn TimeBinnable) {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
/*let self_name = std::any::type_name::<Self>();
trace2!(
"TimeBinner for EventsDim1TimeBinner {:?}\n{:?}\n------------------------------------",

View File

@@ -1576,6 +1576,17 @@ where
}
ret
}
pub fn edges(&self) -> Vec<T> {
let mut ret = Vec::new();
let mut t = self.bin_len.times(self.bin_off);
let end = self.bin_len.times(self.bin_off + self.bin_cnt);
while t <= end {
ret.push(t.clone());
t = t.add(&self.bin_len);
}
ret
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -1661,6 +1672,13 @@ impl BinnedRangeEnum {
BinnedRangeEnum::Pulse(_) => Dim0Kind::Pulse,
}
}
pub fn binned_range_time(&self) -> BinnedRange<TsNano> {
match self {
BinnedRangeEnum::Time(x) => x.clone(),
BinnedRangeEnum::Pulse(_) => panic!(),
}
}
}
#[cfg(test)]

View File

@@ -1 +1,2 @@
pub mod binrange;
pub mod evrange;

View File

@@ -0,0 +1,28 @@
use super::evrange::NanoRange;
use super::evrange::SeriesRange;
use crate::timeunits::SEC;
use crate::BinnedRangeEnum;
use crate::Dim0Kind;
use crate::TsNano;
use chrono::DateTime;
use chrono::Utc;
#[test]
fn test_binned_range_covering_00() {
let range = SeriesRange::TimeRange(NanoRange::from_date_time(
DateTime::parse_from_rfc3339("1970-01-01T10:10:00Z").unwrap().into(),
DateTime::parse_from_rfc3339("1970-01-01T10:20:00Z").unwrap().into(),
));
let r = BinnedRangeEnum::covering_range(range, 9).unwrap();
assert_eq!(r.bin_count(), 10);
if let Dim0Kind::Time = r.dim0kind() {
} else {
panic!()
}
let r2 = r.binned_range_time();
let a = r2.edges();
assert_eq!(a.len(), 11);
assert_eq!(a[0], TsNano((((10 * 60) + 10) * 60 + 0) * SEC));
assert_eq!(a[1], TsNano((((10 * 60) + 11) * 60 + 0) * SEC));
assert_eq!(a[10], TsNano((((10 * 60) + 20) * 60 + 0) * SEC));
}

View File

@@ -178,10 +178,10 @@ fn time_bin_02() -> Result<(), Error> {
// Then the Merge will happen always by default for backends where this is needed.
// TODO then apply the transform chain for the after-merged-stream.
let stream = stream.map(|x| {
//
on_sitemty_data!(x, |x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(
let x = on_sitemty_data!(x, |x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(
Box::new(x) as Box<dyn TimeBinnable>
))))
))));
x
});
let stream = Box::pin(stream);
let mut binned_stream =