WIP
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
pub mod aggregator;
|
||||
pub mod binnedvaluetype;
|
||||
pub mod container_bins;
|
||||
pub mod container_events;
|
||||
pub mod timeweight;
|
||||
pub mod valuetype;
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use super::binnedvaluetype::BinnedNumericValue;
|
||||
use super::binnedvaluetype::BinnedValueType;
|
||||
use super::container_events::EventValueType;
|
||||
use netpod::DtNano;
|
||||
|
||||
@@ -5,9 +7,12 @@ pub trait AggregatorTimeWeight<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
type OutputAvg;
|
||||
|
||||
fn new() -> Self;
|
||||
fn reset_for_new_bin(&mut self);
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT);
|
||||
fn reset_for_new_bin(&mut self);
|
||||
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg;
|
||||
}
|
||||
|
||||
pub struct AggregatorNumeric<EVT> {
|
||||
@@ -22,49 +27,77 @@ impl<EVT> AggregatorTimeWeight<EVT> for AggregatorNumeric<EVT>
|
||||
where
|
||||
EVT: AggWithSame,
|
||||
{
|
||||
fn new() -> Self {
|
||||
todo!()
|
||||
}
|
||||
type OutputAvg = EVT;
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
todo!()
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
sum: EVT::identity_sum(),
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) {
|
||||
todo!()
|
||||
let f = dt.ns() as f32 / bl.ns() as f32;
|
||||
eprintln!("INGEST {} {:?}", f, val);
|
||||
self.sum.add_weighted(&val, f);
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = EVT::identity_sum();
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
|
||||
let ret = self.sum.clone();
|
||||
self.sum = EVT::identity_sum();
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight<f32> for AggregatorNumeric<f32> {
|
||||
type OutputAvg = f32;
|
||||
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
sum: f32::sum_identity(),
|
||||
sum: f32::identity_sum(),
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = f32::sum_identity();
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) {
|
||||
let f = dt.ns() as f32 / bl.ns() as f32;
|
||||
eprintln!("INGEST {} {}", f, val);
|
||||
self.sum += f * val;
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = f32::identity_sum();
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
|
||||
let ret = self.sum.clone();
|
||||
self.sum = f32::identity_sum();
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight<u64> for AggregatorNumeric<u64> {
|
||||
fn new() -> Self {
|
||||
todo!()
|
||||
}
|
||||
type OutputAvg = u64;
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
fn new() -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = u64::identity_sum();
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
|
||||
let ret = self.sum.clone();
|
||||
self.sum = u64::identity_sum();
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
// TODO do enum right from begin, using a SOA enum container.
|
||||
|
||||
8
crates/items_2/src/binning/binnedvaluetype.rs
Normal file
8
crates/items_2/src/binning/binnedvaluetype.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
pub trait BinnedValueType {}
|
||||
|
||||
pub struct BinnedNumericValue<EVT> {
|
||||
avg: f32,
|
||||
_t: Option<EVT>,
|
||||
}
|
||||
|
||||
impl<EVT> BinnedValueType for BinnedNumericValue<EVT> {}
|
||||
191
crates/items_2/src/binning/container_bins.rs
Normal file
191
crates/items_2/src/binning/container_bins.rs
Normal file
@@ -0,0 +1,191 @@
|
||||
use super::aggregator::AggregatorNumeric;
|
||||
use super::aggregator::AggregatorTimeWeight;
|
||||
use super::container_events::EventValueType;
|
||||
use super::___;
|
||||
use crate::vecpreview::PreviewRange;
|
||||
use crate::vecpreview::VecPreview;
|
||||
use core::fmt;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::TsNano;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::any;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "ContainerBins")]
|
||||
pub enum ContainerBinsError {
|
||||
Unordered,
|
||||
}
|
||||
|
||||
pub trait BinValueType: fmt::Debug + Clone + PartialOrd {
|
||||
// type Container: Container<Self>;
|
||||
// type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
|
||||
// type AggTimeWeightOutputAvg;
|
||||
|
||||
// fn identity_sum() -> Self;
|
||||
// fn add_weighted(&self, add: &Self, f: f32) -> Self;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BinSingle<EVT> {
|
||||
pub ts1: TsNano,
|
||||
pub ts2: TsNano,
|
||||
pub cnt: u64,
|
||||
pub min: EVT,
|
||||
pub max: EVT,
|
||||
pub avg: f32,
|
||||
pub lst: EVT,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct ContainerBins<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
ts1s: VecDeque<TsNano>,
|
||||
ts2s: VecDeque<TsNano>,
|
||||
cnts: VecDeque<u64>,
|
||||
mins: VecDeque<EVT>,
|
||||
maxs: VecDeque<EVT>,
|
||||
avgs: VecDeque<f32>,
|
||||
lsts: VecDeque<EVT>,
|
||||
}
|
||||
|
||||
impl<EVT> ContainerBins<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn type_name() -> &'static str {
|
||||
any::type_name::<Self>()
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
ts1s: VecDeque::new(),
|
||||
ts2s: VecDeque::new(),
|
||||
cnts: VecDeque::new(),
|
||||
mins: VecDeque::new(),
|
||||
maxs: VecDeque::new(),
|
||||
avgs: VecDeque::new(),
|
||||
lsts: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.ts1s.len()
|
||||
}
|
||||
|
||||
pub fn verify(&self) -> Result<(), ContainerBinsError> {
|
||||
if self.ts1s.iter().zip(self.ts1s.iter().skip(1)).any(|(&a, &b)| a > b) {
|
||||
return Err(ContainerBinsError::Unordered);
|
||||
}
|
||||
if self.ts2s.iter().zip(self.ts2s.iter().skip(1)).any(|(&a, &b)| a > b) {
|
||||
return Err(ContainerBinsError::Unordered);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn ts1_first(&self) -> Option<TsNano> {
|
||||
self.ts1s.front().map(|&x| x)
|
||||
}
|
||||
|
||||
pub fn ts2_last(&self) -> Option<TsNano> {
|
||||
self.ts2s.back().map(|&x| x)
|
||||
}
|
||||
|
||||
pub fn len_before(&self, end: TsNano) -> usize {
|
||||
let pp = self.ts2s.partition_point(|&x| x <= end);
|
||||
assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len());
|
||||
pp
|
||||
}
|
||||
|
||||
pub fn pop_front(&mut self) -> Option<BinSingle<EVT>> {
|
||||
let ts1 = if let Some(x) = self.ts1s.pop_front() {
|
||||
x
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
let ts2 = if let Some(x) = self.ts2s.pop_front() {
|
||||
x
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
todo!()
|
||||
}
|
||||
|
||||
// pub fn push_back(&mut self, ts1: TsNano, val: EVT) {
|
||||
// self.tss.push_back(ts);
|
||||
// self.vals.push_back(val);
|
||||
// }
|
||||
}
|
||||
|
||||
impl<EVT> fmt::Debug for ContainerBins<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
let self_name = any::type_name::<Self>();
|
||||
write!(
|
||||
fmt,
|
||||
"{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?} }}",
|
||||
self.len(),
|
||||
VecPreview::new(&self.ts1s),
|
||||
VecPreview::new(&self.ts2s),
|
||||
VecPreview::new(&self.cnts),
|
||||
VecPreview::new(&self.avgs),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ContainerBinsTakeUpTo<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
evs: &'a mut ContainerBins<EVT>,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn new(evs: &'a mut ContainerBins<EVT>, len: usize) -> Self {
|
||||
let len = len.min(evs.len());
|
||||
Self { evs, len }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn ts1_first(&self) -> Option<TsNano> {
|
||||
self.evs.ts1_first()
|
||||
}
|
||||
|
||||
pub fn ts2_last(&self) -> Option<TsNano> {
|
||||
self.evs.ts2_last()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.len
|
||||
}
|
||||
|
||||
pub fn pop_front(&mut self) -> Option<BinSingle<EVT>> {
|
||||
if self.len != 0 {
|
||||
if let Some(ev) = self.evs.pop_front() {
|
||||
self.len -= 1;
|
||||
Some(ev)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -29,8 +29,10 @@ pub trait Container<EVT>: fmt::Debug + Clone + PreviewRange + Serialize + for<'a
|
||||
pub trait EventValueType: fmt::Debug + Clone + PartialOrd {
|
||||
type Container: Container<Self>;
|
||||
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
|
||||
type AggTimeWeightOutputAvg;
|
||||
|
||||
fn sum_identity() -> Self;
|
||||
fn identity_sum() -> Self;
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self;
|
||||
}
|
||||
|
||||
impl<EVT> Container<EVT> for VecDeque<EVT>
|
||||
@@ -53,28 +55,43 @@ where
|
||||
impl EventValueType for f32 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric<Self>;
|
||||
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
fn identity_sum() -> Self {
|
||||
0.
|
||||
}
|
||||
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for f64 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric<Self>;
|
||||
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
fn identity_sum() -> Self {
|
||||
0.
|
||||
}
|
||||
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for u64 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric<Self>;
|
||||
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
fn identity_sum() -> Self {
|
||||
0
|
||||
}
|
||||
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -138,7 +155,7 @@ where
|
||||
pp
|
||||
}
|
||||
|
||||
pub fn event_next(&mut self) -> Option<EventSingle<EVT>> {
|
||||
pub fn pop_front(&mut self) -> Option<EventSingle<EVT>> {
|
||||
if let (Some(ts), Some(val)) = (self.tss.pop_front(), self.vals.pop_front()) {
|
||||
Some(EventSingle { ts, val })
|
||||
} else {
|
||||
@@ -202,9 +219,9 @@ where
|
||||
self.len
|
||||
}
|
||||
|
||||
pub fn event_next(&mut self) -> Option<EventSingle<EVT>> {
|
||||
pub fn pop_front(&mut self) -> Option<EventSingle<EVT>> {
|
||||
if self.len != 0 {
|
||||
if let Some(ev) = self.evs.event_next() {
|
||||
if let Some(ev) = self.evs.pop_front() {
|
||||
self.len -= 1;
|
||||
Some(ev)
|
||||
} else {
|
||||
|
||||
@@ -53,7 +53,7 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_enum_simple_00() -> Result<(), Error> {
|
||||
fn test_bin_events_enum_simple_range_final() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(100);
|
||||
let end = TsNano::from_ms(120);
|
||||
let nano_range = NanoRange {
|
||||
@@ -66,5 +66,6 @@ fn test_bin_events_enum_simple_00() -> Result<(), Error> {
|
||||
evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one"));
|
||||
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
|
||||
binner.ingest(evs)?;
|
||||
binner.range_final()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtNano;
|
||||
use netpod::TsNano;
|
||||
use std::collections::VecDeque;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
@@ -147,7 +148,7 @@ where
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_gt_range_beg");
|
||||
while let Some(ev) = evs.event_next() {
|
||||
while let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev);
|
||||
if ev.ts <= self.active_beg {
|
||||
panic!("should never get here");
|
||||
@@ -168,7 +169,7 @@ where
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_ge_range_beg");
|
||||
while let Some(ev) = evs.event_next() {
|
||||
while let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev);
|
||||
if ev.ts < self.active_beg {
|
||||
panic!("should never get here");
|
||||
@@ -207,6 +208,24 @@ where
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// PRECONDITION: filled_until < ts <= active_end
|
||||
fn fill_until(&mut self, ts: TsNano, lst: LstRef<EVT>) {
|
||||
trace_cycle!("fill_until ts {:?}", ts);
|
||||
let b = self;
|
||||
assert!(b.filled_until < ts);
|
||||
assert!(ts <= b.active_end);
|
||||
b.agg.ingest(ts.delta(b.filled_until), b.active_len, lst.0.val.clone());
|
||||
b.filled_until = ts;
|
||||
}
|
||||
|
||||
fn fill_remaining_if_space_left(&mut self, lst: LstRef<EVT>) {
|
||||
trace_cycle!("fill_remaining_if_space_left");
|
||||
let b = self;
|
||||
if b.filled_until < b.active_end {
|
||||
b.fill_until(b.active_end, lst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct InnerA<EVT>
|
||||
@@ -249,7 +268,7 @@ where
|
||||
if let Some(minmax) = self.minmax.as_mut() {
|
||||
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)
|
||||
} else {
|
||||
if let Some(ev) = evs.event_next() {
|
||||
if let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_with_lst {:?}", ev);
|
||||
let beg = self.inner_b.active_beg;
|
||||
let end = self.inner_b.active_end;
|
||||
@@ -290,6 +309,7 @@ where
|
||||
lst: Option<EventSingle<EVT>>,
|
||||
range: BinnedRange<TsNano>,
|
||||
inner_a: InnerA<EVT>,
|
||||
out: VecDeque<EVT::AggTimeWeightOutputAvg>,
|
||||
}
|
||||
|
||||
impl<EVT> BinnedEventsTimeweight<EVT>
|
||||
@@ -314,12 +334,13 @@ where
|
||||
minmax: None,
|
||||
},
|
||||
lst: None,
|
||||
out: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
|
||||
if ev.ts >= self.inner_a.inner_b.active_end {
|
||||
Err(Error::EventAfterRange)
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev);
|
||||
self.lst = Some(ev.clone());
|
||||
@@ -333,10 +354,10 @@ where
|
||||
}
|
||||
|
||||
fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
|
||||
if let Some(ev) = evs.event_next() {
|
||||
if let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_without_lst {:?}", ev);
|
||||
if ev.ts >= self.inner_a.inner_b.active_end {
|
||||
Err(Error::EventAfterRange)
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
self.ingest_event_without_lst(ev)?;
|
||||
if let Some(lst) = self.lst.as_mut() {
|
||||
@@ -364,6 +385,51 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn cycle_01(&mut self, ts: TsNano) {
|
||||
let b = &self.inner_a.inner_b;
|
||||
trace_cycle!("cycle_01 {:?} {:?}", ts, b.active_end);
|
||||
assert!(b.active_beg < ts);
|
||||
let div = self.range.bin_len.ns();
|
||||
if let Some(lst) = self.lst.as_ref() {
|
||||
loop {
|
||||
let b = &self.inner_a.inner_b;
|
||||
if b.filled_until >= ts {
|
||||
break;
|
||||
}
|
||||
if ts >= b.active_end {
|
||||
self.inner_a.inner_b.fill_remaining_if_space_left(LstRef(lst));
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
{
|
||||
// TODO push bin to output.
|
||||
let res = b.agg.result_and_reset_for_new_bin();
|
||||
let cnt = b.cnt;
|
||||
b.cnt = 0;
|
||||
}
|
||||
trace_cycle!("cycle_01 filled up to {:?} emit and reset", b.active_end);
|
||||
let old_end = b.active_end;
|
||||
let ts1 = TsNano::from_ns(b.active_end.ns() / div * div);
|
||||
assert!(ts1 == old_end);
|
||||
b.active_beg = ts1;
|
||||
b.active_end = ts1.add_dt_nano(b.active_len);
|
||||
b.filled_until = ts1;
|
||||
self.inner_a.minmax = Some((lst.clone(), lst.clone()));
|
||||
} else {
|
||||
self.inner_a.inner_b.fill_until(ts, LstRef(lst));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let ts1 = TsNano::from_ns(ts.ns() / div * div);
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
b.active_beg = ts1;
|
||||
b.active_end = ts1.add_dt_nano(b.active_len);
|
||||
b.filled_until = ts1;
|
||||
b.cnt = 0;
|
||||
b.agg.reset_for_new_bin();
|
||||
assert!(self.inner_a.minmax.is_none());
|
||||
trace_cycle!("cycled direct to {:?} {:?}", b.active_beg, b.active_end);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ingest(&mut self, mut evs_all: ContainerEvents<EVT>) -> Result<(), Error> {
|
||||
// It is this type's task to find and store the one-before event.
|
||||
// We then pass it to the aggregation.
|
||||
@@ -382,35 +448,13 @@ where
|
||||
// How to handle to not emit bins until at least some partially filled bin is encountered?
|
||||
break if let Some(ts) = evs_all.ts_first() {
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
if ts >= self.range.nano_end() {
|
||||
return Err(Error::EventAfterRange);
|
||||
}
|
||||
if ts >= b.active_end {
|
||||
trace_cycle!("bin edge boundary {:?}", b.active_end);
|
||||
if let Some(lst) = self.lst.as_ref() {
|
||||
trace_cycle!("fill remaining width");
|
||||
self.inner_a
|
||||
.inner_b
|
||||
.ingest_event_with_lst_gt_range_beg_agg(lst.clone(), LstRef(lst));
|
||||
} else {
|
||||
// nothing to do
|
||||
}
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
if b.filled_until < b.active_beg {
|
||||
panic!("fille until before bin begin");
|
||||
} else if b.filled_until == b.active_beg {
|
||||
// TODO bin is meaningless
|
||||
} else {
|
||||
// TODO need the output type.
|
||||
}
|
||||
trace_cycle!("cycle bin {:?} {:?}", ts, b.active_end);
|
||||
// TODO check if the bin has content to emit: either it itself contains events, or is filled with lst value.
|
||||
// For the check for filled with lst I might need another flag.
|
||||
let div = self.range.bin_len.ns();
|
||||
let ts1 = TsNano::from_ns(ts.ns() / div * div);
|
||||
b.active_beg = ts1;
|
||||
b.active_end = ts1.add_dt_nano(b.active_len);
|
||||
b.filled_until = ts1;
|
||||
b.cnt = 0;
|
||||
b.agg.reset_for_new_bin();
|
||||
trace_cycle!("cycled to {:?} {:?}", b.active_beg, b.active_end);
|
||||
assert!(b.filled_until < b.active_beg);
|
||||
self.cycle_01(ts);
|
||||
}
|
||||
let n1 = evs_all.len();
|
||||
let len_before = evs_all.len_before(self.inner_a.inner_b.active_end);
|
||||
@@ -440,7 +484,9 @@ where
|
||||
}
|
||||
|
||||
pub fn range_final(&mut self) -> Result<(), Error> {
|
||||
todo!()
|
||||
trace_cycle!("range_final");
|
||||
self.cycle_01(self.range.nano_end());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::aggregator::AggregatorTimeWeight;
|
||||
use super::binnedvaluetype::BinnedNumericValue;
|
||||
use super::container_events::Container;
|
||||
use super::container_events::EventValueType;
|
||||
use crate::vecpreview::PreviewRange;
|
||||
@@ -48,27 +49,41 @@ pub struct EnumVariantAggregatorTimeWeight {
|
||||
}
|
||||
|
||||
impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
|
||||
type OutputAvg = f32;
|
||||
|
||||
fn new() -> Self {
|
||||
Self { sum: 0. }
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = 0.
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EnumVariant) {
|
||||
let f = dt.ns() as f32 / bl.ns() as f32;
|
||||
eprintln!("INGEST {} {:?}", f, val);
|
||||
let h = items_0::scalar_ops::AsPrimF32::as_prim_f32_b(&val);
|
||||
self.sum += f * h;
|
||||
eprintln!("INGEST ENUM {} {:?}", f, val);
|
||||
self.sum += f * val.ix() as f32;
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = f32::identity_sum();
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
|
||||
let ret = self.sum.clone();
|
||||
self.sum = f32::identity_sum();
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for EnumVariant {
|
||||
type Container = EnumVariantContainer;
|
||||
type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight;
|
||||
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
|
||||
|
||||
fn sum_identity() -> Self {
|
||||
// TODO remove this from trait, only needed for common numeric cases but not in general.
|
||||
fn identity_sum() -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
// TODO also remove from trait, push it to a more specialized trait for the plain numeric cases.
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user