diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index ed2cb72..c3de48c 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -101,7 +101,6 @@ impl BinsDim0 { return false; } } - return true; for (a, b) in self.maxs.iter().zip(other.maxs.iter()) { if !a.equal_slack(b) { return false; diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index 8f285ab..48da89d 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -40,7 +40,7 @@ fn time_bin_00() { let deadline = Instant::now() + Duration::from_millis(2000000); let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline); while let Some(item) = binned_stream.next().await { - eprintln!("{item:?}"); + //eprintln!("{item:?}"); match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { @@ -66,3 +66,63 @@ fn time_bin_00() { }; runfut(fut).unwrap() } + +#[test] +fn time_bin_01() { + let fut = async { + let edges = [0, 1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(|x| SEC * x).collect(); + let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782); + let evs1 = make_some_boxed_d0_f32(10, SEC * 6, MS * 500, 0, 1846713781); + let v0 = ChannelEvents::Events(evs0); + let v1 = ChannelEvents::Events(evs1); + let stream0 = stream::iter(vec![ + // + sitem_data(v0), + sitem_data(v1), + ]); + let stream0 = stream0.then({ + let mut i = 0; + move |x| { + let delay = if i == 1 { 2000 } else { 0 }; + i += 1; + let dur = Duration::from_millis(delay); + async move { + tokio::time::sleep(dur).await; + x + } + } + }); + let stream0 = Box::pin(stream0); + let deadline = Instant::now() + Duration::from_millis(200); + let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline); + while let Some(item) = binned_stream.next().await { + if true { + eprintln!("{item:?}"); + } + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::Data(item) => { + if let Some(_) = item.as_any().downcast_ref::>() { + } else { + return Err(Error::with_msg_no_trace(format!("bad, got item with unexpected type"))); + } + } + RangeCompletableItem::RangeComplete => {} + }, + StreamItem::Log(_) => {} + StreamItem::Stats(_) => {} + }, + Err(e) => Err(e).unwrap(), + } + } + // TODO assert that we get the bins which are sure to be ready. + // TODO assert correct numbers. + // TODO assert that we don't get bins which may be still changing. + // TODO add similar test case with a RangeComplete event at different places before the timeout. + Ok(()) + }; + runfut(fut).unwrap() +} + +// TODO add test case to observe RangeComplete after binning. diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index c489026..96bb723 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -115,8 +115,25 @@ where } else { match self.deadline_fut.poll_unpin(cx) { Ready(()) => { - // TODO add timeout behavior - todo!(); + trace2!("timeout"); + let self_range_complete = self.range_complete; + if let Some(binner) = self.binner.as_mut() { + trace2!("bins ready count before finish {}", binner.bins_ready_count()); + // TODO rework the finish logic + if self_range_complete { + binner.set_range_complete(); + } + trace2!("bins ready count after finish {}", binner.bins_ready_count()); + if let Some(bins) = binner.bins_ready() { + self.done_data = true; + return Ready(Some(sitem_data(bins))); + } else { + self.done_data = true; + continue; + } + } else { + continue; + } } Pending => {} }