WIP transform

This commit is contained in:
Dominik Werder
2023-05-11 16:32:07 +02:00
parent fb875f81d6
commit 3c6c1a24ff
8 changed files with 198 additions and 22 deletions

View File

@@ -32,6 +32,10 @@ pub trait Empty {
fn empty() -> Self;
}
pub trait Resettable {
fn reset(&mut self);
}
pub trait Appendable<STY>: Empty + WithLen {
fn push(&mut self, ts: u64, pulse: u64, value: STY);
}
@@ -106,6 +110,7 @@ pub trait Events:
+ erased_serde::Serialize
+ EventsNonObj
{
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable;
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable;
fn verify(&self) -> bool;
fn output_info(&self);
@@ -156,6 +161,10 @@ impl EventsNonObj for Box<dyn Events> {
}
impl Events for Box<dyn Events> {
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
Events::as_time_binnable_ref(self.as_ref())
}
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
Events::as_time_binnable_mut(self.as_mut())
}

View File

@@ -7,6 +7,7 @@ use crate::overlap::RangeOverlapInfo;
use crate::AsAnyMut;
use crate::AsAnyRef;
use crate::Events;
use crate::Resettable;
use crate::TypeName;
use crate::WithLen;
use err::Error;
@@ -15,6 +16,7 @@ use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use std::any::Any;
use std::fmt;
use std::ops::Range;
// TODO can probably be removed.
pub trait TimeBins {
@@ -56,9 +58,9 @@ pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized {
}
/// Data in time-binned form.
pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde::Serialize {
pub trait TimeBinned: Any + TypeName + TimeBinnable + Resettable + Collectable + erased_serde::Serialize {
fn clone_box_time_binned(&self) -> Box<dyn TimeBinned>;
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable;
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable;
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
fn edges_slice(&self) -> (&[u64], &[u64]);
@@ -67,6 +69,9 @@ pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde
fn maxs(&self) -> Vec<f32>;
fn avgs(&self) -> Vec<f32>;
fn validate(&self) -> Result<(), String>;
fn empty_like_self_box_time_binned(&self) -> Box<dyn TimeBinned>;
fn to_simple_bins_f32(&mut self) -> Box<dyn TimeBinned>;
fn drain_into_tb(&mut self, dst: &mut dyn TimeBinned, range: Range<usize>) -> Result<(), Error>;
}
impl Clone for Box<dyn TimeBinned> {
@@ -457,7 +462,7 @@ impl TimeBinnableTy for Box<dyn TimeBinned> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner {
let binner = self
.as_time_binnable_dyn()
.as_time_binnable_ref()
.time_binner_new(binrange.clone(), do_time_weight);
TimeBinnerDynStruct2::new(binrange, do_time_weight, binner)
}

View File

@@ -15,6 +15,7 @@ use items_0::collect_s::Collected;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonResult;
use items_0::overlap::HasTimestampDeque;
use items_0::scalar_ops::AsPrimF32;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinned;
@@ -26,6 +27,7 @@ use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::HasNonemptyFirstBin;
use items_0::Resettable;
use items_0::TypeName;
use items_0::WithLen;
use netpod::is_false;
@@ -43,6 +45,8 @@ use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::mem;
use std::ops::Range;
#[allow(unused)]
macro_rules! trace4 {
@@ -154,16 +158,16 @@ where
}
}
impl<NTY> AsAnyMut for BinsDim0<NTY>
impl<STY> AsAnyMut for BinsDim0<STY>
where
NTY: ScalarOps,
STY: ScalarOps,
{
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl<NTY> Empty for BinsDim0<NTY> {
impl<STY> Empty for BinsDim0<STY> {
fn empty() -> Self {
Self {
ts1s: VecDeque::new(),
@@ -177,12 +181,23 @@ impl<NTY> Empty for BinsDim0<NTY> {
}
}
impl<NTY> WithLen for BinsDim0<NTY> {
impl<STY> WithLen for BinsDim0<STY> {
fn len(&self) -> usize {
self.ts1s.len()
}
}
impl<STY> Resettable for BinsDim0<STY> {
fn reset(&mut self) {
self.ts1s.clear();
self.ts2s.clear();
self.counts.clear();
self.mins.clear();
self.maxs.clear();
self.avgs.clear();
}
}
impl<STY: ScalarOps> HasNonemptyFirstBin for BinsDim0<STY> {
fn has_nonempty_first_bin(&self) -> bool {
self.counts.front().map_or(false, |x| *x > 0)
@@ -861,7 +876,7 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
Box::new(self.clone())
}
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable {
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
self
}
@@ -920,6 +935,42 @@ impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {
fn as_collectable_mut(&mut self) -> &mut dyn Collectable {
self
}
fn empty_like_self_box_time_binned(&self) -> Box<dyn TimeBinned> {
Box::new(Self::empty())
}
fn to_simple_bins_f32(&mut self) -> Box<dyn TimeBinned> {
use mem::replace;
let ret = BinsDim0::<f32> {
ts1s: replace(&mut self.ts1s, VecDeque::new()),
ts2s: replace(&mut self.ts2s, VecDeque::new()),
counts: replace(&mut self.counts, VecDeque::new()),
mins: self.mins.iter().map(AsPrimF32::as_prim_f32_b).collect(),
maxs: self.mins.iter().map(AsPrimF32::as_prim_f32_b).collect(),
avgs: replace(&mut self.avgs, VecDeque::new()),
dim0kind: None,
};
Box::new(ret)
}
fn drain_into_tb(&mut self, dst: &mut dyn TimeBinned, range: Range<usize>) -> Result<(), Error> {
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
// TODO make it harder to forget new members when the struct may get modified in the future
dst.ts1s.extend(self.ts1s.drain(range.clone()));
dst.ts2s.extend(self.ts2s.drain(range.clone()));
dst.counts.extend(self.counts.drain(range.clone()));
dst.mins.extend(self.mins.drain(range.clone()));
dst.maxs.extend(self.maxs.drain(range.clone()));
dst.avgs.extend(self.avgs.drain(range.clone()));
Ok(())
} else {
let type_name = any::type_name::<Self>();
error!("downcast to {} FAILED", type_name);
Err(Error::with_msg_no_trace(format!("downcast to {} FAILED", type_name)))
}
}
}
#[test]
@@ -931,7 +982,7 @@ fn bins_timebin_fill_empty_00() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
binner.append_empty_until_end();
let ready = binner.bins_ready();
@@ -953,7 +1004,7 @@ fn bins_timebin_fill_empty_01() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
binner.push_in_progress(true);
binner.append_empty_until_end();
@@ -976,7 +1027,7 @@ fn bins_timebin_push_empty_00() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
binner.push_in_progress(true);
let ready = binner.bins_ready();
@@ -998,7 +1049,7 @@ fn bins_timebin_push_empty_01() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
binner.push_in_progress(true);
binner.push_in_progress(true);
@@ -1024,7 +1075,7 @@ fn bins_timebin_ingest_only_before() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
binner.push_in_progress(true);
let ready = binner.bins_ready();
@@ -1047,7 +1098,7 @@ fn bins_timebin_ingest_00() {
bin_cnt: 5,
});
let do_time_weight = true;
let mut binner = bins.as_time_binnable_dyn().time_binner_new(binrange, do_time_weight);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
binner.push_in_progress(true);
let ready = binner.bins_ready();
@@ -1059,3 +1110,28 @@ fn bins_timebin_ingest_00() {
exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81.);
assert_eq!(got, &exp);
}
#[test]
fn bins_timebin_ingest_continuous_00() {
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_off: 9,
bin_cnt: 20,
});
let do_time_weight = true;
let mut bins = BinsDim0::<u32>::empty();
bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82.);
//bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.);
//bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
//binner.push_in_progress(true);
let ready = binner.bins_ready();
let got = ready.unwrap();
let got: &BinsDim0<u32> = got.as_any_ref().downcast_ref().unwrap();
let mut exp = BinsDim0::empty();
exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.);
exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84.);
exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81.);
assert_eq!(got, &exp);
}

View File

@@ -11,6 +11,7 @@ use items_0::collect_s::CollectableType;
use items_0::collect_s::Collected;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonResult;
use items_0::scalar_ops::AsPrimF32;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinned;
@@ -20,6 +21,7 @@ use items_0::AppendEmptyBin;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::Resettable;
use items_0::TypeName;
use items_0::WithLen;
use netpod::is_false;
@@ -37,6 +39,7 @@ use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::mem;
use std::ops::Range;
#[allow(unused)]
macro_rules! trace4 {
@@ -169,16 +172,16 @@ where
}
}
impl<NTY> AsAnyMut for BinsXbinDim0<NTY>
impl<STY> AsAnyMut for BinsXbinDim0<STY>
where
NTY: ScalarOps,
STY: ScalarOps,
{
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl<NTY> Empty for BinsXbinDim0<NTY> {
impl<STY> Empty for BinsXbinDim0<STY> {
fn empty() -> Self {
Self {
ts1s: VecDeque::new(),
@@ -192,12 +195,23 @@ impl<NTY> Empty for BinsXbinDim0<NTY> {
}
}
impl<NTY> WithLen for BinsXbinDim0<NTY> {
impl<STY> WithLen for BinsXbinDim0<STY> {
fn len(&self) -> usize {
self.ts1s.len()
}
}
impl<STY> Resettable for BinsXbinDim0<STY> {
fn reset(&mut self) {
self.ts1s.clear();
self.ts2s.clear();
self.counts.clear();
self.mins.clear();
self.maxs.clear();
self.avgs.clear();
}
}
impl<NTY> RangeOverlapInfo for BinsXbinDim0<NTY> {
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
@@ -757,7 +771,7 @@ impl<NTY: ScalarOps> TimeBinned for BinsXbinDim0<NTY> {
Box::new(self.clone())
}
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable {
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
self
}
@@ -816,4 +830,26 @@ impl<NTY: ScalarOps> TimeBinned for BinsXbinDim0<NTY> {
fn as_collectable_mut(&mut self) -> &mut dyn Collectable {
self
}
fn empty_like_self_box_time_binned(&self) -> Box<dyn TimeBinned> {
Box::new(Self::empty())
}
fn to_simple_bins_f32(&mut self) -> Box<dyn TimeBinned> {
use mem::replace;
let ret = BinsXbinDim0::<f32> {
ts1s: replace(&mut self.ts1s, VecDeque::new()),
ts2s: replace(&mut self.ts2s, VecDeque::new()),
counts: replace(&mut self.counts, VecDeque::new()),
mins: self.mins.iter().map(AsPrimF32::as_prim_f32_b).collect(),
maxs: self.mins.iter().map(AsPrimF32::as_prim_f32_b).collect(),
avgs: replace(&mut self.avgs, VecDeque::new()),
dim0kind: None,
};
Box::new(ret)
}
fn drain_into_tb(&mut self, dst: &mut dyn TimeBinned, range: Range<usize>) -> Result<(), Error> {
todo!()
}
}

View File

@@ -700,6 +700,10 @@ impl EventsNonObj for ChannelEvents {
}
impl Events for ChannelEvents {
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
todo!()
}
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
todo!()
}

View File

@@ -22,6 +22,7 @@ use items_0::container::ByteEstimate;
use items_0::framable::FrameTypeInnerStatic;
use items_0::overlap::HasTimestampDeque;
use items_0::scalar_ops::ScalarOps;
use items_0::test::f32_iter_cmp_near;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinned;
use items_0::timebin::TimeBinner;
@@ -35,6 +36,7 @@ use items_0::Events;
use items_0::EventsNonObj;
use items_0::HasNonemptyFirstBin;
use items_0::MergeError;
use items_0::Resettable;
use items_0::TypeName;
use items_0::WithLen;
use netpod::is_false;
@@ -43,6 +45,7 @@ use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::TsNano;
use serde::Deserialize;
@@ -168,6 +171,14 @@ impl<STY> ByteEstimate for EventsDim0<STY> {
}
}
impl<STY> Resettable for EventsDim0<STY> {
fn reset(&mut self) {
self.tss.clear();
self.pulses.clear();
self.values.clear();
}
}
impl<STY: ScalarOps> HasTimestampDeque for EventsDim0<STY> {
fn timestamp_min(&self) -> Option<u64> {
self.tss.front().map(|x| *x)
@@ -757,8 +768,12 @@ impl<STY: ScalarOps> EventsNonObj for EventsDim0<STY> {
}
impl<STY: ScalarOps> Events for EventsDim0<STY> {
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
self
}
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
self as &mut dyn TimeBinnable
self
}
fn verify(&self) -> bool {
@@ -1295,3 +1310,26 @@ fn bin_binned_02() {
assert_eq!(bins.avgs(), &[13. / 2.]);
}
*/
#[test]
fn events_timebin_ingest_continuous_00() {
let binrange = BinnedRangeEnum::Time(BinnedRange {
bin_len: TsNano(SEC * 2),
bin_off: 9,
bin_cnt: 20,
});
let do_time_weight = true;
let mut bins = EventsDim0::<u32>::empty();
bins.push(SEC * 20, 1, 20);
bins.push(SEC * 23, 2, 23);
let mut binner = bins.as_time_binnable_ref().time_binner_new(binrange, do_time_weight);
binner.ingest(&mut bins);
//binner.push_in_progress(true);
let ready = binner.bins_ready();
let got = ready.unwrap();
let got: &BinsDim0<u32> = got.as_any_ref().downcast_ref().unwrap();
let mut exp = BinsDim0::empty();
exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0.);
exp.push(SEC * 20, SEC * 22, 1, 20, 20, 20.);
assert!(f32_iter_cmp_near(got.avgs.clone(), exp.avgs.clone(), 0.0001, 0.0001));
}

View File

@@ -679,8 +679,12 @@ impl<STY: ScalarOps> EventsNonObj for EventsDim1<STY> {
}
impl<STY: ScalarOps> Events for EventsDim1<STY> {
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
self
}
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
self as &mut dyn TimeBinnable
self
}
fn verify(&self) -> bool {

View File

@@ -195,8 +195,12 @@ impl<STY: ScalarOps> EventsNonObj for EventsXbinDim0<STY> {
}
impl<STY: ScalarOps> Events for EventsXbinDim0<STY> {
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
self
}
fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable {
self as &mut dyn TimeBinnable
self
}
fn verify(&self) -> bool {