diff --git a/Cargo.toml b/Cargo.toml
index ddf0b4a..2db9d1c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,3 +32,6 @@ heavy = []
 
 [dev-dependencies]
 tokio = { version = "1", features = ["rt"] }
+
+[patch.crates-io]
+thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }
diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs
index fe85b89..7817539 100644
--- a/src/binning/container_bins.rs
+++ b/src/binning/container_bins.rs
@@ -760,14 +760,30 @@ where
         DrainIntoNewResult::Done(dst)
     }
 
+    fn is_strict_monotonic(&self) -> bool {
+        let mut mono = true;
+        let n = self.ts1s.len();
+        for (&ts_a, &ts_b) in self.ts1s.iter().zip(self.ts1s.range(n.min(1)..)) {
+            if ts_a >= ts_b {
+                mono = false;
+                error!("non-monotonic event data ts1 {} ts2 {}", ts_a, ts_b);
+                break;
+            }
+        }
+        mono
+    }
+
     fn is_consistent(&self) -> bool {
+        let mut good = true;
+        good &= self.is_strict_monotonic();
         let n = self.ts1s.len();
         let mut same_len = true;
         same_len &= n == self.ts2s.len();
         same_len &= n == self.cnts.len();
         same_len &= n == self.mins.len();
         same_len &= n == self.ts2s.len();
-        same_len
+        good &= same_len;
+        good
     }
 }
diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs
index a53224d..e4109c6 100644
--- a/src/binning/container_events.rs
+++ b/src/binning/container_events.rs
@@ -9,6 +9,12 @@ use crate::log::*;
 use crate::offsets::pulse_offs_from_abs;
 use core::fmt;
 use core::ops::Range;
+use items_0::Appendable;
+use items_0::AsAnyMut;
+use items_0::AsAnyRef;
+use items_0::Empty;
+use items_0::TypeName;
+use items_0::WithLen;
 use items_0::apitypes::ToUserFacingApiType;
 use items_0::apitypes::UserApiType;
 use items_0::collect_s::CollectableDyn;
@@ -25,12 +31,6 @@ use items_0::subfr::SubFrId;
 use items_0::timebin::BinningggContainerEventsDyn;
 use items_0::vecpreview::PreviewRange;
 use items_0::vecpreview::VecPreview;
-use items_0::Appendable;
-use items_0::AsAnyMut;
-use items_0::AsAnyRef;
-use items_0::Empty;
-use items_0::TypeName;
-use items_0::WithLen;
 use netpod::BinnedRange;
 use netpod::EnumVariant;
 use netpod::TsNano;
@@ -702,14 +702,14 @@ where
 mod container_events_serde {
     use super::ContainerEvents;
     use super::EventValueType;
-    use serde::de::MapAccess;
-    use serde::de::SeqAccess;
-    use serde::de::Visitor;
-    use serde::ser::SerializeStruct;
     use serde::Deserialize;
     use serde::Deserializer;
     use serde::Serialize;
     use serde::Serializer;
+    use serde::de::MapAccess;
+    use serde::de::SeqAccess;
+    use serde::de::Visitor;
+    use serde::ser::SerializeStruct;
     use std::cell::RefCell;
     use std::fmt;
     use std::marker::PhantomData;
@@ -1043,29 +1043,17 @@ where
     fn find_lowest_index_gt(&self, ts: TsNano) -> Option<usize> {
         let x = self.tss.partition_point(|&x| x <= ts);
-        if x >= self.tss.len() {
-            None
-        } else {
-            Some(x)
-        }
+        if x >= self.tss.len() { None } else { Some(x) }
     }
 
     fn find_lowest_index_ge(&self, ts: TsNano) -> Option<usize> {
         let x = self.tss.partition_point(|&x| x < ts);
-        if x >= self.tss.len() {
-            None
-        } else {
-            Some(x)
-        }
+        if x >= self.tss.len() { None } else { Some(x) }
     }
 
     fn find_highest_index_lt(&self, ts: TsNano) -> Option<usize> {
         let x = self.tss.partition_point(|&x| x < ts);
-        if x == 0 {
-            None
-        } else {
-            Some(x - 1)
-        }
+        if x == 0 { None } else { Some(x - 1) }
     }
 
     fn tss_for_testing(&self) -> VecDeque<TsNano> {
@@ -1084,16 +1072,22 @@ where
         DrainIntoNewResult::Done(dst)
     }
 
-    fn is_consistent(&self) -> bool {
-        let mut good = true;
+    fn is_strict_monotonic(&self) -> bool {
+        let mut mono = true;
         let n = self.tss.len();
         for (&ts1, &ts2) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) {
-            if ts1 > ts2 {
-                good = false;
-                error!("unordered event data ts1 {} ts2 {}", ts1, ts2);
+            if ts1 >= ts2 {
+                mono = false;
+                error!("non-monotonic event data ts1 {} ts2 {}", ts1, ts2);
                 break;
             }
         }
+        mono
+    }
+
+    fn is_consistent(&self) -> bool {
+        let mut good = true;
+        good &= MergeableTy::is_strict_monotonic(self);
         good
     }
 }
@@ -1146,6 +1140,10 @@ where
         }
     }
 
+    fn is_strict_monotonic(&self) -> bool {
+        MergeableTy::is_strict_monotonic(self)
+    }
+
     fn is_consistent(&self) -> bool {
         MergeableTy::is_consistent(self)
     }
@@ -1369,6 +1367,28 @@ where
     }
 }
 
+impl ContainerEvents<f32> {
+    pub fn testing_cmp(lhs: &Self, rhs: &Self) -> Result<(), String> {
+        use fmt::Write;
+        let mut log = String::new();
+        for (j, k) in lhs.iter_zip().zip(rhs.iter_zip()) {
+            if j.0 != k.0 {
+                write!(&mut log, "ts mismatch {:?} {:?}", j, k).unwrap();
+                log.push('\n');
+            }
+            if j.1 != k.1 {
+                write!(&mut log, "value mismatch {:?} {:?}", j, k).unwrap();
+                log.push('\n');
+            }
+        }
+        if lhs.len() != rhs.len() {
+            write!(&mut log, "len mismatch {:?} {:?}", lhs.len(), rhs.len()).unwrap();
+            log.push('\n');
+        }
+        if log.len() != 0 { Err(log) } else { Ok(()) }
+    }
+}
+
 #[cfg(test)]
 mod test_frame {
     use super::*;
@@ -1377,10 +1397,10 @@ mod test_frame {
     use crate::frame::decode_frame;
     use crate::inmem::InMemoryFrame;
     use crate::inmem::ParseResult;
-    use items_0::streamitem::sitem_data;
    use items_0::streamitem::RangeCompletableItem;
     use items_0::streamitem::Sitemty;
     use items_0::streamitem::StreamItem;
+    use items_0::streamitem::sitem_data;
 
     #[test]
     fn events_serialize() {
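Note on the hunk above: the `find_*_index_*` helpers all reduce to the same `partition_point` binary-search pattern. A minimal standalone sketch (not part of this change; plain `u64` slices stand in for the `TsNano` containers):

```rust
// Lowest index with tss[i] > ts: count the elements <= ts, then bounds-check.
fn lowest_index_gt(tss: &[u64], ts: u64) -> Option<usize> {
    let x = tss.partition_point(|&t| t <= ts);
    if x >= tss.len() { None } else { Some(x) }
}

// Highest index with tss[i] < ts: count the elements < ts, then step back one.
fn highest_index_lt(tss: &[u64], ts: u64) -> Option<usize> {
    let x = tss.partition_point(|&t| t < ts);
    if x == 0 { None } else { Some(x - 1) }
}

fn main() {
    // partition_point requires input already partitioned by the predicate,
    // which holds here because the timestamp containers are kept sorted.
    let tss = [10, 20, 30];
    assert_eq!(lowest_index_gt(&tss, 20), Some(2));
    assert_eq!(lowest_index_gt(&tss, 30), None);
    assert_eq!(highest_index_lt(&tss, 20), Some(0));
    assert_eq!(highest_index_lt(&tss, 10), None);
}
```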
diff --git a/src/binning/test/bins00.rs b/src/binning/test/bins00.rs
index a34153a..1fba8fa 100644
--- a/src/binning/test/bins00.rs
+++ b/src/binning/test/bins00.rs
@@ -10,19 +10,20 @@ use crate::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
 use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
 use futures_util::StreamExt;
 use items_0::timebin::BinningggContainerBinsDyn;
-use netpod::log::*;
-use netpod::range::evrange::NanoRange;
 use netpod::BinnedRange;
 use netpod::DtMs;
 use netpod::TsNano;
+use netpod::log::*;
+use netpod::range::evrange::NanoRange;
 
-#[derive(Debug, thiserror::Error)]
-#[cstm(name = "Error")]
-enum Error {
-    Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
-    AssertMsg(String),
-    Compare(#[from] super::compare::Error),
-}
+autoerr::create_error_v1!(
+    name(Error, "Error"),
+    enum variants {
+        Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
+        AssertMsg(String),
+        Compare(#[from] super::compare::Error),
+    },
+);
 
 #[test]
 fn test_bin_events_f32_simple_01() -> Result<(), Error> {
diff --git a/src/channelevents.rs b/src/channelevents.rs
index 5d8038a..60fe9f1 100644
--- a/src/channelevents.rs
+++ b/src/channelevents.rs
@@ -882,6 +882,13 @@ impl MergeableTy for ChannelEvents {
         }
     }
 
+    fn is_strict_monotonic(&self) -> bool {
+        match self {
+            ChannelEvents::Events(x) => x.is_strict_monotonic(),
+            ChannelEvents::Status(_) => true,
+        }
+    }
+
     fn is_consistent(&self) -> bool {
         match self {
             ChannelEvents::Events(x) => x.is_consistent(),
diff --git a/src/eventfull.rs b/src/eventfull.rs
index 25b2bb7..e49285c 100644
--- a/src/eventfull.rs
+++ b/src/eventfull.rs
@@ -2,21 +2,21 @@ use crate::framable::FrameType;
 use bytes::BytesMut;
 use core::ops::Range;
 use daqbuf_err as err;
-use err::thiserror;
 use err::ThisError;
+use err::thiserror;
+use items_0::Empty;
+use items_0::WithLen;
 use items_0::container::ByteEstimate;
 use items_0::framable::FrameTypeInnerStatic;
 use items_0::merge::DrainIntoDstResult;
 use items_0::merge::DrainIntoNewResult;
 use items_0::merge::MergeableTy;
 use items_0::streamitem::EVENT_FULL_FRAME_TYPE_ID;
-use items_0::Empty;
-use items_0::WithLen;
-#[allow(unused)]
-use netpod::log::*;
 use netpod::ScalarType;
 use netpod::Shape;
 use netpod::TsNano;
+#[allow(unused)]
+use netpod::log::*;
 use parse::channelconfig::CompressionMethod;
 use serde::Deserialize;
 use serde::Deserializer;
@@ -255,8 +255,23 @@ impl MergeableTy for EventFull {
         self.tss.iter().map(|&x| TsNano::from_ns(x)).collect()
     }
 
+    fn is_strict_monotonic(&self) -> bool {
+        let mut mono = true;
+        let n = self.tss.len();
+        for (&ts_a, &ts_b) in self.tss.iter().zip(self.tss.range(n.min(1)..n)) {
+            if ts_a >= ts_b {
+                mono = false;
+                error!("non-monotonic event data ts1 {} ts2 {}", ts_a, ts_b);
+                break;
+            }
+        }
+        mono
+    }
+
     fn is_consistent(&self) -> bool {
-        true
+        let mut good = true;
+        good &= MergeableTy::is_strict_monotonic(self);
+        good
     }
 }
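The `is_strict_monotonic` implementations added above all share one adjacent-pair scan: zip the timestamp sequence with itself shifted by one and require a strict increase at every step. A self-contained sketch of that check (plain `u64` values, logging elided):

```rust
use std::collections::VecDeque;

fn is_strict_monotonic(tss: &VecDeque<u64>) -> bool {
    let n = tss.len();
    // n.min(1) keeps the range valid for an empty container; zip pairs each
    // element with its successor and stops at the shorter side.
    tss.iter().zip(tss.range(n.min(1)..n)).all(|(&a, &b)| a < b)
}

fn main() {
    assert!(is_strict_monotonic(&VecDeque::from([1, 2, 3])));
    assert!(!is_strict_monotonic(&VecDeque::from([1, 2, 2]))); // duplicate ts
    assert!(is_strict_monotonic(&VecDeque::new())); // empty is trivially ok
}
```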
diff --git a/src/merger.rs b/src/merger.rs
index 317d1a8..8cb5807 100644
--- a/src/merger.rs
+++ b/src/merger.rs
@@ -1,7 +1,7 @@
 #[cfg(test)]
 mod test;
 
-use crate::log::*;
+use crate::log;
 use futures_util::Stream;
 use futures_util::StreamExt;
 use items_0::merge::DrainIntoDstResult;
@@ -26,13 +26,19 @@
 use std::task::Poll;
 
 const OUT_MAX_BYTES: u32 = 1024 * 1024 * 20;
 const DO_DETECT_NON_MONO: bool = true;
 
-macro_rules! trace2 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
+macro_rules! trace2 { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
 
-macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
+macro_rules! trace3 { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
 
-macro_rules! trace4 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
+macro_rules! trace4 { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
 
-macro_rules! trace_emit { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
+macro_rules! trace_emit { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
+
+macro_rules! trace_inp_special { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
+
+macro_rules! trace_emit_special { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
+
+macro_rules! debug_inp { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
 
 autoerr::create_error_v1!(
     name(Error, "MergerError"),
     enum variants {
         ShouldFindTsMin,
         ItemShouldHaveTsMax,
         PartialPathDrainedAllItems,
+        InputNotStrictMonotonic,
+        Logic,
     },
 );
 
 type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
 
+struct Inps<T>(Vec<Option<MergeInp<T>>>);
+
+impl<T> fmt::Debug for Inps<T> {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_tuple("Vec").finish()
+    }
+}
+
+#[derive(Debug)]
 pub struct Merger<T> {
-    inps: Vec<Option<MergeInp<T>>>,
+    inps: Inps<T>,
     items: Vec<Option<T>>,
     out: Option<T>,
     do_clear_out: bool,
     out_max_len: usize,
     range_complete: Vec<bool>,
-    out_of_band_queue: VecDeque<Sitemty<T>>,
+    outbuf: VecDeque<Sitemty<T>>,
     log_queue: VecDeque<LogItem>,
     dim0ix_max: TsNano,
     done_inp: bool,
     done_range_complete: bool,
     complete: bool,
-}
-
-impl<T> fmt::Debug for Merger<T>
-where
-    T: MergeableTy,
-{
-    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect();
-        fmt.debug_struct(std::any::type_name::<Self>())
-            .field("inps", &inps)
-            .field("items", &self.items)
-            .field("out_max_len", &self.out_max_len)
-            .field("range_complete", &self.range_complete)
-            .field("out_of_band_queue", &self.out_of_band_queue.len())
-            .field("done_data", &self.done_inp)
-            .field("done_range_complete", &self.done_range_complete)
-            .finish()
-    }
+    stats_process_item_empty: u32,
+    take_item_single_inp_cnt: u32,
+    take_item_full_cnt: u32,
+    take_item_partial_cnt: u32,
 }
 
 impl<T> Merger<T>
@@ -87,18 +90,22 @@ where
     T: MergeableTy,
 {
     pub fn new(inps: Vec<MergeInp<T>>, out_max_len: Option<u32>) -> Self {
         let n = inps.len();
         Self {
-            inps: inps.into_iter().map(|x| Some(x)).collect(),
+            inps: Inps(inps.into_iter().map(|x| Some(x)).collect()),
             items: (0..n).into_iter().map(|_| None).collect(),
             out: None,
             do_clear_out: false,
             out_max_len: out_max_len.unwrap_or(1000) as usize,
             range_complete: vec![false; n],
-            out_of_band_queue: VecDeque::new(),
+            outbuf: VecDeque::new(),
             log_queue: VecDeque::new(),
             dim0ix_max: TsNano::from_ns(0),
             done_inp: false,
             done_range_complete: false,
             complete: false,
+            stats_process_item_empty: 0,
+            take_item_single_inp_cnt: 0,
+            take_item_full_cnt: 0,
+            take_item_partial_cnt: 0,
         }
     }
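The `Inps` newtype above exists to make `#[derive(Debug)]` possible on `Merger`: the boxed input streams have no `Debug` impl, so the wrapper supplies a hand-written placeholder one. A sketch of the pattern in isolation (an assumed simplification, with `dyn Iterator` standing in for the stream type):

```rust
use std::fmt;

struct Inps(Vec<Option<Box<dyn Iterator<Item = u64>>>>);

// Placeholder Debug so the field is printable without the trait objects
// themselves having to implement Debug.
impl fmt::Debug for Inps {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_tuple("Vec").finish()
    }
}

#[derive(Debug)]
struct Merger {
    inps: Inps,
    out_max_len: usize,
}

fn main() {
    let m = Merger {
        inps: Inps(Vec::new()),
        out_max_len: 1000,
    };
    // Prints: Merger { inps: Vec, out_max_len: 1000 }
    println!("{:?}", m);
}
```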
@@ -149,12 +156,17 @@ where
         self.take_into_output_upto(src, TsNano::from_ns(u64::MAX))
     }
 
-    fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<ControlFlow<()>, Error> {
-        use ControlFlow::*;
+    fn take_outbuf_filtered(&mut self) -> Option<T> {
+        // TODO currently nothing done here
+        self.out.take()
+    }
+
+    fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
         trace4!("process");
         let mut log_items = Vec::new();
+        let self2 = self.as_mut().get_mut();
         let mut tslows = [None, None];
-        for (i1, itemopt) in self.items.iter_mut().enumerate() {
+        for (i1, itemopt) in self2.items.iter_mut().enumerate() {
             if let Some(item) = itemopt {
                 if let Some(t1) = item.ts_min() {
                     if let Some((_, a)) = tslows[0] {
@@ -176,11 +188,9 @@ where
                         tslows[0] = Some((i1, t1));
                     }
                 } else {
-                    // the item seems empty.
-                    // TODO count for stats.
-                    trace2!("empty item, something to do here?");
+                    self2.stats_process_item_empty += 1;
                     *itemopt = None;
-                    return Ok(Continue(()));
+                    return Ok(());
                 }
             }
         }
@@ -190,7 +200,7 @@ where
                 self.dim0ix_max = *t1;
                 let item = LogItem {
                     node_ix: *i1 as _,
-                    level: Level::INFO,
+                    level: log::Level::INFO,
                     msg: format!(
                         "dim0ix_max {} vs {} diff {}",
                         self.dim0ix_max,
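For orientation, the `tslows` scan `process` performs above is a single pass that keeps the indices of the two inputs with the smallest head timestamps. A standalone sketch of that bookkeeping (the branching mirrors what the loop above appears to do; names are illustrative):

```rust
fn two_lowest(heads: &[Option<u64>]) -> [Option<(usize, u64)>; 2] {
    let mut tslows: [Option<(usize, u64)>; 2] = [None, None];
    for (i, t) in heads.iter().enumerate() {
        if let &Some(t) = t {
            match tslows[0] {
                // Not a new minimum: maybe it is the second-lowest.
                Some((_, a)) if t >= a => match tslows[1] {
                    Some((_, b)) if t >= b => {}
                    _ => tslows[1] = Some((i, t)),
                },
                // New minimum: demote the old one to second place.
                _ => {
                    tslows[1] = tslows[0];
                    tslows[0] = Some((i, t));
                }
            }
        }
    }
    tslows
}

fn main() {
    let heads = [Some(40), None, Some(10), Some(25)];
    assert_eq!(two_lowest(&heads), [Some((2, 10)), Some((3, 25))]);
}
```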
- trace2!("empty item, something to do here?"); + self2.stats_process_item_empty += 1; *itemopt = None; - return Ok(Continue(())); + return Ok(()); } } } @@ -190,7 +200,7 @@ where self.dim0ix_max = *t1; let item = LogItem { node_ix: *i1 as _, - level: Level::INFO, + level: log::Level::INFO, msg: format!( "dim0ix_max {} vs {} diff {}", self.dim0ix_max, @@ -203,35 +213,43 @@ where } } trace4!("tslows {:?}", tslows); - if let Some((il0, _tl0)) = tslows[0] { + let ret = if let Some((il0, _tl0)) = tslows[0] { if let Some((_il1, tl1)) = tslows[1] { // There is a second input, take only up to the second highest timestamp let item = self.items[il0].as_mut().unwrap(); if let Some(th0) = item.ts_max() { if th0 <= tl1 { // Can take the whole item - // TODO gather stats about this case. Should be never for databuffer, and often for scylla. + self.take_item_full_cnt += 1; let mut item = self.items[il0].take().unwrap(); trace3!("Take all from item {:?}", item); match self.take_into_output_all(&mut item) { - DrainIntoDstResult::Done => Ok(Break(())), + DrainIntoDstResult::Done => { + // TODO can we eliminate the unwraps? + self.dim0ix_max = self.out.as_ref().unwrap().ts_max().unwrap(); + Ok(()) + } DrainIntoDstResult::Partial => { // TODO count for stats trace3!("Put item back"); self.items[il0] = Some(item); self.do_clear_out = true; - Ok(Break(())) + // TODO can we eliminate the unwraps? + self.dim0ix_max = self.out.as_ref().unwrap().ts_max().unwrap(); + Ok(()) } DrainIntoDstResult::NotCompatible => { // TODO count for stats trace3!("Put item back"); self.items[il0] = Some(item); self.do_clear_out = true; - Ok(Break(())) + // TODO we assume that nothing got drained. + Ok(()) } } } else { - // Take only up to the lowest ts of the second-lowest input + // Take only up to including the lowest ts of the second-lowest input + self.take_item_partial_cnt += 1; let mut item = self.items[il0].take().unwrap(); trace3!("Take up to {} from item {:?}", tl1, item); match self.take_into_output_upto(&mut item, tl1) { @@ -241,7 +259,9 @@ where Err(Error::PartialPathDrainedAllItems) } else { self.items[il0] = Some(item); - Ok(Break(())) + // TODO can we eliminate the unwraps? + self.dim0ix_max = self.out.as_ref().unwrap().ts_max().unwrap(); + Ok(()) } } DrainIntoDstResult::Partial => { @@ -249,14 +269,17 @@ where trace3!("Put item back because Partial"); self.items[il0] = Some(item); self.do_clear_out = true; - Ok(Break(())) + // TODO can we eliminate the unwraps? + self.dim0ix_max = self.out.as_ref().unwrap().ts_max().unwrap(); + Ok(()) } DrainIntoDstResult::NotCompatible => { // TODO count for stats trace3!("Put item back because NotCompatible"); self.items[il0] = Some(item); self.do_clear_out = true; - Ok(Break(())) + // TODO we assume that nothing got drained. + Ok(()) } } } @@ -265,69 +288,102 @@ where } } else { // No other input, take the whole item + self.take_item_single_inp_cnt += 1; let mut item = self.items[il0].take().unwrap(); trace3!("Take all from item (no other input) {:?}", item); match self.take_into_output_all(&mut item) { - DrainIntoDstResult::Done => Ok(Break(())), + DrainIntoDstResult::Done => { + // TODO can we eliminate the unwraps? + self.dim0ix_max = self.out.as_ref().unwrap().ts_max().unwrap(); + Ok(()) + } DrainIntoDstResult::Partial => { // TODO count for stats trace3!("Put item back"); self.items[il0] = Some(item); self.do_clear_out = true; - Ok(Break(())) + // TODO can we eliminate the unwraps? 
     fn refill(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<ControlFlow<()>, Error> {
         trace4!("refill");
         use Poll::*;
         let mut has_pending = false;
-        for i in 0..self.inps.len() {
+        for i in 0..self.inps.0.len() {
             if self.items[i].is_none() {
-                while let Some(inp) = self.inps[i].as_mut() {
+                while let Some(inp) = self.inps.0[i].as_mut() {
                     match inp.poll_next_unpin(cx) {
                         Ready(Some(Ok(k))) => match k {
                             StreamItem::DataItem(k) => match k {
                                 RangeCompletableItem::Data(k) => {
+                                    if k.is_strict_monotonic() == false {
+                                        return Err(Error::InputNotStrictMonotonic);
+                                    }
                                     self.items[i] = Some(k);
                                     trace4!("refilled {}", i);
                                 }
                                 RangeCompletableItem::RangeComplete => {
                                     self.range_complete[i] = true;
-                                    trace!("range_complete {:?}", self.range_complete);
+                                    trace_inp_special!("range_complete {:?}", self.range_complete);
                                     continue;
                                 }
                             },
                             StreamItem::Log(item) => {
                                 // TODO limit queue length
-                                self.out_of_band_queue.push_back(Ok(StreamItem::Log(item)));
+                                self.outbuf.push_back(Ok(StreamItem::Log(item)));
                                 continue;
                             }
                             StreamItem::Stats(item) => {
                                 // TODO limit queue length
-                                self.out_of_band_queue
-                                    .push_back(Ok(StreamItem::Stats(item)));
+                                self.outbuf.push_back(Ok(StreamItem::Stats(item)));
                                 continue;
                             }
                         },
                         Ready(Some(Err(e))) => {
-                            self.inps[i] = None;
+                            self.inps.0[i] = None;
                             return Err(Error::Input(e));
                         }
                         Ready(None) => {
-                            self.inps[i] = None;
+                            self.inps.0[i] = None;
                         }
                         Pending => {
                             has_pending = true;
@@ -349,10 +405,11 @@ where
         use Poll::*;
         trace4!("poll3");
         #[allow(unused)]
-        let ninps = self.inps.iter().filter(|a| a.is_some()).count();
+        let ninps = self.inps.0.iter().filter(|a| a.is_some()).count();
         let nitems = self.items.iter().filter(|a| a.is_some()).count();
         let nitemsmissing = self
             .inps
+            .0
             .iter()
             .zip(self.items.iter())
             .filter(|(a, b)| a.is_some() && b.is_none())
@@ -369,35 +426,36 @@ where
         }
         let last_emit = nitems == 0;
         if nitems != 0 {
-            match Self::process(Pin::new(&mut self), cx) {
-                Ok(Break(())) => {}
-                Ok(Continue(())) => {}
+            match Self::process(self.as_mut(), cx) {
+                Ok(()) => {}
                 Err(e) => return Break(Ready(Some(e))),
             }
         }
-        if let Some(o) = self.out.as_ref() {
-            if o.len() >= self.out_max_len
-                || o.byte_estimate() >= OUT_MAX_BYTES
+        if let Some(out) = self.out.as_ref() {
+            if out.len() >= self.out_max_len
+                || out.byte_estimate() >= OUT_MAX_BYTES
                 || self.do_clear_out
                 || last_emit
             {
-                if o.len() > 2 * self.out_max_len {
-                    debug!("over length item {} vs {}", o.len(), self.out_max_len);
+                if out.len() > 2 * self.out_max_len {
+                    debug_inp!("over length item {} vs {}", out.len(), self.out_max_len);
                 }
-                if o.byte_estimate() > 2 * OUT_MAX_BYTES {
-                    debug!(
+                if out.byte_estimate() > 2 * OUT_MAX_BYTES {
+                    debug_inp!(
                         "over weight item {} vs {}",
-                        o.byte_estimate(),
+                        out.byte_estimate(),
                         OUT_MAX_BYTES
                     );
                 }
                 trace3!("decide to output");
                 self.do_clear_out = false;
-                let item = sitem_data(self.out.take().unwrap());
-                self.out_of_band_queue.push_back(item);
+                if let Some(out) = self.take_outbuf_filtered() {
+                    let item = sitem_data(out);
+                    self.outbuf.push_back(item);
+                }
                 Continue(())
             } else {
-                trace4!("not enough output yet");
+                trace4!("not enough output yet {}", out.len());
                 Continue(())
             }
         } else {
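The emit condition above is unchanged in shape; for reference, a distilled version of the four-way decision (constants as in this file, `out_max_len` using the default from `Merger::new`):

```rust
const OUT_MAX_BYTES: u32 = 1024 * 1024 * 20;

fn should_emit(len: usize, bytes: u32, do_clear_out: bool, last_emit: bool) -> bool {
    let out_max_len = 1000; // default when Merger::new receives None
    len >= out_max_len || bytes >= OUT_MAX_BYTES || do_clear_out || last_emit
}

fn main() {
    assert!(should_emit(1000, 0, false, false)); // batch long enough
    assert!(should_emit(0, OUT_MAX_BYTES, false, false)); // batch heavy enough
    assert!(should_emit(10, 0, true, false)); // explicit flush requested
    assert!(should_emit(10, 0, false, true)); // final batch on shutdown
    assert!(!should_emit(10, 0, false, false)); // otherwise keep accumulating
}
```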
@@ -429,8 +487,8 @@ where
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
-        // let span1 = span!(Level::INFO, "Merger", pc = self.poll_count);
-        let span1 = span!(Level::INFO, "Merger");
+        // let span1 = log::span!(log::Level::INFO, "Merger", pc = self.poll_count);
+        let span1 = log::span!(log::Level::INFO, "Merger");
         let _spg = span1.enter();
         loop {
             trace3!("poll");
@@ -441,7 +499,7 @@ where
                 Ready(None)
             } else if let Some(item) = self.log_queue.pop_front() {
                 Ready(Some(Ok(StreamItem::Log(item))))
-            } else if let Some(item) = self.out_of_band_queue.pop_front() {
+            } else if let Some(item) = self.outbuf.pop_front() {
                 trace_emit!("emit item");
                 let item = on_sitemty_data!(item, |k: T| {
                     trace_emit!("emit item len {}", k.len());
@@ -458,9 +516,9 @@ where
                 }
                 Ready(None) => {
                     self.done_inp = true;
-                    if let Some(out) = self.out.take() {
-                        trace!("done_data emit buffered len {}", out.len());
-                        self.out_of_band_queue.push_back(sitem_data(out));
+                    if let Some(out) = self.take_outbuf_filtered() {
+                        trace_emit_special!("done_data emit buffered len {}", out.len());
+                        self.outbuf.push_back(sitem_data(out));
                     }
                     continue;
                 }
@@ -470,9 +528,9 @@ where
             } else {
                 self.done_range_complete = true;
                 if self.range_complete.iter().all(|x| *x) {
-                    trace!("emit RangeComplete");
+                    trace_emit_special!("emit RangeComplete");
                     let item = Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete));
-                    self.out_of_band_queue.push_back(item);
+                    self.outbuf.push_back(item);
                 }
                 continue;
             };
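Both merger.rs above and the test file below gate their logging through tiny local macros. The pattern is a compile-time on/off switch in front of the real logging macro: the `if false` body still type-checks but is optimized away. A sketch with `println!` standing in for `log::trace!`:

```rust
macro_rules! trace2 { ($($arg:expr),*) => ( if false { println!($($arg),*); } ) }
macro_rules! trace3 { ($($arg:expr),*) => ( if true { println!($($arg),*); } ) }

fn main() {
    trace2!("suppressed, but still type-checked: {}", 1);
    trace3!("printed: {}", 2);
}
```

Note that this change flips the gates in merger.rs from `if false` to `if true`, so all of these trace channels are now active.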
trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } + +fn run_test(fut: F) -> F::Output +where + F: Future, +{ + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(fut) +} async fn merger_00_inner() { let mut evs0 = ContainerEvents::::new(); @@ -18,15 +39,186 @@ async fn merger_00_inner() { let inps = vec![inp0]; let mut merger = Merger::new(inps, None); while let Some(x) = merger.next().await { - trace!("{:?}", x); + log::trace!("{:?}", x); } - trace!("DONE"); + log::trace!("DONE"); } #[test] fn merger_00() { - tokio::runtime::Builder::new_current_thread() - .build() - .unwrap() - .block_on(merger_00_inner()); + run_test(merger_00_inner()); +} + +fn make_container(off: usize, step: usize, cnt: usize) -> ContainerEvents { + let mut ret = ContainerEvents::new(); + let mut ts = off; + for i in 0..cnt { + ret.push_back(TsNano::from_ns(SEC * ts as u64), ts as f32); + ts += step; + } + ret +} + +fn make_stream_items( + off: usize, + step: usize, + cnt_stream: usize, + cnt_item: usize, +) -> Vec>> { + let mut ret = Vec::new(); + let mut cnt_todo = cnt_stream; + let mut off = off; + while cnt_todo > 0 { + let cnt = if cnt_todo > cnt_item { + let x = cnt_item; + cnt_todo -= x; + x + } else { + let x = cnt_todo; + cnt_todo = 0; + x + }; + let c = make_container(off, step, cnt); + ret.push(sitem_data(c)); + off += cnt * step; + } + ret +} + +fn make_stream( + off: usize, + step: usize, + cnt_stream: usize, + cnt_item: usize, +) -> Pin>> + Send>> { + let items = make_stream_items(off, step, cnt_stream, cnt_item); + trace!("items {:?}", items); + let st = futures_util::stream::iter(items); + Box::pin(st) +} + +fn make_streams_exactly_alternating( + off: usize, + n_streams: usize, + cnt_total: usize, + cnt_per_item: usize, +) -> Vec>> + Send>>> { + let mut ret = Vec::new(); + let mut cnt_todo = cnt_total; + let cnt_per_stream = cnt_total / n_streams; + for i in 0..n_streams { + let cnt = if cnt_todo > cnt_per_stream { + let x = cnt_per_stream; + cnt_todo -= x; + x + } else { + let x = cnt_todo; + cnt_todo = 0; + x + }; + let st = make_stream(off + i, n_streams, cnt, cnt_per_item); + ret.push(st); + } + ret +} + +fn make_streams_from_pattern( + pattern: P, +) -> Vec>> + Send>>> +where + P: AsRef<[J]>, + J: AsRef<[K]>, + K: AsRef<[usize]>, +{ + let mut streams = Vec::new(); + for pt1 in pattern.as_ref().iter() { + let mut conts = Vec::new(); + for pt2 in pt1.as_ref().iter() { + let mut c = ContainerEvents::new(); + for &pt3 in pt2.as_ref().iter() { + c.push_back(TsNano::from_ns(SEC * pt3 as u64), pt3 as f32); + } + trace!("made pattern {:?}", c); + conts.push(sitem_data(c)); + } + let st = futures_util::stream::iter(conts); + streams.push( + Box::pin(st) as Pin>> + Send>> + ); + } + streams +} + +async fn merger_alternating_00_inner() { + let exp_01 = make_container(400, 1, 100); + let inps = make_streams_exactly_alternating(400, 3, 100, 7); + let mut merger = Merger::new(inps, None); + let mut bad = false; + while let Some(x) = merger.next().await { + trace!("MERGER OUTPUT {:?}\n\n", x); + let _ = on_sitemty_data!(x, |x| { + match ContainerEvents::testing_cmp(&x, &exp_01) { + Ok(()) => {} + Err(msg) => { + bad = true; + error!("cmp {}", msg); + } + }; + sitem_data(x) + }); + } + if bad { + panic!("bad result"); + } +} + +#[test] +fn merger_alternating_00() { + run_test(merger_alternating_00_inner()); +} + +macro_rules! 
a { + ($val:expr) => { + &($val[..]) + }; +} + +async fn merger_overlap_00_inner() { + let pattern = [ + &[a!([400, 405, 408, 410, 414]), a!([416, 417, 418, 419])][..], + &[a!([402]), a!([404, 406, 411, 412, 413])][..], + &[ + a!([401]), + a!([403]), + a!([406, 407, 409]), + a!([413, 414, 415]), + ][..], + ]; + let inps = make_streams_from_pattern(pattern); + let exp_00 = make_container(400, 1, 20); + let mut merger = Merger::new(inps, None); + let mut merged = Vec::new(); + while let Some(x) = merger.next().await { + trace!("MERGER OUTPUT {:?}\n\n", x); + let _ = on_sitemty_data!(x, |x| { + merged.push(x); + sitem_data(ContainerEvents::::new()) + }); + } + let mut bad = false; + match ContainerEvents::testing_cmp(&merged[0], &exp_00) { + Ok(()) => {} + Err(msg) => { + bad = true; + error!("cmp {}", msg); + } + }; + if bad { + panic!("bad result"); + } +} + +#[test] +fn merger_overlap_00() { + run_test(merger_overlap_00_inner()); }