Review binned timeout test case

This commit is contained in:
Dominik Werder
2022-11-12 09:06:27 +01:00
parent 67bca9da5e
commit 8584d36d21
5 changed files with 65 additions and 30 deletions

View File

@@ -483,11 +483,17 @@ impl<NTY: NumOps> MinMaxAvgDim0BinsTimeBinner<NTY> {
None None
} }
} }
fn struct_name() -> &'static str {
std::any::type_name::<Self>()
}
} }
impl<NTY: NumOps + 'static> TimeBinnerDyn for MinMaxAvgDim0BinsTimeBinner<NTY> { impl<NTY: NumOps + 'static> TimeBinnerDyn for MinMaxAvgDim0BinsTimeBinner<NTY> {
fn ingest(&mut self, item: &dyn TimeBinnableDyn) { fn ingest(&mut self, item: &dyn TimeBinnableDyn) {
const SELF: &str = "MinMaxAvgDim0BinsTimeBinner"; //const SELF: &str = "MinMaxAvgDim0BinsTimeBinner";
#[allow(non_snake_case)]
let SELF = Self::struct_name();
if item.len() == 0 { if item.len() == 0 {
// Return already here, RangeOverlapInfo would not give much sense. // Return already here, RangeOverlapInfo would not give much sense.
return; return;

View File

@@ -6,6 +6,7 @@ edition = "2021"
[lib] [lib]
path = "src/items_2.rs" path = "src/items_2.rs"
doctest = false
[dependencies] [dependencies]
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
@@ -22,3 +23,4 @@ err = { path = "../err" }
items = { path = "../items" } items = { path = "../items" }
items_proc = { path = "../items_proc" } items_proc = { path = "../items_proc" }
netpod = { path = "../netpod" } netpod = { path = "../netpod" }
taskrun = { path = "../taskrun" }

View File

@@ -756,7 +756,6 @@ pub async fn binned_collected(
} }
RangeCompletableItem::Data(k) => match k { RangeCompletableItem::Data(k) => match k {
ChannelEvents::Events(events) => { ChannelEvents::Events(events) => {
info!("binned_collected sees\n{:?}", events);
if binner.is_none() { if binner.is_none() {
let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight);
binner = Some(bb); binner = Some(bb);
@@ -766,32 +765,40 @@ pub async fn binned_collected(
flush_binned(binner, &mut coll, bin_count_exp, false)?; flush_binned(binner, &mut coll, bin_count_exp, false)?;
} }
ChannelEvents::Status(item) => { ChannelEvents::Status(item) => {
info!("{:?}", item); trace!("{:?}", item);
} }
}, },
}, },
StreamItem::Log(item) => { StreamItem::Log(item) => {
// TODO collect also errors here? // TODO collect also errors here?
info!("{:?}", item); trace!("{:?}", item);
} }
StreamItem::Stats(item) => { StreamItem::Stats(item) => {
// TODO do something with the stats // TODO do something with the stats
info!("{:?}", item); trace!("{:?}", item);
} }
} }
} }
if let Some(mut binner) = binner { if let Some(mut binner) = binner {
if did_range_complete { if did_range_complete {
trace!("did_range_complete");
binner.set_range_complete(); binner.set_range_complete();
} else {
debug!("range not complete");
} }
if !did_timeout { if did_timeout {
warn!("cycle the binner"); warn!("timeout");
} else {
trace!("cycle the binner");
binner.cycle(); binner.cycle();
} }
trace!("flush binned");
flush_binned(&mut binner, &mut coll, bin_count_exp, false)?; flush_binned(&mut binner, &mut coll, bin_count_exp, false)?;
if coll.is_none() { if coll.is_none() {
warn!("force a bin"); debug!("force a bin");
flush_binned(&mut binner, &mut coll, bin_count_exp, true)?; flush_binned(&mut binner, &mut coll, bin_count_exp, true)?;
} else {
trace!("coll is already some");
} }
} else { } else {
error!("no binner, should always have one"); error!("no binner, should always have one");
@@ -810,3 +817,12 @@ pub async fn binned_collected(
} }
} }
} }
pub fn runfut<T, F>(fut: F) -> Result<T, err::Error>
where
F: std::future::Future<Output = Result<T, Error>>,
{
use futures_util::TryFutureExt;
let fut = fut.map_err(|e| e.into());
taskrun::run(fut)
}

View File

@@ -1,11 +1,12 @@
use crate::binsdim0::BinsDim0CollectedResult; use crate::binsdim0::BinsDim0CollectedResult;
use crate::eventsdim0::EventsDim0; use crate::eventsdim0::EventsDim0;
use crate::merger::ChannelEventsMerger; use crate::merger::ChannelEventsMerger;
use crate::{binned_collected, ChannelEvents, Empty, Events, IsoDateTime}; use crate::{binned_collected, runfut, ChannelEvents, Empty, Events, IsoDateTime};
use crate::{ConnStatus, ConnStatusEvent, Error}; use crate::{ConnStatus, ConnStatusEvent, Error};
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
use futures_util::StreamExt; use futures_util::StreamExt;
use items::{RangeCompletableItem, Sitemty, StreamItem}; use items::{RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::timeunits::*; use netpod::timeunits::*;
use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape}; use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape};
use std::time::Duration; use std::time::Duration;
@@ -36,8 +37,9 @@ fn merge01() {
assert_eq!(item.as_ref(), events_vec2.get(0)); assert_eq!(item.as_ref(), events_vec2.get(0));
let item = merger.next().await; let item = merger.next().await;
assert_eq!(item.as_ref(), None); assert_eq!(item.as_ref(), None);
Ok(())
}; };
tokio::runtime::Runtime::new().unwrap().block_on(fut); runfut(fut).unwrap();
} }
#[test] #[test]
@@ -71,8 +73,9 @@ fn merge02() {
assert_eq!(item.as_ref(), exp.get(1)); assert_eq!(item.as_ref(), exp.get(1));
let item = merger.next().await; let item = merger.next().await;
assert_eq!(item.as_ref(), None); assert_eq!(item.as_ref(), None);
Ok(())
}; };
tokio::runtime::Runtime::new().unwrap().block_on(fut); runfut(fut).unwrap();
} }
fn push_evd0(vec: &mut Vec<Sitemty<ChannelEvents>>, events: Box<dyn Events>) { fn push_evd0(vec: &mut Vec<Sitemty<ChannelEvents>>, events: Box<dyn Events>) {
@@ -156,8 +159,9 @@ fn merge03() {
assert_eq!(item.as_ref(), events_vec2.get(2)); assert_eq!(item.as_ref(), events_vec2.get(2));
let item = merger.next().await; let item = merger.next().await;
assert_eq!(item.as_ref(), None); assert_eq!(item.as_ref(), None);
Ok(())
}; };
tokio::runtime::Runtime::new().unwrap().block_on(fut); runfut(fut).unwrap();
} }
#[test] #[test]
@@ -260,7 +264,7 @@ fn bin01() {
} }
Ok::<_, Error>(()) Ok::<_, Error>(())
}; };
tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap(); runfut(fut).unwrap();
} }
#[test] #[test]
@@ -310,11 +314,12 @@ fn bin02() {
eprintln!("collected {:?}", collected); eprintln!("collected {:?}", collected);
Ok::<_, Error>(()) Ok::<_, Error>(())
}; };
tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap(); runfut(fut).unwrap();
} }
#[test] #[test]
fn bin03() { fn binned_timeout_01() {
trace!("binned_timeout_01 uses a delay");
const TSBASE: u64 = SEC * 1600000000; const TSBASE: u64 = SEC * 1600000000;
fn val(ts: u64) -> f32 { fn val(ts: u64) -> f32 {
2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32 2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32
@@ -334,12 +339,12 @@ fn bin03() {
events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))); events_vec1.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)));
let inp1 = events_vec1; let inp1 = events_vec1;
let inp1 = futures_util::stream::iter(inp1).enumerate().then(|(i, k)| async move { let inp1 = futures_util::stream::iter(inp1).enumerate().then(|(i, k)| async move {
if i == 4 { if i == 5 {
let _ = tokio::time::sleep(Duration::from_millis(10000)).await; let _ = tokio::time::sleep(Duration::from_millis(10000)).await;
} }
k k
}); });
let edges = (0..10).into_iter().map(|x| TSBASE + SEC * 1 + SEC * x).collect(); let edges = (0..10).into_iter().map(|x| TSBASE + SEC * (1 + x)).collect();
let inp1 = Box::pin(inp1) as _; let inp1 = Box::pin(inp1) as _;
let timeout = Duration::from_millis(400); let timeout = Duration::from_millis(400);
let res = binned_collected( let res = binned_collected(
@@ -363,5 +368,5 @@ fn bin03() {
); );
Ok::<_, Error>(()) Ok::<_, Error>(())
}; };
tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap(); runfut(fut).unwrap();
} }

View File

@@ -66,7 +66,10 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc<Runtime> {
} }
} }
pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> { pub fn run<T, F>(f: F) -> Result<T, Error>
where
F: std::future::Future<Output = Result<T, Error>>,
{
let runtime = get_runtime(); let runtime = get_runtime();
let res = runtime.block_on(async { f.await }); let res = runtime.block_on(async { f.await });
match res { match res {
@@ -94,22 +97,25 @@ pub fn tracing_init() {
//let console_layer = console_subscriber::spawn(); //let console_layer = console_subscriber::spawn();
//tracing_subscriber::registry().with(console_layer).with(trsub).init(); //tracing_subscriber::registry().with(console_layer).with(trsub).init();
//console_subscriber::init(); //console_subscriber::init();
#[allow(unused)]
let log_filter = tracing_subscriber::EnvFilter::new(
[
//"tokio=trace",
//"runtime=trace",
"warn",
"disk::binned::pbv=trace",
"[log_span_d]=debug",
"[log_span_t]=trace",
]
.join(","),
);
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_timer(timer) .with_timer(timer)
.with_target(true) .with_target(true)
.with_thread_names(true) .with_thread_names(true)
//.with_max_level(tracing::Level::INFO) //.with_max_level(tracing::Level::INFO)
.with_env_filter(tracing_subscriber::EnvFilter::new( //.with_env_filter(log_filter)
[ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
//"tokio=trace",
//"runtime=trace",
"info",
"disk::binned::pbv=trace",
"[log_span_d]=debug",
"[log_span_t]=trace",
]
.join(","),
))
.init(); .init();
*g = 1; *g = 1;
//warn!("tracing_init done"); //warn!("tracing_init done");