From 8432601e2bf0c94b5d6d5eb389eb555fe2051d1f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 26 Nov 2024 16:28:50 +0100 Subject: [PATCH] WIP --- src/cbor_stream.rs | 187 +++++++++++++++++---------------------- src/collect.rs | 20 +++-- src/firsterr.rs | 2 +- src/frames.rs | 24 +++++ src/frames/inmem.rs | 1 - src/json_stream.rs | 84 +++--------------- src/lenframe.rs | 72 +++++++++++++++ src/lib.rs | 1 + src/plaineventscbor.rs | 3 +- src/plaineventsjson.rs | 56 ++---------- src/plaineventsstream.rs | 20 ++--- src/tcprawclient.rs | 5 ++ src/test.rs | 1 + src/test/events.rs | 4 +- src/test/framing.rs | 49 ++++++++++ src/timebinnedjson.rs | 2 +- 16 files changed, 278 insertions(+), 253 deletions(-) create mode 100644 src/lenframe.rs create mode 100644 src/test/framing.rs diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 594ab37..d4fba12 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -1,3 +1,4 @@ +use crate::plaineventsstream::ChannelEventsStream; use crate::streamtimeout::StreamTimeout2; use crate::streamtimeout::TimeoutableStream; use bytes::Buf; @@ -6,23 +7,21 @@ use bytes::Bytes; use bytes::BytesMut; use futures_util::Stream; use futures_util::StreamExt; +use items_0::collect_s::ToCborValue; use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::AsAnyMut; -use items_0::AsAnyRef; use items_0::Events; use items_0::WithLen; -use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; +use items_2::jsonbytes::CborBytes; use netpod::log::Level; use netpod::log::*; -use netpod::EnumVariant; use netpod::ScalarType; use netpod::Shape; use std::io::Cursor; @@ -40,6 +39,7 @@ pub enum Error { FromSlice(#[from] std::array::TryFromSliceError), Msg(String), Ciborium(#[from] ciborium::de::Error), + CiboriumValue(#[from] ciborium::value::Error), } struct ErrMsg(E) @@ -55,132 +55,56 @@ where } } -pub struct CborBytes(Bytes); - -impl CborBytes { - pub fn into_inner(self) -> Bytes { - self.0 - } - - pub fn len(&self) -> u32 { - self.0.len() as _ - } -} - -impl WithLen for CborBytes { - fn len(&self) -> usize { - self.len() as usize - } -} - -impl From for Bytes { - fn from(value: CborBytes) -> Self { - value.0 - } -} - pub type CborStream = Pin> + Send>>; -// TODO move this type decl because it is not specific to cbor -pub type SitemtyDynEventsStream = Pin>> + Send>>; - pub fn events_stream_to_cbor_stream( - stream: SitemtyDynEventsStream, + stream: ChannelEventsStream, + ivl: Duration, timeout_provider: Box, ) -> impl Stream> { - let ivl = Duration::from_millis(4000); let stream = TimeoutableStream::new(ivl, timeout_provider, stream); let stream = stream.map(|x| match x { Some(x) => map_events(x), None => make_keepalive(), }); - let prepend = { - let item = make_keepalive(); - futures_util::stream::iter([item]) - }; - prepend.chain(stream) + stream } -fn map_events(x: Sitemty>) -> Result { +fn map_events(x: Sitemty) -> Result { match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { - if false { - // TODO impl generically on EventsDim0 ? - if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - let mut buf = Vec::new(); - ciborium::into_writer(evs, &mut buf) - .map_err(|e| Error::Msg(e.to_string()))?; - let bytes = Bytes::from(buf); - let _item = CborBytes(bytes); - // Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) - } else { - let _item = LogItem::from_node( - 0, - Level::DEBUG, - format!("cbor stream discarded item"), - ); - // Ok(StreamItem::Log(item)) - }; - } - let mut k = evs; - let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { - match j { - ChannelEvents::Events(m) => { - if let Some(g) = m - .as_any_mut() - .downcast_mut::>() - { - trace!("consider container EnumVariant"); - k - } else { - trace!( - "consider container channel events other events {}", - k.type_name() - ); - k - } - } - ChannelEvents::Status(_) => { - trace!( - "consider container channel events status {}", - k.type_name() - ); - k - } - } - } else { - trace!("consider container else {}", k.type_name()); - k - }; - let buf = evs.to_cbor_vec_u8(); + use items_0::apitypes::ToUserFacingApiType; + let val = evs.to_user_facing_api_type(); + let val = val.to_cbor_value()?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); - let item = CborBytes(bytes); + let item = CborBytes::new(bytes); Ok(item) } RangeCompletableItem::RangeComplete => { use ciborium::cbor; - let item = cbor!({ + let val = cbor!({ "rangeFinal" => true, }) .map_err(|e| Error::Msg(e.to_string()))?; let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf) - .map_err(|e| Error::Msg(e.to_string()))?; + ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); - let item = CborBytes(bytes); + let item = CborBytes::new(bytes); Ok(item) } }, StreamItem::Log(item) => { info!("{item:?}"); - let item = CborBytes(Bytes::new()); + let item = CborBytes::new(Bytes::new()); Ok(item) } StreamItem::Stats(item) => { info!("{item:?}"); - let item = CborBytes(Bytes::new()); + let item = CborBytes::new(Bytes::new()); Ok(item) } }, @@ -193,7 +117,58 @@ fn map_events(x: Sitemty>) -> Result { let mut buf = Vec::with_capacity(64); ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); - let item = CborBytes(bytes); + let item = CborBytes::new(bytes); + Ok(item) + } + } +} + +fn map_events_2(x: Sitemty) -> Result { + match x { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(evs) => { + let val = evs.to_cbor_value()?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; + let bytes = Bytes::from(buf); + let item = CborBytes::new(bytes); + Ok(item) + } + RangeCompletableItem::RangeComplete => { + use ciborium::cbor; + let val = cbor!({ + "rangeFinal" => true, + }) + .map_err(|e| Error::Msg(e.to_string()))?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; + let bytes = Bytes::from(buf); + let item = CborBytes::new(bytes); + Ok(item) + } + }, + StreamItem::Log(item) => { + info!("{item:?}"); + let item = CborBytes::new(Bytes::new()); + Ok(item) + } + StreamItem::Stats(item) => { + info!("{item:?}"); + let item = CborBytes::new(Bytes::new()); + Ok(item) + } + }, + Err(e) => { + use ciborium::cbor; + let item = cbor!({ + "error" => e.to_string(), + }) + .map_err(|e| Error::Msg(e.to_string()))?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; + let bytes = Bytes::from(buf); + let item = CborBytes::new(bytes); Ok(item) } } @@ -208,18 +183,18 @@ fn make_keepalive() -> Result { let mut buf = Vec::with_capacity(64); ciborium::into_writer(&item, &mut buf).map_err(ErrMsg)?; let bytes = Bytes::from(buf); - let item = Ok(CborBytes(bytes)); + let item = Ok(CborBytes::new(bytes)); item } -pub struct FramedBytesToSitemtyDynEventsStream { +pub struct FramedBytesToChannelEventsStream { inp: S, scalar_type: ScalarType, shape: Shape, buf: BytesMut, } -impl FramedBytesToSitemtyDynEventsStream { +impl FramedBytesToChannelEventsStream { pub fn new(inp: S, scalar_type: ScalarType, shape: Shape) -> Self { Self { inp, @@ -229,7 +204,7 @@ impl FramedBytesToSitemtyDynEventsStream { } } - fn try_parse(&mut self) -> Result>>, Error> { + fn try_parse(&mut self) -> Result>, Error> { // debug!("try_parse {}", self.buf.len()); if self.buf.len() < FRAME_HEAD_LEN { return Ok(None); @@ -252,7 +227,7 @@ impl FramedBytesToSitemtyDynEventsStream { let buf = &self.buf[FRAME_HEAD_LEN..frame_len]; let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(ErrMsg)?; - // debug!("decoded ciborium value {val:?}"); + debug!("decoded ciborium value {val:?}"); let item = if let Some(map) = val.as_map() { let keys: Vec<&str> = map .iter() @@ -285,6 +260,7 @@ impl FramedBytesToSitemtyDynEventsStream { } else { None }; + let item = None; let item = if let Some(x) = item { Some(x) } else { @@ -302,12 +278,12 @@ impl FramedBytesToSitemtyDynEventsStream { } } -impl Stream for FramedBytesToSitemtyDynEventsStream +impl Stream for FramedBytesToChannelEventsStream where S: Stream> + Unpin, E: std::error::Error, { - type Item = ::Item; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -361,7 +337,7 @@ fn decode_cbor_to_box_events( buf: &[u8], scalar_type: &ScalarType, shape: &Shape, -) -> Result, Error> { +) -> Result { let item: Box = match shape { Shape::Scalar => match scalar_type { ScalarType::U8 => cbor_scalar!(u8, buf), @@ -396,5 +372,6 @@ fn decode_cbor_to_box_events( }, Shape::Image(_, _) => todo!(), }; - Ok(item) + // Ok(item); + todo!() } diff --git a/src/collect.rs b/src/collect.rs index d0818b9..792a6fa 100644 --- a/src/collect.rs +++ b/src/collect.rs @@ -45,8 +45,9 @@ pub enum CollectResult { Some(T), } -pub struct Collect { - inp: Pin>> + Send>>, +pub struct Collect { + // inp: Pin>> + Send>>, + inp: Pin> + Send>>, events_max: u64, bytes_max: u64, range: Option, @@ -58,9 +59,13 @@ pub struct Collect { done_input: bool, } -impl Collect { +impl Collect +where + ITEM: CollectableDyn, +{ pub fn new( - inp: Pin>> + Send>>, + // inp: Pin>> + Send>>, + inp: Pin> + Send>>, deadline: Instant, events_max: u64, bytes_max: u64, @@ -83,7 +88,7 @@ impl Collect { } } - fn handle_item(&mut self, item: Sitemty>) -> Result<(), Error> { + fn handle_item(&mut self, item: Sitemty) -> Result<(), Error> { match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { @@ -164,7 +169,10 @@ impl Collect { } } -impl Future for Collect { +impl Future for Collect +where + ITEM: CollectableDyn, +{ type Output = Result>, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { diff --git a/src/firsterr.rs b/src/firsterr.rs index a002b9a..15335d2 100644 --- a/src/firsterr.rs +++ b/src/firsterr.rs @@ -1,8 +1,8 @@ -use crate::cbor_stream::CborBytes; use futures_util::future; use futures_util::Stream; use futures_util::StreamExt; use items_0::WithLen; +use items_2::jsonbytes::CborBytes; pub fn non_empty(inp: S) -> impl Stream> where diff --git a/src/frames.rs b/src/frames.rs index dbb234f..36c469f 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -1,2 +1,26 @@ pub mod eventsfromframes; pub mod inmem; + +use bytes::Bytes; +use bytes::BytesMut; +use futures_util::Stream; +use futures_util::StreamExt; +use items_2::framable::Framable; + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "FramedStreamError")] +pub enum Error { + MakeFrame(#[from] items_2::framable::Error), +} + +pub fn frameable_stream_to_bytes_stream(stream: S) -> impl Stream> +where + S: Stream, + T: Framable, +{ + stream.map(|x| { + x.make_frame_dyn() + .map(BytesMut::freeze) + .map_err(|e| e.into()) + }) +} diff --git a/src/frames/inmem.rs b/src/frames/inmem.rs index e5bb1a6..3a05026 100644 --- a/src/frames/inmem.rs +++ b/src/frames/inmem.rs @@ -14,7 +14,6 @@ use netpod::ByteSize; use std::pin::Pin; use std::task::Context; use std::task::Poll; -// use tokio::io::AsyncRead; #[derive(Debug, thiserror::Error)] #[cstm(name = "InMem")] diff --git a/src/json_stream.rs b/src/json_stream.rs index 68f1728..a957429 100644 --- a/src/json_stream.rs +++ b/src/json_stream.rs @@ -1,8 +1,9 @@ -use crate::cbor_stream::SitemtyDynEventsStream; +use crate::plaineventsstream::ChannelEventsStream; use crate::streamtimeout::StreamTimeout2; use crate::streamtimeout::TimeoutableStream; use futures_util::Stream; use futures_util::StreamExt; +use items_0::collect_s::ToJsonValue; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -10,6 +11,7 @@ use items_0::Events; use items_0::WithLen; use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; +use items_2::jsonbytes::JsonBytes; use netpod::log::*; use netpod::EnumVariant; use std::pin::Pin; @@ -35,93 +37,31 @@ where } } -pub struct JsonBytes(String); - -impl JsonBytes { - pub fn new>(s: S) -> Self { - Self(s.into()) - } - - pub fn into_inner(self) -> String { - self.0 - } - - pub fn len(&self) -> u32 { - self.0.len() as _ - } -} - -impl WithLen for JsonBytes { - fn len(&self) -> usize { - self.len() as usize - } -} - -impl From for String { - fn from(value: JsonBytes) -> Self { - value.0 - } -} - pub type JsonStream = Pin> + Send>>; pub fn events_stream_to_json_stream( - stream: SitemtyDynEventsStream, + stream: ChannelEventsStream, + ivl: Duration, timeout_provider: Box, ) -> impl Stream> { - let ivl = Duration::from_millis(4000); let stream = TimeoutableStream::new(ivl, timeout_provider, stream); let stream = stream.map(|x| match x { Some(x) => map_events(x), None => make_keepalive(), }); - let prepend = { - let item = make_keepalive(); - futures_util::stream::iter([item]) - }; - prepend.chain(stream) + stream } -fn map_events(x: Sitemty>) -> Result { +fn map_events(x: Sitemty) -> Result +where + T: ToJsonValue, +{ match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { - let mut k = evs; - let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { - match j { - ChannelEvents::Events(m) => { - if let Some(g) = m - .as_any_mut() - .downcast_mut::>() - { - trace!("consider container EnumVariant"); - let mut out = ContainerEvents::new(); - for (&ts, val) in g.iter_zip() { - out.push_back(ts, val.name.to_string()); - } - Box::new(ChannelEvents::Events(Box::new(out))) - } else { - trace!( - "consider container channel events other events {}", - k.type_name() - ); - k - } - } - ChannelEvents::Status(_) => { - trace!( - "consider container channel events status {}", - k.type_name() - ); - k - } - } - } else { - trace!("consider container else {}", k.type_name()); - k - }; - let s = evs.to_json_string(); + let val = evs.to_json_value()?; + let s = serde_json::to_string(&val)?; let item = JsonBytes::new(s); Ok(item) } diff --git a/src/lenframe.rs b/src/lenframe.rs new file mode 100644 index 0000000..92e054a --- /dev/null +++ b/src/lenframe.rs @@ -0,0 +1,72 @@ +use crate::log::*; +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; +use futures_util::future::ready; +use futures_util::stream; +use futures_util::Stream; +use futures_util::StreamExt; + +pub fn bytes_chunks_to_framed(stream: S) -> impl Stream> +where + S: Stream>, + T: Into, + E: std::error::Error, +{ + stream + // TODO unify this map to padded bytes for both json and cbor output + .flat_map(|x| match x { + Ok(y) => { + let buf = y.into(); + let adv = (buf.len() + 7) / 8 * 8; + let pad = adv - buf.len(); + let mut b2 = BytesMut::with_capacity(16); + b2.put_u32_le(buf.len() as u32); + b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); + let mut b3 = BytesMut::with_capacity(16); + b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); + stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())]) + } + Err(e) => { + error!("{e}"); + stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())]) + } + }) + .filter(|x| { + if let Ok(x) = x { + ready(x.len() > 0) + } else { + ready(true) + } + }) +} + +// TODO move this, it's also used by binned. +pub fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> +where + S: Stream>, + T: Into, + E: std::error::Error, +{ + stream + .flat_map(|x| match x { + Ok(y) => { + use std::fmt::Write; + let s = y.into(); + let mut b2 = String::with_capacity(16); + write!(b2, "{:15}\n", s.len()).unwrap(); + stream::iter([Ok::<_, E>(b2), Ok(s), Ok(String::from("\n"))]) + } + Err(e) => { + error!("{e}"); + stream::iter([Ok(String::new()), Ok(String::new()), Ok(String::new())]) + } + }) + .filter(|x| { + if let Ok(x) = x { + ready(x.len() > 0) + } else { + ready(true) + } + }) +} diff --git a/src/lib.rs b/src/lib.rs index 3ea898c..28b5541 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod generators; pub mod instrument; pub mod itemclone; pub mod json_stream; +pub mod lenframe; pub mod lenframed; pub mod needminbuffer; pub mod plaineventscbor; diff --git a/src/plaineventscbor.rs b/src/plaineventscbor.rs index 599d528..75bced1 100644 --- a/src/plaineventscbor.rs +++ b/src/plaineventscbor.rs @@ -23,7 +23,8 @@ pub async fn plain_events_cbor_stream( timeout_provider: Box, ) -> Result { let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; - let stream = events_stream_to_cbor_stream(stream, timeout_provider); + let stream = + events_stream_to_cbor_stream(stream, evq.timeout_content_or_default(), timeout_provider); let stream = non_empty(stream); let stream = only_first_err(stream); Ok(Box::pin(stream)) diff --git a/src/plaineventsjson.rs b/src/plaineventsjson.rs index 5c5adc6..d14acad 100644 --- a/src/plaineventsjson.rs +++ b/src/plaineventsjson.rs @@ -6,6 +6,7 @@ use crate::json_stream::events_stream_to_json_stream; use crate::json_stream::JsonStream; use crate::plaineventsstream::dyn_events_stream; use crate::streamtimeout::StreamTimeout2; +use crate::streamtimeout::TimeoutableStream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; @@ -37,61 +38,13 @@ pub async fn plain_events_json( timeout_provider: Box, ) -> Result, Error> { debug!("plain_events_json evquery {:?}", evq); - let deadline = Instant::now() + evq.timeout().unwrap_or(Duration::from_millis(4000)); - + let deadline = Instant::now() + evq.timeout_content_or_default(); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; - - let stream = stream.map(move |k| { - on_sitemty_data!(k, |mut k: Box| { - if let Some(j) = k - .as_any_mut() - .downcast_mut::() - { - use items_0::AsAnyMut; - match j { - items_2::channelevents::ChannelEvents::Events(m) => { - if let Some(g) = m - .as_any_mut() - .downcast_mut::>() - { - trace!("consider container EnumVariant"); - let mut out = items_2::eventsdim0enum::EventsDim0Enum::new(); - for (&ts, val) in g.tss.iter().zip(g.values.iter()) { - out.push_back(ts, val.ix(), val.name_string()); - } - let k: Box = Box::new(out); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - } else { - trace!( - "consider container channel events other events {}", - k.type_name() - ); - let k: Box = Box::new(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - } - } - items_2::channelevents::ChannelEvents::Status(_) => { - trace!( - "consider container channel events status {}", - k.type_name() - ); - let k: Box = Box::new(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - } - } - } else { - trace!("consider container else {}", k.type_name()); - let k: Box = Box::new(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - } - }) - }); - //let stream = PlainEventStream::new(stream); //let stream = EventsToTimeBinnable::new(stream); //let stream = TimeBinnableToCollectable::new(stream); - let stream = Box::pin(stream); debug!("plain_events_json boxed stream created"); + // let stream = Box::pin(stream); let collected = Collect::new( stream, deadline, @@ -122,7 +75,8 @@ pub async fn plain_events_json_stream( ) -> Result { trace!("plain_events_json_stream"); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; - let stream = events_stream_to_json_stream(stream, timeout_provider); + let stream = + events_stream_to_json_stream(stream, evq.timeout_content_or_default(), timeout_provider); let stream = non_empty(stream); let stream = only_first_err(stream); Ok(Box::pin(stream)) diff --git a/src/plaineventsstream.rs b/src/plaineventsstream.rs index 5824744..4337151 100644 --- a/src/plaineventsstream.rs +++ b/src/plaineventsstream.rs @@ -22,14 +22,14 @@ pub enum Error { TcpRawClient(#[from] crate::tcprawclient::Error), } -pub type DynEventsStream = Pin>> + Send>>; +pub type ChannelEventsStream = Pin> + Send>>; pub async fn dyn_events_stream( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, -) -> Result { +) -> Result { trace!("dyn_events_stream {}", evq.summary_short()); use query::api4::events::EventsSubQuerySettings; let subq = make_sub_query( @@ -62,12 +62,6 @@ pub async fn dyn_events_stream( evq.range().try_into()?, evq.one_before_range(), ); - let stream = stream.map(move |k| { - on_sitemty_data!(k, |k| { - let k: Box = Box::new(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - }) - }); if let Some(wasmname) = evq.test_do_wasm() { let stream = transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?; @@ -82,11 +76,11 @@ async fn transform_wasm( stream: INP, _wasmname: &str, _ctx: &ReqCtx, -) -> Result>> + Send, Error> +) -> Result> + Send, Error> where - INP: Stream>> + Send + 'static, + INP: Stream> + Send + 'static, { - let ret: Pin>> + Send>> = Box::pin(stream); + let ret = Box::pin(stream); Ok(ret) } @@ -95,9 +89,9 @@ async fn transform_wasm( stream: INP, wasmname: &str, ctx: &ReqCtx, -) -> Result>> + Send, Error> +) -> Result> + Send, Error> where - INP: Stream>> + Send + 'static, + INP: Stream> + Send + 'static, { debug!("make wasm transform"); use httpclient::url::Url; diff --git a/src/tcprawclient.rs b/src/tcprawclient.rs index ce63b79..279eda4 100644 --- a/src/tcprawclient.rs +++ b/src/tcprawclient.rs @@ -186,6 +186,11 @@ where { let frames = InMemoryFrameStream::new(inp, bufcap); let frames = frames.map_err(sitem_err2_from_string); + let frames = frames.inspect(|x| { + if false { + eprintln!("container_stream_from_bytes_stream see frame {:?}", x); + } + }); // TODO let EventsFromFrames accept also non-boxed input? let frames = Box::pin(frames); let stream = EventsFromFrames::::new(frames, dbgdesc); diff --git a/src/test.rs b/src/test.rs index a7b2594..c383392 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1,6 +1,7 @@ mod collect; mod events; mod events_reader; +mod framing; mod timebin; use futures_util::stream; diff --git a/src/test/events.rs b/src/test/events.rs index f1dbca6..d19e151 100644 --- a/src/test/events.rs +++ b/src/test/events.rs @@ -1,4 +1,4 @@ -use crate::cbor_stream::FramedBytesToSitemtyDynEventsStream; +use crate::cbor_stream::FramedBytesToChannelEventsStream; use crate::firsterr::only_first_err; use crate::frames::inmem::BoxedBytesStream; use crate::lenframed; @@ -66,7 +66,7 @@ async fn merged_events_inner() -> Result<(), Error> { .await .unwrap(); let stream = lenframed::length_framed(stream); - let stream = FramedBytesToSitemtyDynEventsStream::new( + let stream = FramedBytesToChannelEventsStream::new( stream, ch_conf.scalar_type().clone(), ch_conf.shape().clone(), diff --git a/src/test/framing.rs b/src/test/framing.rs new file mode 100644 index 0000000..9996faa --- /dev/null +++ b/src/test/framing.rs @@ -0,0 +1,49 @@ +use crate::frames::frameable_stream_to_bytes_stream; +use crate::tcprawclient::container_stream_from_bytes_stream; +use futures_util::TryStreamExt; +use items_0::streamitem::sitem_err2_from_string; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_2::binning::container_events::ContainerEvents; +use items_2::channelevents::ChannelEvents; +use netpod::ByteSize; +use netpod::TsNano; + +async fn framing_00_inner() -> Result<(), Box> { + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ns(1), 1.2); + let cevs = ChannelEvents::from(evs); + let item: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(cevs))); + let stream = futures_util::stream::iter([item]); + let stream = frameable_stream_to_bytes_stream(stream); + let stream = stream.map_err(sitem_err2_from_string); + let stream = stream.inspect_ok(|x| { + if false { + let a = &x[0..x.len().min(40)]; + eprintln!("byte blob in stream {} {:?}", x.len(), a); + } + }); + let stream = Box::pin(stream); + let bufcap = ByteSize(1024 * 1024); + let mut stream = + container_stream_from_bytes_stream::(stream, bufcap, "test".into())?; + let mut n = 0; + while let Some(x) = stream.try_next().await? { + if false { + eprintln!("{x:?}"); + } + n += 1; + } + assert_eq!(n, 1); + Ok(()) +} + +#[test] +fn framing_00() { + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(framing_00_inner()) + .unwrap() +} diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 30545a8..ae112e6 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -1,6 +1,5 @@ use crate::collect::Collect; use crate::collect::CollectResult; -use crate::json_stream::JsonBytes; use crate::json_stream::JsonStream; use crate::rangefilter2::RangeFilter2; use crate::streamtimeout::StreamTimeout2; @@ -20,6 +19,7 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; use items_2::channelevents::ChannelEvents; +use items_2::jsonbytes::JsonBytes; use items_2::merger::Merger; use netpod::log::*; use netpod::range::evrange::NanoRange;