WIP
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
[package]
|
||||
name = "daqbuffer"
|
||||
version = "0.5.3-aa.3"
|
||||
version = "0.5.3-aa.4"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
futures-util = "0.3.29"
|
||||
bytes = "1.5.0"
|
||||
bytes = "1.7.0"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::container_events::EventValueType;
|
||||
use core::fmt;
|
||||
use netpod::log::*;
|
||||
use netpod::DtNano;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -19,7 +20,7 @@ where
|
||||
fn new() -> Self;
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT);
|
||||
fn reset_for_new_bin(&mut self);
|
||||
fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg;
|
||||
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg;
|
||||
}
|
||||
|
||||
pub struct AggregatorNumeric {
|
||||
@@ -46,7 +47,7 @@ where
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) {
|
||||
let f = dt.ns() as f64 / bl.ns() as f64;
|
||||
eprintln!("INGEST {} {:?}", f, val);
|
||||
trace!("INGEST {} {:?}", f, val);
|
||||
self.sum += f * val.as_f64();
|
||||
}
|
||||
|
||||
@@ -54,11 +55,11 @@ where
|
||||
self.sum = 0.;
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg {
|
||||
// fn result_and_reset_for_new_bin(&mut self) -> f64 {
|
||||
let ret = self.sum.clone();
|
||||
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg {
|
||||
let sum = self.sum.clone();
|
||||
trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
|
||||
self.sum = 0.;
|
||||
ret
|
||||
sum / filled_width_fraction as f64
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,7 +70,7 @@ impl AggregatorTimeWeight<f32> for AggregatorNumeric {
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) {
|
||||
let f = dt.ns() as f64 / bl.ns() as f64;
|
||||
eprintln!("INGEST {} {}", f, val);
|
||||
trace!("INGEST {} {}", f, val);
|
||||
self.sum += f * val as f64;
|
||||
}
|
||||
|
||||
@@ -77,10 +78,11 @@ impl AggregatorTimeWeight<f32> for AggregatorNumeric {
|
||||
self.sum = 0.;
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> f64 {
|
||||
let ret = self.sum.clone();
|
||||
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 {
|
||||
let sum = self.sum.clone() as f32;
|
||||
trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
|
||||
self.sum = 0.;
|
||||
ret
|
||||
sum / filled_width_fraction
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,7 +93,7 @@ impl AggregatorTimeWeight<u64> for AggregatorNumeric {
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) {
|
||||
let f = dt.ns() as f64 / bl.ns() as f64;
|
||||
eprintln!("INGEST {} {}", f, val);
|
||||
trace!("INGEST {} {}", f, val);
|
||||
self.sum += f * val as f64;
|
||||
}
|
||||
|
||||
@@ -99,10 +101,11 @@ impl AggregatorTimeWeight<u64> for AggregatorNumeric {
|
||||
self.sum = 0.;
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> f64 {
|
||||
let ret = self.sum.clone();
|
||||
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 {
|
||||
let sum = self.sum.clone();
|
||||
trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
|
||||
self.sum = 0.;
|
||||
ret
|
||||
sum / filled_width_fraction as f64
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,59 @@ pub struct BinSingle<EVT> {
|
||||
pub max: EVT,
|
||||
pub avg: f32,
|
||||
pub lst: EVT,
|
||||
pub fnl: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BinRef<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub ts1: TsNano,
|
||||
pub ts2: TsNano,
|
||||
pub cnt: u64,
|
||||
pub min: &'a EVT,
|
||||
pub max: &'a EVT,
|
||||
pub avg: &'a EVT::AggTimeWeightOutputAvg,
|
||||
pub lst: &'a EVT,
|
||||
pub fnl: bool,
|
||||
}
|
||||
|
||||
pub struct IterDebug<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
bins: &'a ContainerBins<EVT>,
|
||||
ix: usize,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
impl<'a, EVT> Iterator for IterDebug<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
type Item = BinRef<'a, EVT>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.ix < self.bins.len() && self.ix < self.len {
|
||||
let b = &self.bins;
|
||||
let i = self.ix;
|
||||
self.ix += 1;
|
||||
let ret = BinRef {
|
||||
ts1: b.ts1s[i],
|
||||
ts2: b.ts2s[i],
|
||||
cnt: b.cnts[i],
|
||||
min: &b.mins[i],
|
||||
max: &b.maxs[i],
|
||||
avg: &b.avgs[i],
|
||||
lst: &b.lsts[i],
|
||||
fnl: b.fnls[i],
|
||||
};
|
||||
Some(ret)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
@@ -54,6 +107,7 @@ where
|
||||
maxs: VecDeque<EVT>,
|
||||
avgs: VecDeque<EVT::AggTimeWeightOutputAvg>,
|
||||
lsts: VecDeque<EVT>,
|
||||
fnls: VecDeque<bool>,
|
||||
}
|
||||
|
||||
impl<EVT> ContainerBins<EVT>
|
||||
@@ -73,6 +127,7 @@ where
|
||||
maxs: VecDeque::new(),
|
||||
avgs: VecDeque::new(),
|
||||
lsts: VecDeque::new(),
|
||||
fnls: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +153,10 @@ where
|
||||
self.ts2s.back().map(|&x| x)
|
||||
}
|
||||
|
||||
pub fn cnts_iter(&self) -> std::collections::vec_deque::Iter<u64> {
|
||||
self.cnts.iter()
|
||||
}
|
||||
|
||||
pub fn len_before(&self, end: TsNano) -> usize {
|
||||
let pp = self.ts2s.partition_point(|&x| x <= end);
|
||||
assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len());
|
||||
@@ -127,6 +186,7 @@ where
|
||||
max: EVT,
|
||||
avg: EVT::AggTimeWeightOutputAvg,
|
||||
lst: EVT,
|
||||
fnl: bool,
|
||||
) {
|
||||
self.ts1s.push_back(ts1);
|
||||
self.ts2s.push_back(ts2);
|
||||
@@ -135,6 +195,15 @@ where
|
||||
self.maxs.push_back(max);
|
||||
self.avgs.push_back(avg);
|
||||
self.lsts.push_back(lst);
|
||||
self.fnls.push_back(fnl);
|
||||
}
|
||||
|
||||
pub fn iter_debug(&self) -> IterDebug<EVT> {
|
||||
IterDebug {
|
||||
bins: self,
|
||||
ix: 0,
|
||||
len: self.len(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,12 +215,13 @@ where
|
||||
let self_name = any::type_name::<Self>();
|
||||
write!(
|
||||
fmt,
|
||||
"{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?} }}",
|
||||
"{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?}, fnls {:?} }}",
|
||||
self.len(),
|
||||
VecPreview::new(&self.ts1s),
|
||||
VecPreview::new(&self.ts2s),
|
||||
VecPreview::new(&self.cnts),
|
||||
VecPreview::new(&self.avgs),
|
||||
VecPreview::new(&self.fnls),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ where
|
||||
impl EventValueType for f32 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric;
|
||||
type AggTimeWeightOutputAvg = f64;
|
||||
type AggTimeWeightOutputAvg = f32;
|
||||
|
||||
fn identity_sum() -> Self {
|
||||
0.
|
||||
|
||||
@@ -1,17 +1,158 @@
|
||||
use crate::binning::container_bins::ContainerBins;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtMs;
|
||||
use netpod::EnumVariant;
|
||||
use netpod::TsNano;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "Error")]
|
||||
enum Error {
|
||||
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
|
||||
AssertMsg(String),
|
||||
}
|
||||
|
||||
// fn prepare_data_with_cuts(beg_ms: u64, cuts: VecDeque<u64>) -> VecDeque<ContainerEvents<f32>> {
|
||||
// let beg = TsNano::from_ms(beg_ms);
|
||||
// let end = TsNano::from_ms(120);
|
||||
// let mut cut_next = cuts.pop_front().unwrap_or(u64::MAX);
|
||||
// let mut ret = VecDeque::new();
|
||||
// let ivl = DtMs::from_ms_u64(x)
|
||||
// }
|
||||
|
||||
fn pu(c: &mut ContainerEvents<f32>, ts_ms: u64, val: f32)
|
||||
// where
|
||||
// C: AsMut<ContainerEvents<f32>>,
|
||||
// C: std::borrow::BorrowMut<ContainerEvents<f32>>,
|
||||
{
|
||||
c.push_back(TsNano::from_ms(ts_ms), val);
|
||||
}
|
||||
|
||||
trait IntoVecDequeU64 {
|
||||
fn into_vec_deque_u64(self) -> VecDeque<u64>;
|
||||
}
|
||||
|
||||
impl IntoVecDequeU64 for &str {
|
||||
fn into_vec_deque_u64(self) -> VecDeque<u64> {
|
||||
self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect()
|
||||
}
|
||||
}
|
||||
trait IntoVecDequeF32 {
|
||||
fn into_vec_deque_f32(self) -> VecDeque<f32>;
|
||||
}
|
||||
|
||||
impl IntoVecDequeF32 for &str {
|
||||
fn into_vec_deque_f32(self) -> VecDeque<f32> {
|
||||
self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn exp_u64<'a>(
|
||||
vals: impl Iterator<Item = &'a u64>,
|
||||
exps: impl Iterator<Item = &'a u64>,
|
||||
tag: &str,
|
||||
) -> Result<(), Error> {
|
||||
let mut it_a = vals;
|
||||
let mut it_b = exps;
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(val), Some(exp)) = (a, b) {
|
||||
if val != exp {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"{tag} expect close value {} vs {}",
|
||||
val, exp
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!("{tag} len mismatch")));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn exp_cnts(bins: &ContainerBins<f32>, exps: impl IntoVecDequeU64) -> Result<(), Error> {
|
||||
exp_u64(bins.cnts_iter(), exps.into_vec_deque_u64().iter(), "exp_cnts")
|
||||
}
|
||||
|
||||
fn exp_avgs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
|
||||
let exps = exps.into_vec_deque_f32();
|
||||
let mut it_a = bins.iter_debug();
|
||||
let mut it_b = exps.iter();
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(a), Some(&exp)) = (a, b) {
|
||||
let val = *a.avg as f32;
|
||||
if netpod::f32_close(val, exp) == false {
|
||||
return Err(Error::AssertMsg(format!("expect close value {} vs {}", val, exp)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"len mismatch {} vs {}",
|
||||
bins.len(),
|
||||
exps.len()
|
||||
)));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_f32_simple_with_before_00() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(110);
|
||||
let end = TsNano::from_ms(120);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
evs.push_back(TsNano::from_ms(103), 2.0);
|
||||
binner.ingest(evs)?;
|
||||
binner.input_done_range_final()?;
|
||||
let bins = binner.output();
|
||||
assert_eq!(bins.len(), 1);
|
||||
let bins = binner.output();
|
||||
assert_eq!(bins.len(), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_f32_simple_with_before_01() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(110);
|
||||
let end = TsNano::from_ms(130);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
let em = &mut evs;
|
||||
pu(em, 103, 2.0);
|
||||
binner.ingest(evs)?;
|
||||
binner.input_done_range_final()?;
|
||||
let bins = binner.output();
|
||||
assert_eq!(bins.len(), 2);
|
||||
exp_cnts(&bins, "0 0")?;
|
||||
exp_avgs(&bins, "2.00 2.00")?;
|
||||
let bins = binner.output();
|
||||
assert_eq!(bins.len(), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -25,8 +166,27 @@ fn test_bin_events_f32_simple_00() -> Result<(), Error> {
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
evs.push_back(TsNano::from_ms(103), 2.0);
|
||||
let em = &mut evs;
|
||||
pu(em, 100, 2.0);
|
||||
pu(em, 104, 2.4);
|
||||
binner.ingest(evs)?;
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
let em = &mut evs;
|
||||
pu(em, 111, 1.0);
|
||||
pu(em, 112, 1.2);
|
||||
pu(em, 113, 1.4);
|
||||
binner.ingest(evs)?;
|
||||
binner.input_done_range_open()?;
|
||||
let bins = binner.output();
|
||||
trace!("{bins:?}");
|
||||
for b in bins.iter_debug() {
|
||||
trace!("{b:?}");
|
||||
}
|
||||
assert_eq!(bins.len(), 2);
|
||||
exp_cnts(&bins, "2 3")?;
|
||||
exp_avgs(&bins, "2.24 1.5333")?;
|
||||
let bins = binner.output();
|
||||
assert_eq!(bins.len(), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -41,14 +201,62 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> {
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
evs.push_back(TsNano::from_ms(103), 2.0);
|
||||
evs.push_back(TsNano::from_ms(104), 2.4);
|
||||
let em = &mut evs;
|
||||
pu(em, 102, 2.0);
|
||||
pu(em, 104, 2.4);
|
||||
binner.ingest(evs)?;
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
evs.push_back(TsNano::from_ms(111), 1.0);
|
||||
evs.push_back(TsNano::from_ms(112), 1.2);
|
||||
evs.push_back(TsNano::from_ms(113), 1.4);
|
||||
let em = &mut evs;
|
||||
pu(em, 111, 1.0);
|
||||
pu(em, 112, 1.2);
|
||||
pu(em, 113, 1.4);
|
||||
binner.ingest(evs)?;
|
||||
binner.input_done_range_open()?;
|
||||
let bins = binner.output();
|
||||
trace!("{bins:?}");
|
||||
for b in bins.iter_debug() {
|
||||
trace!("{b:?}");
|
||||
}
|
||||
assert_eq!(bins.len(), 2);
|
||||
exp_cnts(&bins, "2 3")?;
|
||||
exp_avgs(&bins, "2.30 1.5333")?;
|
||||
let bins = binner.output();
|
||||
assert_eq!(bins.len(), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_f32_small_range_final() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(100);
|
||||
let end = TsNano::from_ms(120);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
let em = &mut evs;
|
||||
pu(em, 102, 2.0);
|
||||
pu(em, 104, 2.4);
|
||||
binner.ingest(evs)?;
|
||||
let mut evs = ContainerEvents::<f32>::new();
|
||||
let em = &mut evs;
|
||||
pu(em, 111, 1.0);
|
||||
pu(em, 112, 1.2);
|
||||
pu(em, 113, 1.4);
|
||||
binner.ingest(evs)?;
|
||||
binner.input_done_range_final()?;
|
||||
let bins = binner.output();
|
||||
trace!("{bins:?}");
|
||||
for b in bins.iter_debug() {
|
||||
trace!("{b:?}");
|
||||
}
|
||||
assert_eq!(bins.len(), 2);
|
||||
exp_cnts(&bins, "2 3")?;
|
||||
exp_avgs(&bins, "2.30 1.44")?;
|
||||
let bins = binner.output();
|
||||
assert_eq!(bins.len(), 0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -66,8 +274,8 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> {
|
||||
evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one"));
|
||||
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
|
||||
binner.ingest(evs)?;
|
||||
binner.range_final()?;
|
||||
binner.input_done_range_final()?;
|
||||
let bins = binner.output();
|
||||
eprintln!("{:?}", bins);
|
||||
trace!("{:?}", bins);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) }
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
@@ -48,6 +48,11 @@ macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if true { trace_!($($a
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[cold]
|
||||
#[inline]
|
||||
#[allow(unused)]
|
||||
fn cold() {}
|
||||
|
||||
const DEBUG_CHECKS: bool = true;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
@@ -67,6 +72,7 @@ pub enum Error {
|
||||
|
||||
type MinMax<EVT> = (EventSingle<EVT>, EventSingle<EVT>);
|
||||
|
||||
#[derive(Clone)]
|
||||
struct LstRef<'a, EVT>(&'a EventSingle<EVT>);
|
||||
|
||||
struct LstMut<'a, EVT>(&'a mut EventSingle<EVT>);
|
||||
@@ -80,6 +86,7 @@ where
|
||||
active_end: TsNano,
|
||||
active_len: DtNano,
|
||||
filled_until: TsNano,
|
||||
filled_width: DtNano,
|
||||
agg: <EVT as EventValueType>::AggregatorTimeWeight,
|
||||
}
|
||||
|
||||
@@ -99,11 +106,13 @@ where
|
||||
}
|
||||
}
|
||||
let dt = ev.ts.delta(self.filled_until);
|
||||
trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg dt {:?} ev {:?}", dt, ev);
|
||||
// TODO can the caller already take the value and replace it afterwards with the current value?
|
||||
// This fn could swap the value in lst and directly use it.
|
||||
// This would require that any call path does not mess with lst.
|
||||
// NOTE that this fn is also used during bin-cycle.
|
||||
self.agg.ingest(dt, self.active_len, lst.0.val.clone());
|
||||
self.filled_width = self.filled_width.add(dt);
|
||||
self.filled_until = ev.ts;
|
||||
}
|
||||
|
||||
@@ -150,7 +159,7 @@ where
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_gt_range_beg");
|
||||
while let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev);
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_gt_range_beg");
|
||||
if ev.ts <= self.active_beg {
|
||||
panic!("should never get here");
|
||||
}
|
||||
@@ -171,7 +180,7 @@ where
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_ge_range_beg");
|
||||
while let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev);
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_ge_range_beg");
|
||||
if ev.ts < self.active_beg {
|
||||
panic!("should never get here");
|
||||
}
|
||||
@@ -200,6 +209,8 @@ where
|
||||
trace_ingest_event!("ingest_with_lst_minmax");
|
||||
// TODO how to handle the min max? I don't take event data yet out of the container.
|
||||
if let Some(ts0) = evs.ts_first() {
|
||||
trace_ingest_event!("EVENT POP FRONT ingest_with_lst_minmax");
|
||||
trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest_with_lst_minmax", ts0);
|
||||
if ts0 < self.active_beg {
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
@@ -212,21 +223,17 @@ where
|
||||
|
||||
// PRECONDITION: filled_until < ts <= active_end
|
||||
fn fill_until(&mut self, ts: TsNano, lst: LstRef<EVT>) {
|
||||
trace_cycle!("fill_until ts {:?}", ts);
|
||||
let b = self;
|
||||
assert!(b.filled_until < ts);
|
||||
assert!(ts <= b.active_end);
|
||||
b.agg.ingest(ts.delta(b.filled_until), b.active_len, lst.0.val.clone());
|
||||
let dt = ts.delta(b.filled_until);
|
||||
trace_cycle!("fill_until ts {:?} dt {:?} lst {:?}", ts, dt, lst.0);
|
||||
assert!(b.filled_until < ts);
|
||||
assert!(ts <= b.active_end);
|
||||
b.agg.ingest(dt, b.active_len, lst.0.val.clone());
|
||||
b.filled_width = b.filled_width.add(dt);
|
||||
b.filled_until = ts;
|
||||
}
|
||||
|
||||
fn fill_remaining_if_space_left(&mut self, lst: LstRef<EVT>) {
|
||||
trace_cycle!("fill_remaining_if_space_left");
|
||||
let b = self;
|
||||
if b.filled_until < b.active_end {
|
||||
b.fill_until(b.active_end, lst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct InnerA<EVT>
|
||||
@@ -270,7 +277,7 @@ where
|
||||
self.inner_b.ingest_with_lst_minmax(evs, lst, minmax)
|
||||
} else {
|
||||
if let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_with_lst {:?}", ev);
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst");
|
||||
let beg = self.inner_b.active_beg;
|
||||
let end = self.inner_b.active_end;
|
||||
if ev.ts < beg {
|
||||
@@ -301,6 +308,53 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_01(&mut self, lst: LstRef<EVT>) {
|
||||
let selfname = "reset_01";
|
||||
let b = &mut self.inner_b;
|
||||
trace_cycle!(
|
||||
"{selfname} active_end {:?} filled_until {:?}",
|
||||
b.active_end,
|
||||
b.filled_until
|
||||
);
|
||||
let div = b.active_len.ns();
|
||||
let old_end = b.active_end;
|
||||
let ts1 = TsNano::from_ns(b.active_end.ns() / div * div);
|
||||
assert!(ts1 == old_end);
|
||||
b.active_beg = ts1;
|
||||
b.active_end = ts1.add_dt_nano(b.active_len);
|
||||
b.filled_until = ts1;
|
||||
b.filled_width = DtNano::from_ns(0);
|
||||
b.cnt = 0;
|
||||
self.minmax = Some((lst.0.clone(), lst.0.clone()));
|
||||
}
|
||||
|
||||
fn push_out_and_reset(&mut self, lst: LstRef<EVT>, range_final: bool, out: &mut ContainerBins<EVT>) {
|
||||
let selfname = "push_out_and_reset";
|
||||
// TODO there is not always good enough input to produce a meaningful bin.
|
||||
// TODO can we always reset, and what exactly does reset mean here?
|
||||
// TODO what logic can I save here? To output a bin I need to have min, max, lst.
|
||||
let b = &mut self.inner_b;
|
||||
let minmax = self.minmax.get_or_insert_with(|| {
|
||||
trace_cycle!("{selfname} minmax not yet set");
|
||||
(lst.0.clone(), lst.0.clone())
|
||||
});
|
||||
{
|
||||
let filled_width_fraction = b.filled_width.fraction_of(b.active_len);
|
||||
let res = b.agg.result_and_reset_for_new_bin(filled_width_fraction);
|
||||
out.push_back(
|
||||
b.active_beg,
|
||||
b.active_end,
|
||||
b.cnt,
|
||||
minmax.0.val.clone(),
|
||||
minmax.1.val.clone(),
|
||||
res,
|
||||
lst.0.val.clone(),
|
||||
range_final,
|
||||
);
|
||||
}
|
||||
self.reset_01(lst);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BinnedEventsTimeweight<EVT>
|
||||
@@ -330,6 +384,7 @@ where
|
||||
active_end,
|
||||
active_len,
|
||||
filled_until: active_beg,
|
||||
filled_width: DtNano::from_ns(0),
|
||||
agg: <<EVT as EventValueType>::AggregatorTimeWeight as AggregatorTimeWeight<EVT>>::new(),
|
||||
},
|
||||
minmax: None,
|
||||
@@ -349,6 +404,7 @@ where
|
||||
trace_ingest_minmax!("ingest_event_without_lst");
|
||||
self.inner_a.init_minmax(&ev);
|
||||
self.inner_a.inner_b.cnt += 1;
|
||||
self.inner_a.inner_b.filled_until = ev.ts;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -356,7 +412,7 @@ where
|
||||
|
||||
fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
|
||||
if let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("ingest_without_lst {:?}", ev);
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_without_lst");
|
||||
if ev.ts >= self.inner_a.inner_b.active_end {
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
@@ -390,52 +446,38 @@ where
|
||||
let b = &self.inner_a.inner_b;
|
||||
trace_cycle!("cycle_01 {:?} {:?}", ts, b.active_end);
|
||||
assert!(b.active_beg < ts);
|
||||
let div = self.range.bin_len.ns();
|
||||
assert!(b.active_beg <= b.filled_until);
|
||||
assert!(b.filled_until < ts);
|
||||
assert!(b.filled_until <= b.active_end);
|
||||
let div = b.active_len.ns();
|
||||
if let Some(lst) = self.lst.as_ref() {
|
||||
let lst = LstRef(lst);
|
||||
let mut i = 0;
|
||||
loop {
|
||||
i += 1;
|
||||
assert!(i < 100000, "too many iterations");
|
||||
let b = &self.inner_a.inner_b;
|
||||
if b.filled_until >= ts {
|
||||
break;
|
||||
}
|
||||
if ts >= b.active_end {
|
||||
self.inner_a.inner_b.fill_remaining_if_space_left(LstRef(lst));
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
let minmax = self.inner_a.minmax.get_or_insert_with(|| {
|
||||
trace_cycle!("cycle_01 minmax not yet set");
|
||||
(lst.clone(), lst.clone())
|
||||
});
|
||||
{
|
||||
// TODO push bin to output.
|
||||
let res = b.agg.result_and_reset_for_new_bin();
|
||||
self.out.push_back(
|
||||
b.active_beg,
|
||||
b.active_end,
|
||||
b.cnt,
|
||||
minmax.0.val.clone(),
|
||||
minmax.1.val.clone(),
|
||||
res,
|
||||
lst.val.clone(),
|
||||
);
|
||||
if ts > b.filled_until {
|
||||
if ts >= b.active_end {
|
||||
if b.filled_until < b.active_end {
|
||||
self.inner_a.inner_b.fill_until(b.active_end, lst.clone());
|
||||
}
|
||||
self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out);
|
||||
} else {
|
||||
self.inner_a.inner_b.fill_until(ts, lst.clone());
|
||||
}
|
||||
trace_cycle!("cycle_01 filled up to {:?} emit and reset", b.active_end);
|
||||
let old_end = b.active_end;
|
||||
let ts1 = TsNano::from_ns(b.active_end.ns() / div * div);
|
||||
assert!(ts1 == old_end);
|
||||
b.active_beg = ts1;
|
||||
b.active_end = ts1.add_dt_nano(b.active_len);
|
||||
b.filled_until = ts1;
|
||||
b.cnt = 0;
|
||||
self.inner_a.minmax = Some((lst.clone(), lst.clone()));
|
||||
} else {
|
||||
self.inner_a.inner_b.fill_until(ts, LstRef(lst));
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO merge with the other reset
|
||||
let ts1 = TsNano::from_ns(ts.ns() / div * div);
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
b.active_beg = ts1;
|
||||
b.active_end = ts1.add_dt_nano(b.active_len);
|
||||
b.filled_until = ts1;
|
||||
b.filled_width = DtNano::from_ns(0);
|
||||
b.cnt = 0;
|
||||
b.agg.reset_for_new_bin();
|
||||
assert!(self.inner_a.minmax.is_none());
|
||||
@@ -443,6 +485,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn cycle_02(&mut self) {
|
||||
let b = &self.inner_a.inner_b;
|
||||
trace_cycle!("cycle_02 {:?}", b.active_end);
|
||||
if let Some(lst) = self.lst.as_ref() {
|
||||
let lst = LstRef(lst);
|
||||
self.inner_a.push_out_and_reset(lst, false, &mut self.out);
|
||||
} else {
|
||||
// there is nothing we can produce
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ingest(&mut self, mut evs_all: ContainerEvents<EVT>) -> Result<(), Error> {
|
||||
// It is this type's task to find and store the one-before event.
|
||||
// We then pass it to the aggregation.
|
||||
@@ -457,16 +510,15 @@ where
|
||||
evs_all.verify()?;
|
||||
|
||||
loop {
|
||||
// How to handle transition to the next bin?
|
||||
// How to handle to not emit bins until at least some partially filled bin is encountered?
|
||||
break if let Some(ts) = evs_all.ts_first() {
|
||||
trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest", ts);
|
||||
let b = &mut self.inner_a.inner_b;
|
||||
if ts >= self.range.nano_end() {
|
||||
return Err(Error::EventAfterRange);
|
||||
}
|
||||
if ts >= b.active_end {
|
||||
trace_cycle!("bin edge boundary {:?}", b.active_end);
|
||||
assert!(b.filled_until < b.active_beg);
|
||||
assert!(b.filled_until < b.active_end, "{} < {}", b.filled_until, b.active_end);
|
||||
self.cycle_01(ts);
|
||||
}
|
||||
let n1 = evs_all.len();
|
||||
@@ -496,12 +548,18 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn range_final(&mut self) -> Result<(), Error> {
|
||||
trace_cycle!("range_final");
|
||||
pub fn input_done_range_final(&mut self) -> Result<(), Error> {
|
||||
trace_cycle!("input_done_range_final");
|
||||
self.cycle_01(self.range.nano_end());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn input_done_range_open(&mut self) -> Result<(), Error> {
|
||||
trace_cycle!("input_done_range_open");
|
||||
self.cycle_02();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn output(&mut self) -> ContainerBins<EVT> {
|
||||
::core::mem::replace(&mut self.out, ContainerBins::new())
|
||||
}
|
||||
|
||||
@@ -68,10 +68,13 @@ impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
|
||||
self.sum = f32::identity_sum();
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self) -> <EnumVariant as EventValueType>::AggTimeWeightOutputAvg {
|
||||
fn result_and_reset_for_new_bin(
|
||||
&mut self,
|
||||
filled_width_fraction: f32,
|
||||
) -> <EnumVariant as EventValueType>::AggTimeWeightOutputAvg {
|
||||
let ret = self.sum.clone();
|
||||
self.sum = f32::identity_sum();
|
||||
ret
|
||||
ret / filled_width_fraction
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,12 +50,12 @@ pub mod log_macros {
|
||||
}
|
||||
}
|
||||
|
||||
pub mod log {
|
||||
pub mod log2 {
|
||||
pub use tracing::{self, event, span, Level};
|
||||
pub use tracing::{debug, error, info, trace, warn};
|
||||
}
|
||||
|
||||
pub mod log2 {
|
||||
pub mod log {
|
||||
pub use crate::{debug, error, info, trace, warn};
|
||||
pub use tracing::{self, event, span, Level};
|
||||
}
|
||||
@@ -1698,6 +1698,14 @@ impl DtNano {
|
||||
pub fn to_i64(&self) -> i64 {
|
||||
self.0 as i64
|
||||
}
|
||||
|
||||
pub fn add(self, rhs: Self) -> Self {
|
||||
Self(self.0 + rhs.0)
|
||||
}
|
||||
|
||||
pub fn fraction_of(self, rhs: Self) -> f32 {
|
||||
self.0 as f32 / rhs.0 as f32
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DtNano {
|
||||
|
||||
Reference in New Issue
Block a user