Basic time bin test case

This commit is contained in:
Dominik Werder
2022-11-18 14:03:12 +01:00
parent b3225ae4c1
commit dbbc3ab01f
3 changed files with 107 additions and 13 deletions

View File

@@ -15,6 +15,7 @@ use std::any::Any;
use std::collections::VecDeque;
use std::{fmt, mem};
// TODO make members private
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct BinsDim0<NTY> {
pub ts1s: VecDeque<u64>,
@@ -57,6 +58,15 @@ impl<NTY: ScalarOps> BinsDim0<NTY> {
}
}
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
self.counts.push_back(count);
self.mins.push_back(min);
self.maxs.push_back(max);
self.avgs.push_back(avg);
}
pub fn append_zero(&mut self, beg: u64, end: u64) {
self.ts1s.push_back(beg);
self.ts2s.push_back(end);
@@ -74,6 +84,36 @@ impl<NTY: ScalarOps> BinsDim0<NTY> {
self.maxs.extend(src.maxs.drain(..));
self.avgs.extend(src.avgs.drain(..));
}
pub fn equal_slack(&self, other: &Self) -> bool {
for (&a, &b) in self.ts1s.iter().zip(other.ts1s.iter()) {
if a != b {
return false;
}
}
for (&a, &b) in self.ts2s.iter().zip(other.ts2s.iter()) {
if a != b {
return false;
}
}
for (a, b) in self.mins.iter().zip(other.mins.iter()) {
if !a.equal_slack(b) {
return false;
}
}
return true;
for (a, b) in self.maxs.iter().zip(other.maxs.iter()) {
if !a.equal_slack(b) {
return false;
}
}
for (a, b) in self.avgs.iter().zip(other.avgs.iter()) {
if !a.equal_slack(b) {
return false;
}
}
true
}
}
impl<NTY> WithLen for BinsDim0<NTY> {

View File

@@ -111,28 +111,45 @@ pub trait ScalarOps:
fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
{
fn zero() -> Self;
fn equal_slack(&self, rhs: &Self) -> bool;
}
macro_rules! impl_num_ops {
($ty:ident, $zero:expr) => {
($ty:ident, $zero:expr, $equal_slack:ident) => {
impl ScalarOps for $ty {
fn zero() -> Self {
$zero
}
fn equal_slack(&self, rhs: &Self) -> bool {
$equal_slack(*self, *rhs)
}
}
};
}
impl_num_ops!(u8, 0);
impl_num_ops!(u16, 0);
impl_num_ops!(u32, 0);
impl_num_ops!(u64, 0);
impl_num_ops!(i8, 0);
impl_num_ops!(i16, 0);
impl_num_ops!(i32, 0);
impl_num_ops!(i64, 0);
impl_num_ops!(f32, 0.);
impl_num_ops!(f64, 0.);
fn equal_int<T: PartialEq>(a: T, b: T) -> bool {
a == b
}
fn equal_f32(a: f32, b: f32) -> bool {
(a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001)
}
fn equal_f64(a: f64, b: f64) -> bool {
(a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001)
}
impl_num_ops!(u8, 0, equal_int);
impl_num_ops!(u16, 0, equal_int);
impl_num_ops!(u32, 0, equal_int);
impl_num_ops!(u64, 0, equal_int);
impl_num_ops!(i8, 0, equal_int);
impl_num_ops!(i16, 0, equal_int);
impl_num_ops!(i32, 0, equal_int);
impl_num_ops!(i64, 0, equal_int);
impl_num_ops!(f32, 0., equal_f32);
impl_num_ops!(f64, 0., equal_f64);
#[allow(unused)]
struct Ts(u64);

View File

@@ -1,9 +1,12 @@
use crate::test::runfut;
use err::Error;
use futures_util::{stream, StreamExt};
use items::sitem_data;
use items::{sitem_data, RangeCompletableItem, StreamItem};
use items_2::binsdim0::BinsDim0;
use items_2::testgen::make_some_boxed_d0_f32;
use items_2::{ChannelEvents, ConnStatus, ConnStatusEvent};
use netpod::timeunits::{MS, SEC};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
#[test]
@@ -20,10 +23,44 @@ fn time_bin_00() {
sitem_data(v0),
sitem_data(v4),
]));
let deadline = Instant::now() + Duration::from_millis(2000);
let mut exps = {
let mut d = VecDeque::new();
let mut bins = BinsDim0::empty();
bins.push(SEC * 0, SEC * 1, 0, 0.0, 0.0, 0.0);
bins.push(SEC * 1, SEC * 2, 2, 0.0535830, 100.0589, 50.05624);
bins.push(SEC * 2, SEC * 3, 2, 200.06143, 300.07645, 250.06894);
bins.push(SEC * 3, SEC * 4, 2, 400.08554, 500.05222, 450.06888);
bins.push(SEC * 4, SEC * 5, 2, 600.0025, 700.09094, 650.04675);
d.push_back(bins);
let mut bins = BinsDim0::empty();
bins.push(SEC * 5, SEC * 6, 2, 800.0619, 900.02844, 850.04517);
d.push_back(bins);
d
};
let deadline = Instant::now() + Duration::from_millis(2000000);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline);
while let Some(item) = binned_stream.next().await {
eprintln!("{item:?}");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
if let Some(item) = item.as_any().downcast_ref::<BinsDim0<f32>>() {
let exp = exps.pop_front().unwrap();
if !item.equal_slack(&exp) {
return Err(Error::with_msg_no_trace(format!("bad, content not equal")));
}
} else {
return Err(Error::with_msg_no_trace(format!("bad, got item with unexpected type")));
}
}
RangeCompletableItem::RangeComplete => {}
},
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
},
Err(e) => Err(e).unwrap(),
}
}
Ok(())
};