Introduce trait for aggregated payload

This commit is contained in:
Dominik Werder
2024-11-20 11:56:35 +01:00
parent 67590d6bd1
commit 5de8f847a4
11 changed files with 393 additions and 133 deletions

View File

@@ -1,5 +1,6 @@
pub mod aggregator;
pub mod binnedvaluetype;
pub mod container;
pub mod container_bins;
pub mod container_events;
pub mod timeweight;

View File

@@ -1,3 +1,6 @@
pub mod agg_bins;
use super::container::bins::BinAggedType;
use super::container_events::EventValueType;
use core::fmt;
use netpod::log::*;
@@ -12,21 +15,21 @@ macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_result { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Send + Serialize + for<'a> Deserialize<'a> {}
pub trait AggTimeWeightOutputAvg: BinAggedType + Serialize + for<'a> Deserialize<'a> {}
impl AggTimeWeightOutputAvg for u8 {}
impl AggTimeWeightOutputAvg for u16 {}
impl AggTimeWeightOutputAvg for u32 {}
impl AggTimeWeightOutputAvg for u64 {}
impl AggTimeWeightOutputAvg for i8 {}
impl AggTimeWeightOutputAvg for i16 {}
impl AggTimeWeightOutputAvg for i32 {}
impl AggTimeWeightOutputAvg for i64 {}
// impl AggTimeWeightOutputAvg for u8 {}
// impl AggTimeWeightOutputAvg for u16 {}
// impl AggTimeWeightOutputAvg for u32 {}
// impl AggTimeWeightOutputAvg for u64 {}
// impl AggTimeWeightOutputAvg for i8 {}
// impl AggTimeWeightOutputAvg for i16 {}
// impl AggTimeWeightOutputAvg for i32 {}
// impl AggTimeWeightOutputAvg for i64 {}
impl AggTimeWeightOutputAvg for f32 {}
impl AggTimeWeightOutputAvg for f64 {}
impl AggTimeWeightOutputAvg for EnumVariant {}
impl AggTimeWeightOutputAvg for String {}
impl AggTimeWeightOutputAvg for bool {}
// impl AggTimeWeightOutputAvg for EnumVariant {}
// impl AggTimeWeightOutputAvg for String {}
// impl AggTimeWeightOutputAvg for bool {}
pub trait AggregatorTimeWeight<EVT>: fmt::Debug + Send
where
@@ -35,7 +38,10 @@ where
fn new() -> Self;
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT);
fn reset_for_new_bin(&mut self);
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg;
fn result_and_reset_for_new_bin(
&mut self,
filled_width_fraction: f32,
) -> EVT::AggTimeWeightOutputAvg;
}
#[derive(Debug)]
@@ -71,9 +77,16 @@ where
self.sum = 0.;
}
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg {
fn result_and_reset_for_new_bin(
&mut self,
filled_width_fraction: f32,
) -> EVT::AggTimeWeightOutputAvg {
let sum = self.sum.clone();
trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
trace_result!(
"result_and_reset_for_new_bin sum {} {}",
sum,
filled_width_fraction
);
self.sum = 0.;
sum / filled_width_fraction as f64
}
@@ -96,7 +109,11 @@ impl AggregatorTimeWeight<f32> for AggregatorNumeric {
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 {
let sum = self.sum.clone() as f32;
trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
trace_result!(
"result_and_reset_for_new_bin sum {} {}",
sum,
filled_width_fraction
);
self.sum = 0.;
sum / filled_width_fraction
}
@@ -158,7 +175,11 @@ impl AggregatorTimeWeight<u64> for AggregatorNumeric {
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 {
let sum = self.sum.clone();
trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
trace_result!(
"result_and_reset_for_new_bin sum {} {}",
sum,
filled_width_fraction
);
self.sum = 0.;
sum / filled_width_fraction as f64
}
@@ -181,7 +202,11 @@ impl AggregatorTimeWeight<bool> for AggregatorNumeric {
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 {
let sum = self.sum.clone();
trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
trace_result!(
"result_and_reset_for_new_bin sum {} {}",
sum,
filled_width_fraction
);
self.sum = 0.;
sum / filled_width_fraction as f64
}
@@ -204,7 +229,11 @@ impl AggregatorTimeWeight<String> for AggregatorNumeric {
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 {
let sum = self.sum.clone();
trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
trace_result!(
"result_and_reset_for_new_bin sum {} {}",
sum,
filled_width_fraction
);
self.sum = 0.;
sum / filled_width_fraction as f64
}

View File

1
src/binning/container.rs Normal file
View File

@@ -0,0 +1 @@
pub mod bins;

View File

@@ -0,0 +1,125 @@
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::EventValueType;
use crate::binning::container_events::PartialOrdEvtA;
use items_0::vecpreview::PreviewRange;
use netpod::DtNano;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
use std::fmt;
pub trait AggBinValTw<BVT>: fmt::Debug + Send
where
BVT: BinAggedType,
{
fn new() -> Self;
fn ingest(&mut self, bl: DtNano, val: BVT);
fn reset_for_new_bin(&mut self);
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> BVT;
}
pub trait BinAggedContainer<BVT>:
fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a>
where
BVT: BinAggedType,
{
fn new() -> Self;
fn push_back(&mut self, val: BVT);
fn pop_front(&mut self) -> Option<BVT>;
fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option<BVT::IterTy1<'a>>;
}
pub trait BinAggedType:
fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize + for<'a> Deserialize<'a>
{
type Container: BinAggedContainer<Self>;
type AggregatorTimeWeight: AggBinValTw<Self>;
type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA<Self> + Into<Self>;
}
impl<EVT, BVT> PreviewRange for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a> {
todo!()
}
}
impl<BVT> BinAggedContainer<BVT> for VecDeque<f32>
where
BVT: BinAggedType,
{
fn new() -> Self {
todo!()
}
fn push_back(&mut self, val: BVT) {
todo!()
}
fn pop_front(&mut self) -> Option<BVT> {
todo!()
}
fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option<<BVT as BinAggedType>::IterTy1<'a>> {
todo!()
}
}
impl<BVT> BinAggedContainer<BVT> for VecDeque<f64>
where
BVT: BinAggedType,
{
fn new() -> Self {
todo!()
}
fn push_back(&mut self, val: BVT) {
todo!()
}
fn pop_front(&mut self) -> Option<BVT> {
todo!()
}
fn get_iter_ty_1<'a>(&'a self, pos: usize) -> Option<<BVT as BinAggedType>::IterTy1<'a>> {
todo!()
}
}
impl BinAggedType for f32 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = ();
type IterTy1<'a> = Self;
}
impl BinAggedType for f64 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = ();
type IterTy1<'a> = Self;
}
impl<T> AggBinValTw<T> for ()
where
T: BinAggedType,
{
fn new() -> Self {
todo!()
}
fn ingest(&mut self, bl: DtNano, val: T) {
todo!()
}
fn reset_for_new_bin(&mut self) {
todo!()
}
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> T {
todo!()
}
}
pub struct DummyPayload {}

View File

@@ -1,3 +1,4 @@
use super::container::bins::BinAggedType;
use super::container_events::EventValueType;
use crate::offsets::ts_offs_from_abs;
use crate::offsets::ts_offs_from_abs_with_anchor;
@@ -32,56 +33,38 @@ pub enum ContainerBinsError {
Unordered,
}
pub trait BinValueType: fmt::Debug + Clone + PartialOrd {
// type Container: Container<Self>;
// type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
// type AggTimeWeightOutputAvg;
// fn identity_sum() -> Self;
// fn add_weighted(&self, add: &Self, f: f32) -> Self;
}
#[derive(Debug, Clone)]
pub struct BinSingle<EVT> {
pub ts1: TsNano,
pub ts2: TsNano,
pub cnt: u64,
pub min: EVT,
pub max: EVT,
pub avg: f32,
pub lst: EVT,
pub fnl: bool,
}
#[derive(Debug, Clone)]
pub struct BinRef<'a, EVT>
pub struct BinRef<'a, EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
pub ts1: TsNano,
pub ts2: TsNano,
pub cnt: u64,
pub min: &'a EVT,
pub max: &'a EVT,
pub avg: &'a EVT::AggTimeWeightOutputAvg,
pub agg: &'a BVT,
pub lst: &'a EVT,
pub fnl: bool,
}
pub struct IterDebug<'a, EVT>
pub struct IterDebug<'a, EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
bins: &'a ContainerBins<EVT>,
bins: &'a ContainerBins<EVT, BVT>,
ix: usize,
len: usize,
}
impl<'a, EVT> Iterator for IterDebug<'a, EVT>
impl<'a, EVT, BVT> Iterator for IterDebug<'a, EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
type Item = BinRef<'a, EVT>;
type Item = BinRef<'a, EVT, BVT>;
fn next(&mut self) -> Option<Self::Item> {
if self.ix < self.bins.len() && self.ix < self.len {
@@ -94,7 +77,7 @@ where
cnt: b.cnts[i],
min: &b.mins[i],
max: &b.maxs[i],
avg: &b.avgs[i],
agg: &b.aggs[i],
lst: &b.lsts[i],
fnl: b.fnls[i],
};
@@ -105,24 +88,62 @@ where
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct ContainerBins<EVT>
#[derive(Clone)]
pub struct ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
ts1s: VecDeque<TsNano>,
ts2s: VecDeque<TsNano>,
cnts: VecDeque<u64>,
mins: VecDeque<EVT>,
maxs: VecDeque<EVT>,
avgs: VecDeque<EVT::AggTimeWeightOutputAvg>,
aggs: VecDeque<BVT>,
lsts: VecDeque<EVT>,
fnls: VecDeque<bool>,
}
impl<EVT> ContainerBins<EVT>
mod container_bins_serde {
use super::ContainerBins;
use super::EventValueType;
use crate::binning::container::bins::BinAggedType;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
impl<EVT, BVT> Serialize for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
todo!()
}
}
impl<'de, EVT, BVT> Deserialize<'de> for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
todo!()
}
}
}
impl<EVT, BVT> ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
pub fn from_constituents(
ts1s: VecDeque<TsNano>,
@@ -130,7 +151,7 @@ where
cnts: VecDeque<u64>,
mins: VecDeque<EVT>,
maxs: VecDeque<EVT>,
avgs: VecDeque<EVT::AggTimeWeightOutputAvg>,
aggs: VecDeque<BVT>,
lsts: VecDeque<EVT>,
fnls: VecDeque<bool>,
) -> Self {
@@ -140,7 +161,7 @@ where
cnts,
mins,
maxs,
avgs,
aggs,
lsts,
fnls,
}
@@ -157,7 +178,7 @@ where
cnts: VecDeque::new(),
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
aggs: VecDeque::new(),
lsts: VecDeque::new(),
fnls: VecDeque::new(),
}
@@ -215,8 +236,8 @@ where
self.maxs.iter()
}
pub fn avgs_iter(&self) -> std::collections::vec_deque::Iter<EVT::AggTimeWeightOutputAvg> {
self.avgs.iter()
pub fn aggs_iter(&self) -> std::collections::vec_deque::Iter<BVT> {
self.aggs.iter()
}
pub fn lsts_iter(&self) -> std::collections::vec_deque::Iter<EVT> {
@@ -245,7 +266,7 @@ where
>,
std::collections::vec_deque::Iter<EVT>,
>,
std::collections::vec_deque::Iter<EVT::AggTimeWeightOutputAvg>,
std::collections::vec_deque::Iter<BVT>,
>,
std::collections::vec_deque::Iter<EVT>,
>,
@@ -256,7 +277,7 @@ where
.zip(self.cnts_iter())
.zip(self.mins_iter())
.zip(self.maxs_iter())
.zip(self.avgs_iter())
.zip(self.aggs_iter())
.zip(self.lsts_iter())
.zip(self.fnls_iter())
}
@@ -288,7 +309,7 @@ where
cnt: u64,
min: EVT,
max: EVT,
avg: EVT::AggTimeWeightOutputAvg,
agg: BVT,
lst: EVT,
fnl: bool,
) {
@@ -297,12 +318,12 @@ where
self.cnts.push_back(cnt);
self.mins.push_back(min);
self.maxs.push_back(max);
self.avgs.push_back(avg);
self.aggs.push_back(agg);
self.lsts.push_back(lst);
self.fnls.push_back(fnl);
}
pub fn iter_debug(&self) -> IterDebug<EVT> {
pub fn iter_debug(&self) -> IterDebug<EVT, BVT> {
IterDebug {
bins: self,
ix: 0,
@@ -311,64 +332,70 @@ where
}
}
impl<EVT> fmt::Debug for ContainerBins<EVT>
impl<EVT, BVT> fmt::Debug for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let self_name = any::type_name::<Self>();
write!(
fmt,
"{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?}, fnls {:?} }}",
"{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, aggs {:?}, fnls {:?} }}",
self.len(),
VecPreview::new(&self.ts1s),
VecPreview::new(&self.ts2s),
VecPreview::new(&self.cnts),
VecPreview::new(&self.avgs),
VecPreview::new(&self.aggs),
VecPreview::new(&self.fnls),
)
}
}
impl<EVT> fmt::Display for ContainerBins<EVT>
impl<EVT, BVT> fmt::Display for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self, fmt)
}
}
impl<EVT> AsAnyMut for ContainerBins<EVT>
impl<EVT, BVT> AsAnyMut for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn as_any_mut(&mut self) -> &mut dyn any::Any {
self
}
}
impl<EVT> WithLen for ContainerBins<EVT>
impl<EVT, BVT> WithLen for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn len(&self) -> usize {
Self::len(self)
}
}
impl<EVT> TypeName for ContainerBins<EVT>
impl<EVT, BVT> TypeName for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn type_name(&self) -> String {
BinningggContainerBinsDyn::type_name(self).into()
Self::type_name().into()
}
}
impl<EVT> AsAnyRef for ContainerBins<EVT>
impl<EVT, BVT> AsAnyRef for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn as_any_ref(&self) -> &dyn any::Any {
self
@@ -376,43 +403,48 @@ where
}
#[derive(Debug)]
pub struct ContainerBinsCollectorOutput<EVT>
pub struct ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
bins: ContainerBins<EVT>,
bins: ContainerBins<EVT, BVT>,
}
impl<EVT> TypeName for ContainerBinsCollectorOutput<EVT>
impl<EVT, BVT> TypeName for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn type_name(&self) -> String {
any::type_name::<Self>().into()
}
}
impl<EVT> AsAnyRef for ContainerBinsCollectorOutput<EVT>
impl<EVT, BVT> AsAnyRef for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn as_any_ref(&self) -> &dyn any::Any {
self
}
}
impl<EVT> AsAnyMut for ContainerBinsCollectorOutput<EVT>
impl<EVT, BVT> AsAnyMut for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn as_any_mut(&mut self) -> &mut dyn any::Any {
self
}
}
impl<EVT> WithLen for ContainerBinsCollectorOutput<EVT>
impl<EVT, BVT> WithLen for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn len(&self) -> usize {
self.bins.len()
@@ -420,9 +452,10 @@ where
}
#[derive(Debug, Serialize)]
struct ContainerBinsCollectorOutputUser<EVT>
struct ContainerBinsCollectorOutputUser<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
@@ -441,7 +474,7 @@ where
#[serde(rename = "maxs")]
maxs: VecDeque<EVT>,
#[serde(rename = "avgs")]
avgs: VecDeque<EVT::AggTimeWeightOutputAvg>,
aggs: VecDeque<BVT>,
// #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
// range_final: bool,
// #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
@@ -454,9 +487,10 @@ where
// finished_at: Option<IsoDateTime>,
}
impl<EVT> ToJsonResult for ContainerBinsCollectorOutput<EVT>
impl<EVT, BVT> ToJsonResult for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
let bins = &self.bins;
@@ -467,8 +501,8 @@ where
let counts = bins.cnts.clone();
let mins = bins.mins.clone();
let maxs = bins.maxs.clone();
let avgs = bins.avgs.clone();
let val = ContainerBinsCollectorOutputUser::<EVT> {
let aggs = bins.aggs.clone();
let val = ContainerBinsCollectorOutputUser::<EVT, BVT> {
ts_anchor_sec: ts_anch,
ts1_off_ms: ts1ms,
ts2_off_ms: ts2ms,
@@ -477,38 +511,51 @@ where
counts,
mins,
maxs,
avgs,
aggs,
};
serde_json::to_value(&val)
}
}
impl<EVT> CollectedDyn for ContainerBinsCollectorOutput<EVT> where EVT: EventValueType {}
#[derive(Debug)]
pub struct ContainerBinsCollector<EVT>
impl<EVT, BVT> CollectedDyn for ContainerBinsCollectorOutput<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
bins: ContainerBins<EVT>,
}
#[derive(Debug)]
pub struct ContainerBinsCollector<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
bins: ContainerBins<EVT, BVT>,
timed_out: bool,
range_final: bool,
}
impl<EVT> ContainerBinsCollector<EVT> where EVT: EventValueType {}
impl<EVT> WithLen for ContainerBinsCollector<EVT>
impl<EVT, BVT> ContainerBinsCollector<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
}
impl<EVT, BVT> WithLen for ContainerBinsCollector<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn len(&self) -> usize {
self.bins.len()
}
}
impl<EVT> items_0::container::ByteEstimate for ContainerBinsCollector<EVT>
impl<EVT, BVT> items_0::container::ByteEstimate for ContainerBinsCollector<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn byte_estimate(&self) -> u64 {
// TODO need better estimate
@@ -516,12 +563,13 @@ where
}
}
impl<EVT> items_0::collect_s::CollectorDyn for ContainerBinsCollector<EVT>
impl<EVT, BVT> items_0::collect_s::CollectorDyn for ContainerBinsCollector<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn ingest(&mut self, src: &mut dyn CollectableDyn) {
if let Some(src) = src.as_any_mut().downcast_mut::<ContainerBins<EVT>>() {
if let Some(src) = src.as_any_mut().downcast_mut::<ContainerBins<EVT, BVT>>() {
src.drain_into(&mut self.bins, 0..src.len());
} else {
let srcn = src.type_name();
@@ -553,12 +601,13 @@ where
}
}
impl<EVT> CollectableDyn for ContainerBins<EVT>
impl<EVT, BVT> CollectableDyn for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn new_collector(&self) -> Box<dyn items_0::collect_s::CollectorDyn> {
let ret = ContainerBinsCollector::<EVT> {
let ret = ContainerBinsCollector::<EVT, BVT> {
bins: ContainerBins::new(),
timed_out: false,
range_final: false,
@@ -567,9 +616,10 @@ where
}
}
impl<EVT> BinningggContainerBinsDyn for ContainerBins<EVT>
impl<EVT, BVT> BinningggContainerBinsDyn for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn type_name(&self) -> &'static str {
any::type_name::<Self>()
@@ -604,7 +654,7 @@ where
dst.cnts.extend(self.cnts.drain(range.clone()));
dst.mins.extend(self.mins.drain(range.clone()));
dst.maxs.extend(self.maxs.drain(range.clone()));
dst.avgs.extend(self.avgs.drain(range.clone()));
dst.aggs.extend(self.aggs.drain(range.clone()));
dst.lsts.extend(self.lsts.drain(range.clone()));
dst.fnls.extend(self.fnls.drain(range.clone()));
} else {
@@ -617,7 +667,9 @@ where
&self,
range: netpod::BinnedRange<TsNano>,
) -> Box<dyn items_0::timebin::BinnedBinsTimeweightTrait> {
let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::<EVT>::new(range);
let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::<
EVT::AggTimeWeightOutputAvg,
>::new(range);
Box::new(ret)
}
@@ -626,32 +678,35 @@ where
.mins
.iter_mut()
.zip(self.maxs.iter_mut())
.zip(self.avgs.iter_mut())
.zip(self.aggs.iter_mut())
{}
}
}
pub struct ContainerBinsTakeUpTo<'a, EVT>
pub struct ContainerBinsTakeUpTo<'a, EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
evs: &'a mut ContainerBins<EVT>,
evs: &'a mut ContainerBins<EVT, BVT>,
len: usize,
}
impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT>
impl<'a, EVT, BVT> ContainerBinsTakeUpTo<'a, EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
pub fn new(evs: &'a mut ContainerBins<EVT>, len: usize) -> Self {
pub fn new(evs: &'a mut ContainerBins<EVT, BVT>, len: usize) -> Self {
let len = len.min(evs.len());
Self { evs, len }
}
}
impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT>
impl<'a, EVT, BVT> ContainerBinsTakeUpTo<'a, EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
pub fn ts1_first(&self) -> Option<TsNano> {
self.evs.ts1_first()

View File

@@ -24,8 +24,8 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[cstm(name = "ValueContainerError")]
pub enum ValueContainerError {}
pub trait Container<EVT>:
fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a>
// + Serialize + for<'a> Deserialize<'a>
pub trait Container<EVT>: fmt::Debug + Send + Clone + PreviewRange
where
EVT: EventValueType,
{
@@ -39,7 +39,9 @@ pub trait PartialOrdEvtA<EVT> {
fn cmp_a(&self, other: &EVT) -> Option<std::cmp::Ordering>;
}
pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize {
pub trait EventValueType:
fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize + for<'a> Deserialize<'a>
{
type Container: Container<Self>;
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
@@ -212,7 +214,7 @@ pub enum EventsContainerError {
Unordered,
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone)]
pub struct ContainerEvents<EVT>
where
EVT: EventValueType,
@@ -221,6 +223,39 @@ where
vals: <EVT as EventValueType>::Container,
}
mod container_events_serde {
use super::ContainerEvents;
use super::EventValueType;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
impl<EVT> Serialize for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
todo!()
}
}
impl<'de, EVT> Deserialize<'de> for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
todo!()
}
}
}
impl<EVT> ContainerEvents<EVT>
where
EVT: EventValueType,

View File

@@ -18,7 +18,7 @@ pub(super) fn bins_gen_dim0_f32_v00(
) -> impl Stream<Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>> {
futures_util::stream::iter((0usize..1000).into_iter())
.map(|x| {
let c = ContainerBins::<f32>::new();
let c = ContainerBins::<f32, f64>::new();
Box::new(c) as Box<dyn BinningggContainerBinsDyn>
})
.map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))

View File

@@ -7,7 +7,7 @@ pub enum Error {
AssertMsg(String),
}
trait IntoVecDequeU64 {
pub(super) trait IntoVecDequeU64 {
fn into_vec_deque_u64(self) -> VecDeque<u64>;
}
@@ -18,7 +18,8 @@ impl IntoVecDequeU64 for &str {
.collect()
}
}
trait IntoVecDequeF32 {
pub(super) trait IntoVecDequeF32 {
fn into_vec_deque_f32(self) -> VecDeque<f32>;
}
@@ -88,7 +89,10 @@ fn exp_f32<'a>(
Ok(())
}
pub(super) fn exp_cnts(bins: &ContainerBins<f32>, exps: impl IntoVecDequeU64) -> Result<(), Error> {
pub(super) fn exp_cnts(
bins: &ContainerBins<f32, f32>,
exps: impl IntoVecDequeU64,
) -> Result<(), Error> {
exp_u64(
bins.cnts_iter(),
exps.into_vec_deque_u64().iter(),
@@ -96,7 +100,10 @@ pub(super) fn exp_cnts(bins: &ContainerBins<f32>, exps: impl IntoVecDequeU64) ->
)
}
pub(super) fn exp_mins(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
pub(super) fn exp_mins(
bins: &ContainerBins<f32, f32>,
exps: impl IntoVecDequeF32,
) -> Result<(), Error> {
exp_f32(
bins.mins_iter(),
exps.into_vec_deque_f32().iter(),
@@ -104,7 +111,10 @@ pub(super) fn exp_mins(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) ->
)
}
pub(super) fn exp_maxs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
pub(super) fn exp_maxs(
bins: &ContainerBins<f32, f32>,
exps: impl IntoVecDequeF32,
) -> Result<(), Error> {
exp_f32(
bins.maxs_iter(),
exps.into_vec_deque_f32().iter(),
@@ -112,7 +122,10 @@ pub(super) fn exp_maxs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) ->
)
}
pub(super) fn exp_avgs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
pub(super) fn exp_avgs(
bins: &ContainerBins<f32, f32>,
exps: impl IntoVecDequeF32,
) -> Result<(), Error> {
let exps = exps.into_vec_deque_f32();
let mut it_a = bins.iter_debug();
let mut it_b = exps.iter();
@@ -124,7 +137,7 @@ pub(super) fn exp_avgs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) ->
break;
}
if let (Some(a), Some(&exp)) = (a, b) {
let val = *a.avg as f32;
let val = *a.agg as f32;
if netpod::f32_close(val, exp) == false {
return Err(Error::AssertMsg(format!(
"exp_avgs val {} exp {} i {}",

View File

@@ -1,3 +1,4 @@
use crate::binning::container::bins::BinAggedType;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::EventValueType;
use crate::log::*;
@@ -6,23 +7,26 @@ use items_0::timebin::BinningggError;
use items_0::timebin::BinsBoxed;
use netpod::BinnedRange;
use netpod::TsNano;
use std::marker::PhantomData;
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[derive(Debug)]
pub struct BinnedBinsTimeweight<EVT>
pub struct BinnedBinsTimeweight<BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
range: BinnedRange<TsNano>,
out: ContainerBins<EVT>,
// out: ContainerBins<BVT>,
produce_cnt_zero: bool,
// agg: <BVT as BinValueType>,
t1: PhantomData<BVT>,
}
impl<EVT> BinnedBinsTimeweight<EVT>
impl<BVT> BinnedBinsTimeweight<BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
pub fn new(range: BinnedRange<TsNano>) -> Self {
trace_init!("BinnedBinsTimeweight::new {}", range);
@@ -31,15 +35,17 @@ where
let active_len = active_end.delta(active_beg);
Self {
range,
out: ContainerBins::new(),
// out: ContainerBins::new(),
produce_cnt_zero: true,
// agg: todo!(),
t1: PhantomData,
}
}
}
impl<EVT> BinnedBinsTimeweightTrait for BinnedBinsTimeweight<EVT>
impl<BVT> BinnedBinsTimeweightTrait for BinnedBinsTimeweight<BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> {
todo!()

View File

@@ -254,6 +254,7 @@ where
impl<EVT> InnerA<EVT>
where
EVT: EventValueType,
// BVT: BinAggedType,
{
fn apply_min_max(ev: &EventSingleRef<EVT>, minmax: &mut MinMax<EVT>) {
if let Some(std::cmp::Ordering::Less) = ev.val.cmp_a(&minmax.0.val) {
@@ -262,12 +263,6 @@ where
if let Some(std::cmp::Ordering::Greater) = ev.val.cmp_a(&minmax.1.val) {
minmax.1 = ev.into();
}
// if ev.val < minmax.0.val {
// minmax.0 = ev.into();
// }
// if ev.val > minmax.1.val {
// minmax.1 = ev.into();
// }
}
fn apply_lst_after_event_handled(ev: EventSingleRef<EVT>, lst: LstMut<EVT>) {
@@ -367,7 +362,7 @@ where
&mut self,
lst: LstRef<EVT>,
range_final: bool,
out: &mut ContainerBins<EVT>,
out: &mut ContainerBins<EVT, EVT::AggTimeWeightOutputAvg>,
) {
let selfname = "push_out_and_reset";
// TODO there is not always good enough input to produce a meaningful bin.
@@ -403,7 +398,7 @@ where
lst: Option<EventSingle<EVT>>,
range: BinnedRange<TsNano>,
inner_a: InnerA<EVT>,
out: ContainerBins<EVT>,
out: ContainerBins<EVT, EVT::AggTimeWeightOutputAvg>,
produce_cnt_zero: bool,
}
@@ -693,7 +688,7 @@ where
self.out.len()
}
pub fn output(&mut self) -> ContainerBins<EVT> {
pub fn output(&mut self) -> ContainerBins<EVT, EVT::AggTimeWeightOutputAvg> {
mem::replace(&mut self.out, ContainerBins::new())
}
}