WIP fix tests

This commit is contained in:
Dominik Werder
2023-05-03 17:34:50 +02:00
parent 479cec75e7
commit 03854395ff
28 changed files with 402 additions and 717 deletions

View File

@@ -14,6 +14,7 @@ use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::DAY;
use netpod::timeunits::MS;
use std::f64::consts::PI;
use std::pin::Pin;
@@ -21,25 +22,27 @@ use std::task::Context;
use std::task::Poll;
use std::time::Duration;
pub struct GenerateI32 {
pub struct GenerateI32V00 {
ts: u64,
dts: u64,
tsend: u64,
#[allow(unused)]
c1: u64,
timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
do_throttle: bool,
done: bool,
done_range_final: bool,
}
impl GenerateI32 {
pub fn new(node_ix: u64, node_count: u64, range: SeriesRange) -> Self {
impl GenerateI32V00 {
pub fn new(node_ix: u64, node_count: u64, range: SeriesRange, one_before_range: bool) -> Self {
let range = match range {
SeriesRange::TimeRange(k) => k,
SeriesRange::PulseRange(_) => todo!(),
};
let dts = MS * 1000 * node_count as u64;
let ts = (range.beg / dts + node_ix) * dts;
let ivl = MS * 1000;
let dts = ivl * node_count as u64;
let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl;
let tsend = range.end;
Self {
ts,
@@ -47,6 +50,7 @@ impl GenerateI32 {
tsend,
c1: 0,
timeout: None,
do_throttle: false,
done: false,
done_range_final: false,
}
@@ -72,19 +76,19 @@ impl GenerateI32 {
}
}
impl Stream for GenerateI32 {
impl Stream for GenerateI32V00 {
type Item = Sitemty<ChannelEvents>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.done_range_final {
break if self.done {
Ready(None)
} else if self.ts >= self.tsend {
self.done = true;
self.done_range_final = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else if false {
} else if !self.do_throttle {
// To use the generator without throttling, use this scope
Ready(Some(self.make_batch()))
} else if let Some(fut) = self.timeout.as_mut() {
@@ -112,6 +116,8 @@ pub struct GenerateI32V01 {
c1: u64,
node_ix: u64,
timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
do_throttle: bool,
have_range_final: bool,
done: bool,
done_range_final: bool,
}
@@ -125,7 +131,8 @@ impl GenerateI32V01 {
let ivl = MS * 500;
let dts = ivl * node_count as u64;
let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl;
let tsend = range.end;
let tsend = range.end.min(DAY);
let have_range_final = range.end < (DAY - ivl);
info!(
"START GENERATOR GenerateI32V01 ivl {} dts {} ts {} one_before_range {}",
ivl, dts, ts, one_before_range
@@ -138,6 +145,8 @@ impl GenerateI32V01 {
c1: 0,
node_ix,
timeout: None,
do_throttle: false,
have_range_final,
done: false,
done_range_final: false,
}
@@ -148,7 +157,7 @@ impl GenerateI32V01 {
let mut item = EventsDim0::empty();
let mut ts = self.ts;
loop {
if self.ts >= self.tsend || item.byte_estimate() > 40 {
if self.ts >= self.tsend || item.byte_estimate() > 200 {
break;
}
let pulse = ts;
@@ -175,13 +184,17 @@ impl Stream for GenerateI32V01 {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.done_range_final {
break if self.done {
Ready(None)
} else if self.ts >= self.tsend {
self.done = true;
self.done_range_final = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else if false {
if self.have_range_final {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
continue;
}
} else if !self.do_throttle {
// To use the generator without throttling, use this scope
Ready(Some(self.make_batch()))
} else if let Some(fut) = self.timeout.as_mut() {
@@ -207,6 +220,7 @@ pub struct GenerateF64V00 {
tsend: u64,
node_ix: u64,
timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
do_throttle: bool,
done: bool,
done_range_final: bool,
}
@@ -232,6 +246,7 @@ impl GenerateF64V00 {
tsend,
node_ix,
timeout: None,
do_throttle: false,
done: false,
done_range_final: false,
}
@@ -276,13 +291,13 @@ impl Stream for GenerateF64V00 {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if self.done_range_final {
break if self.done {
Ready(None)
} else if self.ts >= self.tsend {
self.done = true;
self.done_range_final = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else if false {
} else if !self.do_throttle {
// To use the generator without throttling, use this scope
Ready(Some(self.make_batch()))
} else if let Some(fut) = self.timeout.as_mut() {

View File

@@ -37,8 +37,8 @@ pub async fn plain_events_json(
info!("item after merge: {item:?}");
item
});
#[cfg(DISABLED)]
let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range());
//#[cfg(DISABLED)]
let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range());
#[cfg(DISABLED)]
let stream = stream.map(|item| {
info!("item after rangefilter: {item:?}");

View File

@@ -38,13 +38,6 @@ fn inmem_test_events_d0_i32_01() -> BoxedEventStream {
Box::pin(stream)
}
#[test]
fn empty_input() -> Result<(), Error> {
// TODO with a pipeline of x-binning, merging, t-binning and collection, how do I get a meaningful
// result even if there is no input data at all?
Err(Error::with_msg_no_trace("TODO"))
}
#[test]
fn merge_mergeable_00() -> Result<(), Error> {
let fut = async {
@@ -56,12 +49,6 @@ fn merge_mergeable_00() -> Result<(), Error> {
runfut(fut)
}
#[test]
fn timeout() -> Result<(), Error> {
// TODO expand from items_2::test
Err(Error::with_msg_no_trace("TODO"))
}
fn runfut<T, F>(fut: F) -> Result<T, err::Error>
where
F: std::future::Future<Output = Result<T, Error>>,

View File

@@ -1,5 +1,6 @@
use crate::collect::collect;
use crate::generators::GenerateI32;
use crate::generators::GenerateF64V00;
use crate::generators::GenerateI32V00;
use crate::test::runfut;
use crate::transform::build_event_transform;
use chrono::DateTime;
@@ -30,6 +31,13 @@ use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
fn nano_range_from_str(beg_date: &str, end_date: &str) -> Result<NanoRange, Error> {
let beg_date = beg_date.parse()?;
let end_date = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);
Ok(range)
}
#[test]
fn time_bin_00() {
let fut = async {
@@ -38,17 +46,21 @@ fn time_bin_00() {
let min_bin_count = 8;
let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
let v0 = ChannelEvents::Events(evs0);
let v2 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)));
let v4 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect)));
let v00 = ChannelEvents::Events(Box::new(EventsDim0::<f32>::empty()));
let v01 = ChannelEvents::Events(evs0);
let v02 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)));
let v03 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect)));
let stream0 = Box::pin(stream::iter(vec![
//
sitem_data(v2),
sitem_data(v0),
sitem_data(v4),
sitem_data(v00),
sitem_data(v02),
sitem_data(v01),
sitem_data(v03),
]));
let mut exps = {
let mut d = VecDeque::new();
let bins = BinsDim0::empty();
d.push_back(bins);
let mut bins = BinsDim0::empty();
bins.push(SEC * 0, SEC * 1, 0, 0.0, 0.0, 0.0);
bins.push(SEC * 1, SEC * 2, 2, 0.0535830, 100.0589, 50.05624);
@@ -61,10 +73,9 @@ fn time_bin_00() {
d.push_back(bins);
d
};
let deadline = Instant::now() + Duration::from_millis(2000000);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true);
while let Some(item) = binned_stream.next().await {
//eprintln!("{item:?}");
eprintln!("{item:?}");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
@@ -72,6 +83,11 @@ fn time_bin_00() {
if let Some(item) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
let exp = exps.pop_front().unwrap();
if !item.equal_slack(&exp) {
eprintln!("-----------------------");
eprintln!("item {:?}", item);
eprintln!("-----------------------");
eprintln!("exp {:?}", exp);
eprintln!("-----------------------");
return Err(Error::with_msg_no_trace(format!("bad, content not equal")));
}
} else {
@@ -98,14 +114,16 @@ fn time_bin_01() {
let range = SeriesRange::TimeRange(range);
let min_bin_count = 8;
let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
let v00 = ChannelEvents::Events(Box::new(EventsDim0::<f32>::empty()));
let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
let evs1 = make_some_boxed_d0_f32(10, SEC * 6, MS * 500, 0, 1846713781);
let v0 = ChannelEvents::Events(evs0);
let v1 = ChannelEvents::Events(evs1);
let v01 = ChannelEvents::Events(evs0);
let v02 = ChannelEvents::Events(evs1);
let stream0 = stream::iter(vec![
//
sitem_data(v0),
sitem_data(v1),
sitem_data(v00),
sitem_data(v01),
sitem_data(v02),
]);
let stream0 = stream0.then({
let mut i = 0;
@@ -152,13 +170,6 @@ fn time_bin_01() {
runfut(fut).unwrap()
}
fn nano_range_from_str(beg_date: &str, end_date: &str) -> Result<NanoRange, Error> {
let beg_date = beg_date.parse()?;
let end_date = end_date.parse()?;
let range = NanoRange::from_date_time(beg_date, end_date);
Ok(range)
}
#[test]
fn time_bin_02() -> Result<(), Error> {
let fut = async {
@@ -181,7 +192,7 @@ fn time_bin_02() -> Result<(), Error> {
let event_range = binned_range.binned_range_time().full_range();
let series_range = SeriesRange::TimeRange(event_range);
// TODO the test stream must be able to generate also one-before (on demand) and RangeComplete (by default).
let stream = GenerateI32::new(0, 1, series_range);
let stream = GenerateI32V00::new(0, 1, series_range, true);
// TODO apply first some box dyn EventTransform which later is provided by TransformQuery.
// Then the Merge will happen always by default for backends where this is needed.
// TODO then apply the transform chain for the after-merged-stream.
@@ -248,14 +259,55 @@ fn time_bin_02() -> Result<(), Error> {
runfut(fut)
}
//
#[test]
fn time_bin_03() {
let fut = async {
let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?;
let range = SeriesRange::TimeRange(range);
let min_bin_count = 8;
let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
//let v00 = ChannelEvents::Events(Box::new(EventsDim0::<f32>::empty()));
let v01 = ChannelEvents::Events(evs0);
let v02 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)));
let v03 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect)));
let stream0 = Box::pin(stream::iter(vec![
//
//sitem_data(v00),
sitem_data(v02),
sitem_data(v01),
sitem_data(v03),
]));
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true);
while let Some(item) = binned_stream.next().await {
eprintln!("{item:?}");
match item {
Err(e) => {
if e.to_string().contains("must emit but can not even create empty A") {
return Ok(());
} else {
return Err(Error::with_msg_no_trace("should not succeed"));
}
}
_ => {
return Err(Error::with_msg_no_trace("should not succeed"));
}
}
}
return Err(Error::with_msg_no_trace("should not succeed"));
};
runfut(fut).unwrap()
}
// TODO add test case to observe RangeComplete after binning.
#[test]
fn transform_chain_correctness_01() -> Result<(), Error> {
type STY = f32;
fn transform_chain_correctness_00() -> Result<(), Error> {
// TODO
//type STY = f32;
//let empty = EventsDim0::<STY>::empty();
let tq = TransformQuery::default_time_binned();
let empty = EventsDim0::<STY>::empty();
build_event_transform(&tq)?;
todo!();
Ok(())
}

View File

@@ -83,14 +83,14 @@ where
}
fn process_item(&mut self, mut item: T) -> () {
info!("process_item {item:?}");
trace2!("process_item {item:?}");
if self.binner.is_none() {
trace!("process_item call time_binner_new");
let binner = item.time_binner_new(self.range.clone(), self.do_time_weight);
self.binner = Some(binner);
}
let binner = self.binner.as_mut().unwrap();
trace!("process_item call binner ingest");
trace2!("process_item call binner ingest");
binner.ingest(&mut item);
}
@@ -100,7 +100,7 @@ where
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
use ControlFlow::*;
use Poll::*;
info!("================= handle_data_item");
trace2!("================= handle_data_item");
let item_len = item.len();
self.process_item(item);
let mut do_emit = false;
@@ -127,11 +127,11 @@ where
if let Some(bins) = binner.bins_ready() {
Ok(Break(Ready(sitem_data(bins))))
} else {
warn!("bins ready but got nothing");
warn!("must emit but got nothing");
if let Some(bins) = binner.empty() {
Ok(Break(Ready(sitem_data(bins))))
} else {
let e = Error::with_msg_no_trace("bins ready, but nothing, can not even create empty A");
let e = Error::with_msg_no_trace("must emit but can not even create empty A");
error!("{e}");
Err(e)
}
@@ -152,7 +152,7 @@ where
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
use ControlFlow::*;
use Poll::*;
info!("================= handle_item");
trace2!("================= handle_item");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
@@ -193,12 +193,12 @@ where
self.done_data = true;
Ok(Break(Ready(sitem_data(bins))))
} else {
warn!("bins ready but got nothing");
warn!("must emit but got nothing");
if let Some(bins) = binner.empty() {
self.done_data = true;
Ok(Break(Ready(sitem_data(bins))))
} else {
let e = Error::with_msg_no_trace("bins ready, but nothing, can not even create empty B");
let e = Error::with_msg_no_trace("must emit but can not even create empty B");
error!("{e}");
self.done_data = true;
Err(e)