From 34e82d95849beac648a67f921959ce2a4838b8f7 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 30 Nov 2024 11:44:29 +0100 Subject: [PATCH] WIP change container type --- src/cbor_stream.rs | 140 ++------ src/collect.rs | 6 +- src/frames/eventsfromframes.rs | 92 +++--- src/frames/inmem.rs | 14 +- src/lib.rs | 2 + src/plaineventsjson.rs | 1 + src/plaineventsstream.rs | 39 ++- src/rangefilter2.rs | 56 ++-- src/rangefilter2/test.rs | 586 +++++++++++++++++++-------------- src/rt.rs | 11 + src/tcprawclient.rs | 25 +- src/timebinnedjson.rs | 16 +- 12 files changed, 531 insertions(+), 457 deletions(-) create mode 100644 src/rt.rs diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index d4fba12..024ce76 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -7,7 +7,6 @@ use bytes::Bytes; use bytes::BytesMut; use futures_util::Stream; use futures_util::StreamExt; -use items_0::collect_s::ToCborValue; use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::LogItem; @@ -17,14 +16,11 @@ use items_0::streamitem::StreamItem; use items_0::Events; use items_0::WithLen; use items_2::channelevents::ChannelEvents; -use items_2::eventsdim0::EventsDim0; -use items_2::eventsdim1::EventsDim1; use items_2::jsonbytes::CborBytes; use netpod::log::Level; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; -use std::io::Cursor; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -75,6 +71,7 @@ fn map_events(x: Sitemty) -> Result { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { + trace!("map_events Data evs len {}", evs.len()); use items_0::apitypes::ToUserFacingApiType; let val = evs.to_user_facing_api_type(); let val = val.to_cbor_value()?; @@ -98,58 +95,23 @@ fn map_events(x: Sitemty) -> Result { } }, StreamItem::Log(item) => { - info!("{item:?}"); - let item = CborBytes::new(Bytes::new()); - Ok(item) - } - StreamItem::Stats(item) => { - info!("{item:?}"); - let item = CborBytes::new(Bytes::new()); - Ok(item) - } - }, - Err(e) => { - use ciborium::cbor; - let item = cbor!({ - "error" => e.to_string(), - }) - .map_err(|e| Error::Msg(e.to_string()))?; - let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; - let bytes = Bytes::from(buf); - let item = CborBytes::new(bytes); - Ok(item) - } - } -} - -fn map_events_2(x: Sitemty) -> Result { - match x { - Ok(x) => match x { - StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(evs) => { - let val = evs.to_cbor_value()?; - let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; - let bytes = Bytes::from(buf); - let item = CborBytes::new(bytes); - Ok(item) + match item.level { + Level::TRACE => { + trace!("{item:?}"); + } + Level::DEBUG => { + debug!("{item:?}"); + } + Level::INFO => { + info!("{item:?}"); + } + Level::WARN => { + warn!("{item:?}"); + } + Level::ERROR => { + error!("{item:?}"); + } } - RangeCompletableItem::RangeComplete => { - use ciborium::cbor; - let val = cbor!({ - "rangeFinal" => true, - }) - .map_err(|e| Error::Msg(e.to_string()))?; - let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; - let bytes = Bytes::from(buf); - let item = CborBytes::new(bytes); - Ok(item) - } - }, - StreamItem::Log(item) => { - info!("{item:?}"); let item = CborBytes::new(Bytes::new()); Ok(item) } @@ -264,9 +226,10 @@ impl FramedBytesToChannelEventsStream { let item = if let Some(x) = item { Some(x) } else { - let item = decode_cbor_to_box_events(buf, &self.scalar_type, &self.shape)?; - debug!("decoded boxed events len {}", item.len()); - Some(StreamItem::DataItem(RangeCompletableItem::Data(item))) + // let item = decode_cbor_to_box_events(buf, &self.scalar_type, &self.shape)?; + // debug!("decoded boxed events len {}", item.len()); + // Some(StreamItem::DataItem(RangeCompletableItem::Data(item))) + todo!() }; self.buf.advance(adv); if let Some(x) = item { @@ -314,64 +277,3 @@ where } } } - -macro_rules! cbor_scalar { - ($ty:ident, $buf:expr) => {{ - type T = $ty; - type C = EventsDim0; - let item: C = ciborium::from_reader(Cursor::new($buf))?; - Box::new(item) - }}; -} - -macro_rules! cbor_wave { - ($ty:ident, $buf:expr) => {{ - type T = $ty; - type C = EventsDim1; - let item: C = ciborium::from_reader(Cursor::new($buf))?; - Box::new(item) - }}; -} - -fn decode_cbor_to_box_events( - buf: &[u8], - scalar_type: &ScalarType, - shape: &Shape, -) -> Result { - let item: Box = match shape { - Shape::Scalar => match scalar_type { - ScalarType::U8 => cbor_scalar!(u8, buf), - ScalarType::U16 => cbor_scalar!(u16, buf), - ScalarType::U32 => cbor_scalar!(u32, buf), - ScalarType::U64 => cbor_scalar!(u64, buf), - ScalarType::I8 => cbor_scalar!(i8, buf), - ScalarType::I16 => cbor_scalar!(i16, buf), - ScalarType::I32 => cbor_scalar!(i32, buf), - ScalarType::I64 => cbor_scalar!(i64, buf), - ScalarType::F32 => cbor_scalar!(f32, buf), - ScalarType::F64 => cbor_scalar!(f64, buf), - _ => { - return Err(ErrMsg(format!( - "decode_cbor_to_box_events {:?} {:?}", - scalar_type, shape - )) - .into()) - } - }, - Shape::Wave(_) => match scalar_type { - ScalarType::U8 => cbor_wave!(u8, buf), - ScalarType::U16 => cbor_wave!(u16, buf), - ScalarType::I64 => cbor_wave!(i64, buf), - _ => { - return Err(ErrMsg(format!( - "decode_cbor_to_box_events {:?} {:?}", - scalar_type, shape - )) - .into()) - } - }, - Shape::Image(_, _) => todo!(), - }; - // Ok(item); - todo!() -} diff --git a/src/collect.rs b/src/collect.rs index 792a6fa..b7d6a8e 100644 --- a/src/collect.rs +++ b/src/collect.rs @@ -46,7 +46,6 @@ pub enum CollectResult { } pub struct Collect { - // inp: Pin>> + Send>>, inp: Pin> + Send>>, events_max: u64, bytes_max: u64, @@ -64,7 +63,6 @@ where ITEM: CollectableDyn, { pub fn new( - // inp: Pin>> + Send>>, inp: Pin> + Send>>, deadline: Instant, events_max: u64, @@ -107,7 +105,6 @@ where coll.ingest(&mut item); if coll.len() as u64 >= self.events_max { info!("reached events_max {} / {}", coll.len(), self.events_max); - coll.set_continue_at_here(); self.done_input = true; } if coll.byte_estimate() >= self.bytes_max { @@ -116,7 +113,6 @@ where coll.byte_estimate(), self.events_max ); - coll.set_continue_at_here(); self.done_input = true; } Ok(()) @@ -192,7 +188,7 @@ where // TODO use range_final and timeout in result. match self.collector.take() { Some(mut coll) => { - match coll.result(self.range.clone(), self.binrange.clone()) { + match coll.result() { Ok(res) => { //info!("collect stats total duration: {:?}", total_duration); Ready(Ok(CollectResult::Some(res))) diff --git a/src/frames/eventsfromframes.rs b/src/frames/eventsfromframes.rs index ba036e6..28bfa5b 100644 --- a/src/frames/eventsfromframes.rs +++ b/src/frames/eventsfromframes.rs @@ -3,7 +3,6 @@ use futures_util::StreamExt; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::frame::decode_frame; @@ -19,19 +18,16 @@ use std::task::Poll; #[cstm(name = "FromFrames")] pub enum Error {} -pub struct EventsFromFrames { - inp: Pin, SitemErrTy>> + Send>>, +pub struct EventsFromFrames { + inp: INP, dbgdesc: String, errored: bool, completed: bool, _m1: PhantomData, } -impl EventsFromFrames { - pub fn new( - inp: Pin, SitemErrTy>> + Send>>, - dbgdesc: String, - ) -> Self { +impl EventsFromFrames { + pub fn new(inp: INP, dbgdesc: String) -> Self { Self { inp, dbgdesc, @@ -42,9 +38,10 @@ impl EventsFromFrames { } } -impl Stream for EventsFromFrames +impl Stream for EventsFromFrames where O: FrameTypeInnerStatic + DeserializeOwned + Unpin, + INP: Stream> + Unpin, { type Item = Sitemty; @@ -62,47 +59,54 @@ where } else { match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(item))) => match item { - StreamItem::Log(item) => { - //info!("{} {:?} {}", item.node_ix, item.level, item.msg); - Ready(Some(Ok(StreamItem::Log(item)))) - } + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(frame) => match decode_frame::>(&frame) { - Ok(item) => match item { - Ok(item) => match item { - StreamItem::DataItem(item2) => match item2 { - RangeCompletableItem::Data(item3) => Ready(Some(Ok( - StreamItem::DataItem(RangeCompletableItem::Data(item3)), - ))), - RangeCompletableItem::RangeComplete => { - debug!("EventsFromFrames RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem( - RangeCompletableItem::RangeComplete, - )))) + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(frame) => { + match decode_frame::>(&frame) { + Ok(item) => match item { + Ok(item) => match item { + StreamItem::DataItem(item2) => match item2 { + RangeCompletableItem::Data(item3) => { + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::Data(item3), + )))) + } + RangeCompletableItem::RangeComplete => { + debug!("EventsFromFrames RangeComplete"); + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + )))) + } + }, + StreamItem::Log(k) => { + Ready(Some(Ok(StreamItem::Log(k)))) + } + StreamItem::Stats(k) => { + Ready(Some(Ok(StreamItem::Stats(k)))) + } + }, + Err(e) => { + error!("rcvd err: {}", e); + self.errored = true; + Ready(Some(Err(e))) } }, - StreamItem::Log(k) => { - //info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg); - Ready(Some(Ok(StreamItem::Log(k)))) + Err(e) => { + error!( + "frame payload len {} tyid {:04x} {}", + frame.buf().len(), + frame.tyid(), + e + ); + self.errored = true; + Ready(Some(sitem_err_from_string(e))) } - StreamItem::Stats(k) => Ready(Some(Ok(StreamItem::Stats(k)))), - }, - Err(e) => { - error!("rcvd err: {}", e); - self.errored = true; - Ready(Some(Err(e))) } - }, - Err(e) => { - error!( - "frame payload len {} tyid {:04x} {}", - frame.buf().len(), - frame.tyid(), - e - ); - self.errored = true; - Ready(Some(sitem_err_from_string(e))) } + RangeCompletableItem::RangeComplete => Ready(Some(Ok( + StreamItem::DataItem(RangeCompletableItem::RangeComplete), + ))), }, }, Ready(Some(Err(e))) => { diff --git a/src/frames/inmem.rs b/src/frames/inmem.rs index 3a05026..4f48b59 100644 --- a/src/frames/inmem.rs +++ b/src/frames/inmem.rs @@ -2,7 +2,10 @@ use crate::slidebuf::SlideBuf; use bytes::Bytes; use futures_util::pin_mut; use futures_util::Stream; +use items_0::streamitem::sitem_err2_from_string; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::SitemErrTy; +use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::TERM_FRAME_TYPE_ID; use items_2::framable::INMEM_FRAME_FOOT; @@ -182,7 +185,7 @@ impl Stream for InMemoryFrameStream where T: Stream> + Unpin, { - type Item = Result, Error>; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -200,7 +203,7 @@ where if self.buf.len() >= self.need_min { self.done = true; let e = Error::EnoughInputNothingParsed; - Ready(Some(Err(e))) + Ready(Some(Err(sitem_err2_from_string(e)))) } else { continue; } @@ -210,12 +213,13 @@ where self.done = true; continue; } else { - Ready(Some(Ok(StreamItem::DataItem(item)))) + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + Ready(Some(item)) } } Err(e) => { self.done = true; - Ready(Some(Err(e))) + Ready(Some(Err(sitem_err2_from_string(e)))) } } } else { @@ -234,7 +238,7 @@ where self.need_min, self.buf, e ); self.done = true; - Ready(Some(Err(e))) + Ready(Some(Err(sitem_err2_from_string(e)))) } Pending => Pending, } diff --git a/src/lib.rs b/src/lib.rs index 28b5541..8770c14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,8 @@ pub mod plaineventsjson; pub mod plaineventsstream; pub mod print_on_done; pub mod rangefilter2; +#[cfg(test)] +pub mod rt; pub mod slidebuf; pub mod streamtimeout; pub mod tcprawclient; diff --git a/src/plaineventsjson.rs b/src/plaineventsjson.rs index d14acad..43339f5 100644 --- a/src/plaineventsjson.rs +++ b/src/plaineventsjson.rs @@ -57,6 +57,7 @@ pub async fn plain_events_json( .await?; debug!("plain_events_json collected"); if let CollectResult::Some(x) = collected { + let x = x.to_user_facing_api_type_box(); let jsval = x.to_json_value()?; debug!("plain_events_json json serialized"); Ok(CollectResult::Some(jsval)) diff --git a/src/plaineventsstream.rs b/src/plaineventsstream.rs index 4337151..b22b66e 100644 --- a/src/plaineventsstream.rs +++ b/src/plaineventsstream.rs @@ -1,3 +1,4 @@ +use crate::rangefilter2::RangeFilter2; use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; @@ -57,11 +58,39 @@ pub async fn dyn_events_stream( // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); - let stream = crate::rangefilter2::RangeFilter2::new( - stream, - evq.range().try_into()?, - evq.one_before_range(), - ); + let stream = stream.inspect(|x| { + if true { + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + use items_0::WithLen; + use items_2::channelevents::ChannelEvents; + match x { + Ok(DataItem(Data(ChannelEvents::Events(x)))) => { + trace!("after MERGE yields item len {}", x.len()); + } + _ => { + trace!("after MERGE yields item {:?}", x); + } + } + } + }); + let stream = RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range()); + let stream = stream.inspect(|x| { + if true { + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + use items_0::WithLen; + use items_2::channelevents::ChannelEvents; + match x { + Ok(DataItem(Data(ChannelEvents::Events(x)))) => { + trace!("after merge and filter yields item len {}", x.len()); + } + _ => { + trace!("after merge and filter yields item {:?}", x); + } + } + } + }); if let Some(wasmname) = evq.test_do_wasm() { let stream = transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?; diff --git a/src/rangefilter2.rs b/src/rangefilter2.rs index 5d67ab7..8543859 100644 --- a/src/rangefilter2.rs +++ b/src/rangefilter2.rs @@ -1,6 +1,7 @@ -#[cfg(feature = "tests-runtime")] +#[cfg(test)] mod test; +use crate::log::*; use futures_util::Stream; use futures_util::StreamExt; use items_0::merge::DrainIntoNewResult; @@ -10,17 +11,20 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; -use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::RangeFilterStats; -use netpod::TsMsVecFmt; use netpod::TsNano; +use netpod::TsNanoVecFmt; use std::fmt; use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) } +macro_rules! trace_inp { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) } + +macro_rules! trace_init { ($det:expr, $($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[derive(Debug, thiserror::Error)] #[cstm(name = "Rangefilter")] @@ -28,12 +32,12 @@ pub enum Error { DrainUnclean, } -pub struct RangeFilter2 +pub struct RangeFilter2 where - S: Stream> + Unpin, + INP: Stream> + Unpin, ITY: MergeableTy, { - inp: S, + inp: INP, range: NanoRange, range_str: String, one_before: bool, @@ -47,19 +51,19 @@ where trdet: bool, } -impl RangeFilter2 +impl RangeFilter2 where - S: Stream> + Unpin, + INP: Stream> + Unpin, ITY: MergeableTy, { pub fn type_name() -> &'static str { std::any::type_name::() } - pub fn new(inp: S, range: NanoRange, one_before: bool) -> Self { + pub fn new(inp: INP, range: NanoRange, one_before: bool) -> Self { let trdet = false; - trace_emit!( - trdet, + trace_init!( + self.trdet, "{}::new range: {:?} one_before {:?}", Self::type_name(), range, @@ -129,16 +133,17 @@ where } let min = item.ts_min(); let max = item.ts_max(); - trace_emit!( + trace_inp!( self.trdet, "see event len {} min {:?} max {:?}", item.len(), min, max ); - let mut item = self.prune_high(item, TsNano::from_ns(self.range.end))?; + let mut item = self.prune_high(item, self.range.end_ts())?; + trace_inp!(self.trdet, "item len after prune_high {}", item.len()); if self.one_before { - let lige = item.find_lowest_index_ge(TsNano::from_ns(self.range.beg)); + let lige = item.find_lowest_index_ge(self.range.beg_ts()); trace_emit!(self.trdet, "YES one_before_range ilge {:?}", lige); match lige { Some(lige) => { @@ -162,7 +167,7 @@ where } None => { // TODO keep stats about this case - trace_emit!(self.trdet, "drain into to keep one before"); + trace_emit!(self.trdet, "drain into to keep one before",); let n = item.len(); match item.drain_into_new(n.max(1) - 1..n) { DrainIntoNewResult::Done(keep) => { @@ -195,9 +200,9 @@ where } } -impl RangeFilter2 +impl RangeFilter2 where - S: Stream> + Unpin, + INP: Stream> + Unpin, ITY: MergeableTy, { fn poll_next( @@ -220,9 +225,8 @@ where } else if self.inp_done { self.raco_done = true; if self.have_range_complete { - Ready(Some(Ok(StreamItem::DataItem( - RangeCompletableItem::RangeComplete, - )))) + let item = Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)); + Ready(Some(item)) } else { continue; } @@ -235,7 +239,7 @@ where trace_emit!( self.trdet, "emit {}", - TsMsVecFmt(MergeableTy::tss_for_testing(&item).iter()) + TsNanoVecFmt(MergeableTy::tss_for_testing(&item).iter()) ); let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); @@ -272,9 +276,9 @@ where } } -impl Stream for RangeFilter2 +impl Stream for RangeFilter2 where - S: Stream> + Unpin, + INP: Stream> + Unpin, ITY: MergeableTy, { type Item = Sitemty; @@ -287,9 +291,9 @@ where } } -impl fmt::Debug for RangeFilter2 +impl fmt::Debug for RangeFilter2 where - S: Stream> + Unpin, + INP: Stream> + Unpin, ITY: MergeableTy, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { diff --git a/src/rangefilter2/test.rs b/src/rangefilter2/test.rs index 5abe556..b8193a7 100644 --- a/src/rangefilter2/test.rs +++ b/src/rangefilter2/test.rs @@ -1,267 +1,371 @@ use crate::rangefilter2::RangeFilter2; +use crate::rt::run_test; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; +use items_2::binning::container_events::ContainerEvents; +use items_2::channelevents::ChannelEvents; use netpod::range::evrange::NanoRange; +use netpod::DtMs; +use netpod::DtNano; use netpod::TsNano; use std::collections::VecDeque; -#[test] -fn test_00() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut item1 = EventsDim0::::empty(); - item1.push_back(beg.ns() + 0 * ms, 0, 3.); - item1.push_back(beg.ns() + 1 * ms, 0, 3.1); - item1.push_back(beg.ns() + 2 * ms, 0, 3.2); - item1.push_back(beg.ns() + 3 * ms, 0, 3.3); - item1.push_back(beg.ns() + 4 * ms, 0, 3.4); - item1.push_back(end.ns() - 1, 0, 4.0); - item1.push_back(end.ns() + 0, 0, 4.1); - item1.push_back(end.ns() + 1, 0, 4.1); - let w1: Box = Box::new(item1.clone()); - let e1 = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w1))); - let inp = futures_util::stream::iter([e1]); - let one_before_range = false; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[&[ - beg.ns() + 0 * ms, - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ]]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); +#[derive(Debug, thiserror::Error)] +pub enum Error {} + +fn pu(c: &mut ContainerEvents, ts: TsNano, v: f32) { + c.push_back(ts, v); } -#[test] -fn test_cut_before_00() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() + 0 * ms, 0, 3.); - item.push_back(beg.ns() + 1 * ms, 0, 3.1); - item.push_back(beg.ns() + 2 * ms, 0, 3.2); - item.push_back(beg.ns() + 3 * ms, 0, 3.3); - item.push_back(beg.ns() + 4 * ms, 0, 3.4); - item.push_back(end.ns() - 1, 0, 4.0); - item.push_back(end.ns() + 0, 0, 4.1); - item.push_back(end.ns() + 1, 0, 4.1); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = false; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - &[], - &[ - beg.ns() + 0 * ms, - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); +fn dataitem(c: ContainerEvents) -> Sitemty { + Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Events(Box::new(c)), + ))) } -#[test] -fn test_one_before_00() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() + 0 * ms, 0, 3.); - item.push_back(beg.ns() + 1 * ms, 0, 3.1); - item.push_back(beg.ns() + 2 * ms, 0, 3.2); - item.push_back(beg.ns() + 3 * ms, 0, 3.3); - item.push_back(beg.ns() + 4 * ms, 0, 3.4); - item.push_back(end.ns() - 1, 0, 4.0); - item.push_back(end.ns() + 0, 0, 4.1); - item.push_back(end.ns() + 1, 0, 4.1); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = true; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - &[], - &[ - // - beg.ns() - 1, - ], - &[ - beg.ns() + 0 * ms, - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[test] -fn test_one_before_01() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - item.push_back(beg.ns() + 0 * ms, 0, 3.); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() + 1 * ms, 0, 3.1); - item.push_back(beg.ns() + 2 * ms, 0, 3.2); - item.push_back(beg.ns() + 3 * ms, 0, 3.3); - item.push_back(beg.ns() + 4 * ms, 0, 3.4); - item.push_back(end.ns() - 1, 0, 4.0); - item.push_back(end.ns() + 0, 0, 4.1); - item.push_back(end.ns() + 1, 0, 4.1); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = true; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - // &[], - &[ - // - beg.ns() - 1, - beg.ns() + 0 * ms, - ], - &[ - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[test] -fn test_one_before_only() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let _ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = true; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - &[], - &[ - // - beg.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[cfg(test)] -async fn fetch_into_tss_items(mut inp: INP) -> VecDeque> +async fn fetch_into_tss_items(mut inp: INP) -> VecDeque> where - INP: Stream>> + Unpin, + INP: Stream> + Unpin, { let mut tss_items = VecDeque::new(); while let Some(e) = inp.next().await { if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))) = e { - eprintln!("{:?}", evs); - tss_items.push_back(Events::tss(&evs).clone()); + eprintln!("fetch_into_tss_items sees {:?}", evs); + match evs { + ChannelEvents::Events(x) => { + tss_items.push_back(x.tss_for_testing()); + } + ChannelEvents::Status(x) => {} + } } else { eprintln!("other item ----------: {:?}", e); } } tss_items } + +fn gen_vstss(range: &NanoRange, cuts: C) -> VecDeque> +where + C: IntoIterator, +{ + let dt = DtNano::from_ms(1); + let mut cuts: VecDeque<_> = cuts.into_iter().map(|x| TsNano::from_ms(x)).collect(); + let mut ts = range.beg_ts(); + let mut ret = VecDeque::new(); + let mut buf = VecDeque::new(); + loop { + if ts >= range.end_ts() { + break; + } + if cuts.front().map_or(false, |&x| ts >= x) { + cuts.pop_front(); + ret.push_back(std::mem::replace(&mut buf, VecDeque::new())); + } + buf.push_back(ts); + ts = ts.add_dt_nano(dt); + } + ret.push_back(std::mem::replace(&mut buf, VecDeque::new())); + ret +} + +fn gen_vec_evs(range: &NanoRange, cuts: C) -> VecDeque> +where + C: IntoIterator, +{ + let vstss_inp = gen_vstss(range, cuts); + let mut co = ContainerEvents::::new(); + let mut ret = VecDeque::new(); + for b in vstss_inp { + let c = &mut co; + for ts in b { + pu(c, ts, 3.); + } + ret.push_back(co); + co = ContainerEvents::::new(); + } + ret +} + +fn gen_inp_stream(range: &NanoRange, cuts: C) -> impl Stream> +where + C: IntoIterator, +{ + let mut items = VecDeque::new(); + let a = gen_vec_evs(range, cuts); + for evs in a { + items.push_back(dataitem(evs)); + } + eprintln!("INPUT GENERATED {:?}", items); + futures_util::stream::iter(items) +} + +#[test] +fn test_single_prune_nothing_00() { + let range1 = NanoRange::from_ms_u64(10, 20); + let range2 = NanoRange::from_ms_u64(10, 20); + let vtss_exp = gen_vstss(&range1, []); + let inp = gen_inp_stream(&range2, []); + let one_before_range = false; + let stream = RangeFilter2::new(inp, range1, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + eprintln!("{:?}", tss_items); + assert_eq!(tss_items, vtss_exp); + Ok::<_, Error>(()) + }; + run_test(fut).unwrap(); +} + +#[test] +fn test_prune_high_00() { + let range1 = NanoRange::from_ms_u64(10, 20); + let range2 = NanoRange::from_ms_u64(10, 21); + let vtss_exp = gen_vstss(&range1, []); + let inp = gen_inp_stream(&range2, []); + let one_before_range = false; + let stream = RangeFilter2::new(inp, range1, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + // eprintln!("{:?}", tss_items); + assert_eq!(tss_items, vtss_exp); + Ok::<_, Error>(()) + }; + run_test(fut).unwrap(); +} + +#[test] +fn test_prune_high_01() { + let range1 = NanoRange::from_ms_u64(10, 20); + let range2 = NanoRange::from_ms_u64(10, 21); + let vtss_exp = gen_vstss(&range1, [14]); + let inp = gen_inp_stream(&range2, [14]); + let one_before_range = false; + let stream = RangeFilter2::new(inp, range1, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + // eprintln!("{:?}", tss_items); + assert_eq!(tss_items, vtss_exp); + Ok::<_, Error>(()) + }; + run_test(fut).unwrap(); +} + +#[test] +fn test_prune_low() { + let range1 = NanoRange::from_ms_u64(10, 20); + let range2 = NanoRange::from_ms_u64(8, 18); + let range3 = NanoRange::from_ms_u64(10, 18); + let inp = gen_inp_stream(&range2, [14]); + let vtss_exp = gen_vstss(&range3, [14]); + let one_before_range = false; + let stream = RangeFilter2::new(inp, range1, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + // eprintln!("{:?}", tss_items); + assert_eq!(tss_items, vtss_exp); + Ok::<_, Error>(()) + }; + run_test(fut).unwrap(); +} + +// #[test] +// fn test_cut_before_00() { +// let ms = 1_000_000; +// let beg = TsNano::from_ms(1000 * 10); +// let end = TsNano::from_ms(1000 * 20); +// let mut items = Vec::new(); +// { +// let mut item = ContainerEvents::::empty(); +// item.push_back(beg.ns() - 1, 0, 2.9); +// let w: Box = Box::new(item.clone()); +// let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); +// items.push(e); +// } +// { +// let mut item = ContainerEvents::::empty(); +// item.push_back(beg.ns() + 0 * ms, 0, 3.); +// item.push_back(beg.ns() + 1 * ms, 0, 3.1); +// item.push_back(beg.ns() + 2 * ms, 0, 3.2); +// item.push_back(beg.ns() + 3 * ms, 0, 3.3); +// item.push_back(beg.ns() + 4 * ms, 0, 3.4); +// item.push_back(end.ns() - 1, 0, 4.0); +// item.push_back(end.ns() + 0, 0, 4.1); +// item.push_back(end.ns() + 1, 0, 4.1); +// let w: Box = Box::new(item.clone()); +// let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); +// items.push(e); +// } +// let inp = futures_util::stream::iter(items); +// let one_before_range = false; +// let range = NanoRange::from((beg.ns(), end.ns())); +// let stream = RangeFilter2::new(inp, range, one_before_range); +// let fut = async move { +// let tss_items = fetch_into_tss_items(stream).await; +// let exp: &[&[u64]] = &[ +// // TODO in the future this empty may be discarded +// &[], +// &[ +// beg.ns() + 0 * ms, +// beg.ns() + 1 * ms, +// beg.ns() + 2 * ms, +// beg.ns() + 3 * ms, +// beg.ns() + 4 * ms, +// end.ns() - 1, +// ], +// ]; +// assert_eq!(&tss_items, &exp); +// Ok::<_, Error>(()) +// }; +// run_test(fut).unwrap(); +// } + +// #[test] +// fn test_one_before_00() { +// let ms = 1_000_000; +// let beg = TsNano::from_ms(1000 * 10); +// let end = TsNano::from_ms(1000 * 20); +// let mut items = Vec::new(); +// { +// let mut item = ContainerEvents::::empty(); +// item.push_back(beg.ns() - 1, 0, 2.9); +// let w: Box = Box::new(item.clone()); +// let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); +// items.push(e); +// } +// { +// let mut item = ContainerEvents::::empty(); +// item.push_back(beg.ns() + 0 * ms, 0, 3.); +// item.push_back(beg.ns() + 1 * ms, 0, 3.1); +// item.push_back(beg.ns() + 2 * ms, 0, 3.2); +// item.push_back(beg.ns() + 3 * ms, 0, 3.3); +// item.push_back(beg.ns() + 4 * ms, 0, 3.4); +// item.push_back(end.ns() - 1, 0, 4.0); +// item.push_back(end.ns() + 0, 0, 4.1); +// item.push_back(end.ns() + 1, 0, 4.1); +// let w: Box = Box::new(item.clone()); +// let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); +// items.push(e); +// } +// let inp = futures_util::stream::iter(items); +// let one_before_range = true; +// let range = NanoRange::from((beg.ns(), end.ns())); +// let stream = RangeFilter2::new(inp, range, one_before_range); +// let fut = async move { +// let tss_items = fetch_into_tss_items(stream).await; +// let exp: &[&[u64]] = &[ +// // TODO in the future this empty may be discarded +// &[], +// &[ +// // +// beg.ns() - 1, +// ], +// &[ +// beg.ns() + 0 * ms, +// beg.ns() + 1 * ms, +// beg.ns() + 2 * ms, +// beg.ns() + 3 * ms, +// beg.ns() + 4 * ms, +// end.ns() - 1, +// ], +// ]; +// assert_eq!(&tss_items, &exp); +// Ok::<_, Error>(()) +// }; +// taskrun::run(fut).unwrap(); +// } + +// #[test] +// fn test_one_before_01() { +// use items_0::Empty; +// use items_2::eventsdim0::EventsDim0; +// let ms = 1_000_000; +// let beg = TsNano::from_ms(1000 * 10); +// let end = TsNano::from_ms(1000 * 20); +// let mut items = Vec::new(); +// { +// let mut item = EventsDim0::::empty(); +// item.push_back(beg.ns() - 1, 0, 2.9); +// item.push_back(beg.ns() + 0 * ms, 0, 3.); +// let w: Box = Box::new(item.clone()); +// let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); +// items.push(e); +// } +// { +// let mut item = EventsDim0::::empty(); +// item.push_back(beg.ns() + 1 * ms, 0, 3.1); +// item.push_back(beg.ns() + 2 * ms, 0, 3.2); +// item.push_back(beg.ns() + 3 * ms, 0, 3.3); +// item.push_back(beg.ns() + 4 * ms, 0, 3.4); +// item.push_back(end.ns() - 1, 0, 4.0); +// item.push_back(end.ns() + 0, 0, 4.1); +// item.push_back(end.ns() + 1, 0, 4.1); +// let w: Box = Box::new(item.clone()); +// let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); +// items.push(e); +// } +// let inp = futures_util::stream::iter(items); +// let one_before_range = true; +// let range = NanoRange::from((beg.ns(), end.ns())); +// let stream = RangeFilter2::new(inp, range, one_before_range); +// let fut = async move { +// let tss_items = fetch_into_tss_items(stream).await; +// let exp: &[&[u64]] = &[ +// // TODO in the future this empty may be discarded +// // &[], +// &[ +// // +// beg.ns() - 1, +// beg.ns() + 0 * ms, +// ], +// &[ +// beg.ns() + 1 * ms, +// beg.ns() + 2 * ms, +// beg.ns() + 3 * ms, +// beg.ns() + 4 * ms, +// end.ns() - 1, +// ], +// ]; +// assert_eq!(&tss_items, &exp); +// Ok::<_, Error>(()) +// }; +// taskrun::run(fut).unwrap(); +// } + +// #[test] +// fn test_one_before_only() { +// use items_0::Empty; +// use items_2::eventsdim0::EventsDim0; +// let _ms = 1_000_000; +// let beg = TsNano::from_ms(1000 * 10); +// let end = TsNano::from_ms(1000 * 20); +// let mut items = Vec::new(); +// { +// let mut item = EventsDim0::::empty(); +// item.push_back(beg.ns() - 1, 0, 2.9); +// let w: Box = Box::new(item.clone()); +// let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); +// items.push(e); +// } +// let inp = futures_util::stream::iter(items); +// let one_before_range = true; +// let range = NanoRange::from((beg.ns(), end.ns())); +// let stream = RangeFilter2::new(inp, range, one_before_range); +// let fut = async move { +// let tss_items = fetch_into_tss_items(stream).await; +// let exp: &[&[u64]] = &[ +// // TODO in the future this empty may be discarded +// &[], +// &[ +// // +// beg.ns() - 1, +// ], +// ]; +// assert_eq!(&tss_items, &exp); +// Ok::<_, Error>(()) +// }; +// taskrun::run(fut).unwrap(); +// } diff --git a/src/rt.rs b/src/rt.rs new file mode 100644 index 0000000..b678de9 --- /dev/null +++ b/src/rt.rs @@ -0,0 +1,11 @@ +use futures_util::Future; + +pub fn run_test(fut: F) -> ::Output +where + F: Future, +{ + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(fut) +} diff --git a/src/tcprawclient.rs b/src/tcprawclient.rs index 279eda4..5f18e46 100644 --- a/src/tcprawclient.rs +++ b/src/tcprawclient.rs @@ -13,6 +13,7 @@ use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::sitem_data; use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::Sitemty; +use items_0::WithLen; use items_2::eventfull::EventFull; use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; @@ -182,18 +183,30 @@ pub fn container_stream_from_bytes_stream( dbgdesc: String, ) -> Result>, Error> where - T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, + T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static + WithLen, { let frames = InMemoryFrameStream::new(inp, bufcap); let frames = frames.map_err(sitem_err2_from_string); let frames = frames.inspect(|x| { - if false { - eprintln!("container_stream_from_bytes_stream see frame {:?}", x); + if true { + trace!("container_stream_from_bytes_stream see frame {:?}", x); + } + }); + let stream = EventsFromFrames::::new(frames, dbgdesc); + let stream = stream.inspect(|x| { + if true { + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + match x { + Ok(DataItem(Data(x))) => { + trace!("EventsFromFrames yields item len {}", x.len()); + } + _ => { + trace!("EventsFromFrames yields item {:?}", x); + } + } } }); - // TODO let EventsFromFrames accept also non-boxed input? - let frames = Box::pin(frames); - let stream = EventsFromFrames::::new(frames, dbgdesc); Ok(stream) } diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index ae112e6..92e8f49 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -353,7 +353,8 @@ pub async fn timebinned_json( let collres = collected.await?; match collres { CollectResult::Some(collres) => { - let jsval = collres.to_json_value()?; + let x = collres.to_user_facing_api_type_box(); + let jsval = x.to_json_value()?; Ok(CollectResult::Some(jsval)) } CollectResult::Timeout => Ok(CollectResult::Timeout), @@ -363,11 +364,14 @@ pub async fn timebinned_json( fn take_collector_result( coll: &mut Box, ) -> Option { - match coll.result(None, None) { - Ok(collres) => match collres.to_json_value() { - Ok(val) => Some(val), - Err(e) => Some(serde_json::Value::String(format!("{e}"))), - }, + match coll.result() { + Ok(collres) => { + let x = collres.to_user_facing_api_type_box(); + match x.to_json_value() { + Ok(val) => Some(val), + Err(e) => Some(serde_json::Value::String(format!("{e}"))), + } + } Err(e) => Some(serde_json::Value::String(format!("{e}"))), } }