Add timeout test case stub

This commit is contained in:
Dominik Werder
2022-11-18 14:28:39 +01:00
parent dbbc3ab01f
commit d57aa5474e
3 changed files with 80 additions and 4 deletions

View File

@@ -101,7 +101,6 @@ impl<NTY: ScalarOps> BinsDim0<NTY> {
return false;
}
}
return true;
for (a, b) in self.maxs.iter().zip(other.maxs.iter()) {
if !a.equal_slack(b) {
return false;

View File

@@ -40,7 +40,7 @@ fn time_bin_00() {
let deadline = Instant::now() + Duration::from_millis(2000000);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline);
while let Some(item) = binned_stream.next().await {
eprintln!("{item:?}");
//eprintln!("{item:?}");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
@@ -66,3 +66,63 @@ fn time_bin_00() {
};
runfut(fut).unwrap()
}
#[test]
fn time_bin_01() {
let fut = async {
let edges = [0, 1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(|x| SEC * x).collect();
let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
let evs1 = make_some_boxed_d0_f32(10, SEC * 6, MS * 500, 0, 1846713781);
let v0 = ChannelEvents::Events(evs0);
let v1 = ChannelEvents::Events(evs1);
let stream0 = stream::iter(vec![
//
sitem_data(v0),
sitem_data(v1),
]);
let stream0 = stream0.then({
let mut i = 0;
move |x| {
let delay = if i == 1 { 2000 } else { 0 };
i += 1;
let dur = Duration::from_millis(delay);
async move {
tokio::time::sleep(dur).await;
x
}
}
});
let stream0 = Box::pin(stream0);
let deadline = Instant::now() + Duration::from_millis(200);
let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline);
while let Some(item) = binned_stream.next().await {
if true {
eprintln!("{item:?}");
}
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
if let Some(_) = item.as_any().downcast_ref::<BinsDim0<f32>>() {
} else {
return Err(Error::with_msg_no_trace(format!("bad, got item with unexpected type")));
}
}
RangeCompletableItem::RangeComplete => {}
},
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
},
Err(e) => Err(e).unwrap(),
}
}
// TODO assert that we get the bins which are sure to be ready.
// TODO assert correct numbers.
// TODO assert that we don't get bins which may be still changing.
// TODO add similar test case with a RangeComplete event at different places before the timeout.
Ok(())
};
runfut(fut).unwrap()
}
// TODO add test case to observe RangeComplete after binning.

View File

@@ -115,8 +115,25 @@ where
} else {
match self.deadline_fut.poll_unpin(cx) {
Ready(()) => {
// TODO add timeout behavior
todo!();
trace2!("timeout");
let self_range_complete = self.range_complete;
if let Some(binner) = self.binner.as_mut() {
trace2!("bins ready count before finish {}", binner.bins_ready_count());
// TODO rework the finish logic
if self_range_complete {
binner.set_range_complete();
}
trace2!("bins ready count after finish {}", binner.bins_ready_count());
if let Some(bins) = binner.bins_ready() {
self.done_data = true;
return Ready(Some(sitem_data(bins)));
} else {
self.done_data = true;
continue;
}
} else {
continue;
}
}
Pending => {}
}