From b5ce2dd743d6461bedc760fa0f4bcfb4eca13efc Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 19 Dec 2023 16:30:06 +0100 Subject: [PATCH] WIP testing cbor output --- crates/daqbuffer/Cargo.toml | 1 + crates/daqbuffer/src/bin/daqbuffer.rs | 12 ++ crates/daqbuffer/src/cli.rs | 11 + crates/daqbuffer/src/fetch.rs | 47 ++++ crates/daqbuffer/src/lib.rs | 1 + crates/httpret/Cargo.toml | 4 +- crates/items_0/src/items_0.rs | 6 + crates/netpod/src/netpod.rs | 26 +++ crates/scyllaconn/Cargo.toml | 2 +- crates/streams/Cargo.toml | 1 + crates/streams/src/cbor.rs | 296 ++++++++++++++++++++++++++ crates/streams/src/firsterr.rs | 32 +++ crates/streams/src/lenframed.rs | 27 +++ crates/streams/src/lib.rs | 3 + crates/streams/src/plaineventscbor.rs | 105 +-------- crates/streams/src/test/events.rs | 31 ++- crates/streams/src/transform.rs | 6 - 17 files changed, 495 insertions(+), 116 deletions(-) create mode 100644 crates/daqbuffer/src/fetch.rs create mode 100644 crates/streams/src/cbor.rs create mode 100644 crates/streams/src/firsterr.rs create mode 100644 crates/streams/src/lenframed.rs diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index c83116e..2c2614b 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -19,4 +19,5 @@ taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } disk = { path = "../disk" } httpclient = { path = "../httpclient" } +streams = { path = "../streams" } daqbufp2 = { path = "../daqbufp2" } diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index ec459e0..a0115b8 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -11,7 +11,9 @@ use netpod::query::CacheUsage; use netpod::NodeConfig; use netpod::NodeConfigCached; use netpod::ProxyConfig; +use netpod::ScalarType; use netpod::ServiceVersion; +use netpod::Shape; use taskrun::tokio; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -123,6 +125,16 @@ async fn go() -> Result<(), Error> { ) .await?; } + ClientType::CborEvents(opts) => { + daqbuffer::fetch::fetch_cbor( + &opts.url, + ScalarType::from_variant_str(&opts.scalar_type).unwrap(), + Shape::from_dims_str(&opts.shape).unwrap(), + ) + .await + .map_err(|_| Error::with_msg_no_trace("error")) + .unwrap(); + } }, SubCmd::GenerateTestData => { disk::gen::gen_test_data().await?; diff --git a/crates/daqbuffer/src/cli.rs b/crates/daqbuffer/src/cli.rs index 94fa97a..c272d5f 100644 --- a/crates/daqbuffer/src/cli.rs +++ b/crates/daqbuffer/src/cli.rs @@ -42,6 +42,7 @@ pub struct Client { pub enum ClientType { Binned(BinnedClient), Status(StatusClient), + CborEvents(CborEvents), } #[derive(Debug, Parser)] @@ -73,3 +74,13 @@ pub struct BinnedClient { #[arg(long, default_value = "1048576")] pub disk_stats_every_kb: u32, } + +#[derive(Debug, Parser)] +pub struct CborEvents { + #[arg(long)] + pub url: String, + #[arg(long)] + pub scalar_type: String, + #[arg(long)] + pub shape: String, +} diff --git a/crates/daqbuffer/src/fetch.rs b/crates/daqbuffer/src/fetch.rs new file mode 100644 index 0000000..efa11d0 --- /dev/null +++ b/crates/daqbuffer/src/fetch.rs @@ -0,0 +1,47 @@ +use futures_util::future; +use futures_util::StreamExt; +use http::header; +use http::Method; +use http::StatusCode; +use httpclient::body_empty; +use httpclient::connect_client; +use httpclient::http; +use httpclient::hyper::Request; +use httpclient::IncomingStream; +use netpod::log::*; +use netpod::ScalarType; +use netpod::Shape; +use std::fmt; +use streams::cbor::FramedBytesToSitemtyDynEventsStream; +use url::Url; + +pub struct Error {} + +impl From for Error +where + T: fmt::Debug, +{ + fn from(_value: T) -> Self { + Self {} + } +} + +pub async fn fetch_cbor(url: &str, scalar_type: ScalarType, shape: Shape) -> Result<(), Error> { + let url: Url = url.parse().unwrap(); + let accept = "application/cbor"; + let req = Request::builder() + .method(Method::GET) + .uri(url.to_string()) + .header(header::HOST, url.host_str().ok_or_else(|| "NoHostname")?) + .header(header::ACCEPT, accept) + .body(body_empty())?; + let mut send_req = connect_client(req.uri()).await?; + let res = send_req.send_request(req).await?; + let (head, body) = res.into_parts(); + debug!("http_get head {head:?}"); + let stream = IncomingStream::new(body); + let stream = FramedBytesToSitemtyDynEventsStream::new(stream, scalar_type, shape); + let stream = stream.map(|item| info!("{item:?}")); + stream.for_each(|_| future::ready(())).await; + Ok(()) +} diff --git a/crates/daqbuffer/src/lib.rs b/crates/daqbuffer/src/lib.rs index bdf0a8d..b6f6fe7 100644 --- a/crates/daqbuffer/src/lib.rs +++ b/crates/daqbuffer/src/lib.rs @@ -1,2 +1,3 @@ pub mod cli; pub mod err; +pub mod fetch; diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index 864f92c..95811b4 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -13,7 +13,7 @@ serde_json = "1.0" url = "2.5.0" http = "1.0.0" http-body-util = { version = "0.1.0" } -hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] } +hyper = { version = "1.1.0", features = ["http1", "http2", "client", "server"] } hyper-util = { version = "0.1.1", features = ["http1", "http2", "client", "server"] } bytes = "1.5.0" futures-util = "0.3.14" @@ -27,7 +27,7 @@ regex = "1.10.2" rand = "0.8.5" ciborium = "0.2.1" flate2 = "1" -brotli = "2.4" +brotli = "3.4.0" err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index ed552cb..313929c 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -27,6 +27,12 @@ pub trait WithLen { fn len(&self) -> usize; } +impl WithLen for bytes::Bytes { + fn len(&self) -> usize { + self.len() + } +} + pub trait Empty { fn empty() -> Self; } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 64aa9a3..2283d46 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -232,6 +232,32 @@ impl ScalarType { } } + pub fn from_variant_str(s: &str) -> Result { + use ScalarType::*; + let ret = match s { + "u8" => U8, + "u16" => U16, + "u32" => U32, + "u64" => U64, + "i8" => I8, + "i16" => I16, + "i32" => I32, + "i64" => I64, + "f32" => F32, + "f64" => F64, + "bool" => BOOL, + "string" => STRING, + "ChannelStatus" => ChannelStatus, + _ => { + return Err(Error::with_msg_no_trace(format!( + "from_bsread_str can not understand bsread {:?}", + s + ))) + } + }; + Ok(ret) + } + pub fn to_bsread_str(&self) -> &'static str { use ScalarType::*; match self { diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 2cf777e..dc26de9 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -10,7 +10,7 @@ path = "src/scyllaconn.rs" [dependencies] futures-util = "0.3.24" async-channel = "1.9.0" -scylla = "0.10.1" +scylla = "0.11.0" err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 61657cf..8fdb3ce 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -11,6 +11,7 @@ pin-project = "1.0.12" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" +typetag = "0.2.14" ciborium = "0.2.1" bytes = "1.3" arrayref = "0.3.6" diff --git a/crates/streams/src/cbor.rs b/crates/streams/src/cbor.rs new file mode 100644 index 0000000..4a456c9 --- /dev/null +++ b/crates/streams/src/cbor.rs @@ -0,0 +1,296 @@ +use bytes::Buf; +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; +use err::Error; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::LogItem; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::Events; +use items_0::WithLen; +use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim1::EventsDim1; +use netpod::log::Level; +use netpod::log::*; +use netpod::ScalarType; +use netpod::Shape; +use std::fmt; +use std::io::Cursor; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +trait ErrConv { + fn ec(self) -> Result; +} + +impl ErrConv for Result> +where + K: fmt::Debug, +{ + fn ec(self) -> Result { + self.map_err(|e| Error::from_string(format!("{e}"))) + } +} + +pub struct CborBytes(Bytes); + +impl CborBytes { + pub fn into_inner(self) -> Bytes { + self.0 + } + + pub fn len(&self) -> u32 { + self.0.len() as _ + } +} + +impl WithLen for CborBytes { + fn len(&self) -> usize { + self.len() as usize + } +} + +impl From for Bytes { + fn from(value: CborBytes) -> Self { + value.0 + } +} + +pub type CborStream = Pin> + Send>>; + +pub type SitemtyDynEventsStream = + Pin>>, Error>> + Send>>; + +pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream> { + let stream = stream.map(|x| match x { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(evs) => { + if false { + use items_0::AsAnyRef; + // TODO impl generically on EventsDim0 ? + if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + let mut buf = Vec::new(); + ciborium::into_writer(evs, &mut buf) + .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?; + let bytes = Bytes::from(buf); + let _item = CborBytes(bytes); + // Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } else { + let _item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item")); + // Ok(StreamItem::Log(item)) + }; + } + let buf = evs.to_cbor_vec_u8(); + let bytes = Bytes::from(buf); + let item = CborBytes(bytes); + Ok(item) + } + RangeCompletableItem::RangeComplete => { + use ciborium::cbor; + let item = cbor!({ + "rangeFinal" => true, + }) + .map_err(Error::from_string)?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + let bytes = Bytes::from(buf); + let item = CborBytes(bytes); + Ok(item) + } + }, + StreamItem::Log(item) => { + info!("{item:?}"); + let item = CborBytes(Bytes::new()); + Ok(item) + } + StreamItem::Stats(item) => { + info!("{item:?}"); + let item = CborBytes(Bytes::new()); + Ok(item) + } + }, + Err(e) => { + use ciborium::cbor; + let item = cbor!({ + "error" => e.to_string(), + }) + .map_err(Error::from_string)?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + let bytes = Bytes::from(buf); + let item = CborBytes(bytes); + Ok(item) + } + }); + stream +} + +pub struct FramedBytesToSitemtyDynEventsStream { + inp: S, + scalar_type: ScalarType, + shape: Shape, + buf: BytesMut, +} + +impl FramedBytesToSitemtyDynEventsStream { + pub fn new(inp: S, scalar_type: ScalarType, shape: Shape) -> Self { + Self { + inp, + scalar_type, + shape, + buf: BytesMut::with_capacity(1024 * 64), + } + } + + fn try_parse(&mut self) -> Result>>, Error> { + // debug!("try_parse {}", self.buf.len()); + if self.buf.len() < 4 { + return Ok(None); + } + let n = u32::from_le_bytes(self.buf[..4].try_into()?); + if n > 1024 * 1024 * 40 { + let e = Error::with_msg_no_trace(format!("frame too large {n}")); + error!("{e}"); + return Err(e); + } + if self.buf.len() < 4 + n as usize { + // debug!("not enough {} {}", n, self.buf.len()); + return Ok(None); + } + let buf = &self.buf[4..4 + n as usize]; + let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(Error::from_string)?; + // debug!("decoded ciborium value {val:?}"); + let item = if let Some(map) = val.as_map() { + if let Some(x) = map.get(0) { + if let Some(y) = x.0.as_text() { + if y == "rangeFinal" { + if let Some(y) = x.1.as_bool() { + if y { + Some(StreamItem::DataItem( + RangeCompletableItem::>::RangeComplete, + )) + } else { + None + } + } else { + None + } + } else { + None + } + } else { + None + } + } else { + None + } + } else { + None + }; + let item = if let Some(x) = item { + Some(x) + } else { + let item = decode_cbor_to_box_events(buf, &self.scalar_type, &self.shape)?; + Some(StreamItem::DataItem(RangeCompletableItem::Data(item))) + }; + self.buf.advance(4 + n as usize); + if let Some(x) = item { + Ok(Some(Ok(x))) + } else { + let item = LogItem::from_node(0, Level::DEBUG, format!("decoded ciborium Value")); + Ok(Some(Ok(StreamItem::Log(item)))) + } + } +} + +impl Stream for FramedBytesToSitemtyDynEventsStream +where + S: Stream> + Unpin, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match self.try_parse() { + Ok(Some(x)) => Ready(Some(x)), + Ok(None) => match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => match x { + Ok(x) => { + self.buf.put_slice(&x); + continue; + } + Err(e) => Ready(Some(Err(e))), + }, + Ready(None) => { + if self.buf.len() > 0 { + warn!("remaining bytes in input buffer, input closed len {}", self.buf.len()); + } + Ready(None) + } + Pending => Pending, + }, + Err(e) => Ready(Some(Err(e))), + }; + } + } +} + +macro_rules! cbor_scalar { + ($ty:ident, $buf:expr) => {{ + type T = $ty; + type C = EventsDim0; + let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?; + Box::new(item) + }}; +} + +macro_rules! cbor_wave { + ($ty:ident, $buf:expr) => {{ + type T = $ty; + type C = EventsDim1; + let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?; + Box::new(item) + }}; +} + +fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape) -> Result, Error> { + let item: Box = match shape { + Shape::Scalar => match scalar_type { + ScalarType::U8 => cbor_scalar!(u8, buf), + ScalarType::U16 => cbor_scalar!(u16, buf), + ScalarType::U32 => cbor_scalar!(u32, buf), + ScalarType::U64 => cbor_scalar!(u64, buf), + ScalarType::I8 => cbor_scalar!(i8, buf), + ScalarType::I16 => cbor_scalar!(i16, buf), + ScalarType::I32 => cbor_scalar!(i32, buf), + ScalarType::I64 => cbor_scalar!(i64, buf), + ScalarType::F32 => cbor_scalar!(f32, buf), + ScalarType::F64 => cbor_scalar!(f64, buf), + _ => { + return Err(Error::from_string(format!( + "decode_cbor_to_box_events {:?} {:?}", + scalar_type, shape + ))) + } + }, + Shape::Wave(_) => match scalar_type { + ScalarType::U8 => cbor_wave!(u8, buf), + ScalarType::U16 => cbor_wave!(u16, buf), + ScalarType::I64 => cbor_wave!(i64, buf), + _ => { + return Err(Error::from_string(format!( + "decode_cbor_to_box_events {:?} {:?}", + scalar_type, shape + ))) + } + }, + Shape::Image(_, _) => todo!(), + }; + Ok(item) +} diff --git a/crates/streams/src/firsterr.rs b/crates/streams/src/firsterr.rs new file mode 100644 index 0000000..e202a00 --- /dev/null +++ b/crates/streams/src/firsterr.rs @@ -0,0 +1,32 @@ +use crate::cbor::CborBytes; +use futures_util::future; +use futures_util::Stream; +use futures_util::StreamExt; + +pub fn non_empty(inp: S) -> impl Stream> +where + S: Stream>, +{ + inp.filter(|x| { + future::ready(match x { + Ok(x) => x.len() > 0, + Err(_) => true, + }) + }) +} + +pub fn only_first_err(inp: S) -> impl Stream> +where + S: Stream>, +{ + inp.take_while({ + let mut state = true; + move |x| { + let ret = state; + if x.is_err() { + state = false; + } + future::ready(ret) + } + }) +} diff --git a/crates/streams/src/lenframed.rs b/crates/streams/src/lenframed.rs new file mode 100644 index 0000000..e13ff34 --- /dev/null +++ b/crates/streams/src/lenframed.rs @@ -0,0 +1,27 @@ +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; +use futures_util::future; +use futures_util::stream; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::WithLen; + +pub fn length_framed(inp: S) -> impl Stream> +where + S: Stream>, + T: WithLen + Into, +{ + inp.map(|x| match x { + Ok(x) => { + let n = x.len() as u32; + let mut buf1 = BytesMut::with_capacity(8); + buf1.put_u32_le(n); + [Some(Ok(buf1.freeze())), Some(Ok(x.into()))] + } + Err(e) => [Some(Err(e)), None], + }) + .map(|x| stream::iter(x)) + .flatten() + .filter_map(|x| future::ready(x)) +} diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index 00f21d9..d799082 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -1,10 +1,13 @@ pub mod boxed; +pub mod cbor; pub mod collect; pub mod dtflags; pub mod filechunkread; +pub mod firsterr; pub mod frames; pub mod generators; pub mod itemclone; +pub mod lenframed; pub mod needminbuffer; pub mod plaineventscbor; pub mod plaineventsjson; diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs index 9df16f6..50621b2 100644 --- a/crates/streams/src/plaineventscbor.rs +++ b/crates/streams/src/plaineventscbor.rs @@ -1,30 +1,14 @@ +use crate::cbor::events_stream_to_cbor_stream; +use crate::cbor::CborStream; +use crate::firsterr::non_empty; +use crate::firsterr::only_first_err; use crate::plaineventsstream::dyn_events_stream; use crate::tcprawclient::OpenBoxedBytesStreamsBox; -use bytes::Bytes; use err::Error; -use futures_util::future; -use futures_util::Stream; -use futures_util::StreamExt; -use items_0::streamitem::LogItem; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::StreamItem; -use netpod::log::Level; use netpod::log::*; use netpod::ChannelTypeConfigGen; -use netpod::NodeConfigCached; use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; -use std::pin::Pin; - -pub struct CborBytes(Bytes); - -impl CborBytes { - pub fn into_inner(self) -> Bytes { - self.0 - } -} - -pub type CborStream = Pin> + Send>>; pub async fn plain_events_cbor( evq: &PlainEventsQuery, @@ -33,83 +17,8 @@ pub async fn plain_events_cbor( open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; - let stream = stream - .map(|x| match x { - Ok(x) => match x { - StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(evs) => { - if false { - use items_0::AsAnyRef; - // TODO impl generically on EventsDim0 ? - if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - let mut buf = Vec::new(); - ciborium::into_writer(evs, &mut buf) - .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?; - let bytes = Bytes::from(buf); - let _item = CborBytes(bytes); - // Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) - } else { - let _item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item")); - // Ok(StreamItem::Log(item)) - }; - } - let buf = evs.to_cbor_vec_u8(); - let bytes = Bytes::from(buf); - let item = CborBytes(bytes); - Ok(item) - } - RangeCompletableItem::RangeComplete => { - use ciborium::cbor; - let item = cbor!({ - "rangeFinal" => true, - }) - .map_err(Error::from_string)?; - let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; - let bytes = Bytes::from(buf); - let item = CborBytes(bytes); - Ok(item) - } - }, - StreamItem::Log(item) => { - info!("{item:?}"); - let item = CborBytes(Bytes::new()); - Ok(item) - } - StreamItem::Stats(item) => { - info!("{item:?}"); - let item = CborBytes(Bytes::new()); - Ok(item) - } - }, - Err(e) => { - use ciborium::cbor; - let item = cbor!({ - "error" => e.to_string(), - }) - .map_err(Error::from_string)?; - let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; - let bytes = Bytes::from(buf); - let item = CborBytes(bytes); - Ok(item) - } - }) - .filter(|x| { - future::ready(match x { - Ok(x) => x.0.len() > 0, - Err(_) => true, - }) - }) - .take_while({ - let mut state = true; - move |x| { - let ret = state; - if x.is_err() { - state = false; - } - future::ready(ret) - } - }); + let stream = events_stream_to_cbor_stream(stream); + let stream = non_empty(stream); + let stream = only_first_err(stream); Ok(Box::pin(stream)) } diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs index f99f862..8830093 100644 --- a/crates/streams/src/test/events.rs +++ b/crates/streams/src/test/events.rs @@ -1,10 +1,15 @@ +use crate::cbor::FramedBytesToSitemtyDynEventsStream; +use crate::firsterr::only_first_err; use crate::frames::inmem::BoxedBytesStream; +use crate::lenframed; use crate::plaineventscbor::plain_events_cbor; use crate::tcprawclient::OpenBoxedBytesStreams; use crate::tcprawclient::TEST_BACKEND; use err::Error; +use futures_util::future; use futures_util::Future; use futures_util::StreamExt; +use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::ChConf; @@ -23,22 +28,30 @@ fn merged_events_cbor() { async fn merged_events_inner() -> Result<(), Error> { let ctx = ReqCtx::for_test(); - let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::F64, Shape::Scalar, "test-gen-i32-dim0-v00"); + // TODO factor out the channel config lookup such that the test code can use a similar code path, + // except that we don't want to go over the network here. + let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::I32, Shape::Scalar, "test-gen-i32-dim0-v00"); let channel = SfDbChannel::from_name(ch_conf.backend(), ch_conf.name()); let range = SeriesRange::TimeRange(NanoRange::from_date_time( "2023-12-18T05:10:00Z".parse().unwrap(), - "2023-12-18T05:10:10Z".parse().unwrap(), + "2023-12-18T05:12:00Z".parse().unwrap(), )); let evq = PlainEventsQuery::new(channel, range); let open_bytes = StreamOpener::new(); let open_bytes = Box::pin(open_bytes); - let mut res = plain_events_cbor(&evq, ch_conf.into(), &ctx, open_bytes).await.unwrap(); - // TODO parse the cbor stream and assert - while let Some(x) = res.next().await { - let item = x?; - let bytes = item.into_inner(); - eprintln!("bytes len {}", bytes.len()); - } + let stream = plain_events_cbor(&evq, ch_conf.clone().into(), &ctx, open_bytes) + .await + .unwrap(); + let stream = lenframed::length_framed(stream); + let stream = + FramedBytesToSitemtyDynEventsStream::new(stream, ch_conf.scalar_type().clone(), ch_conf.shape().clone()); + let stream = only_first_err(stream); + stream + .for_each(|item| { + debug!("{item:?}"); + future::ready(()) + }) + .await; Ok(()) } diff --git a/crates/streams/src/transform.rs b/crates/streams/src/transform.rs index 1beaa1f..0db3178 100644 --- a/crates/streams/src/transform.rs +++ b/crates/streams/src/transform.rs @@ -163,12 +163,6 @@ pub fn build_full_transform_collectable( // TODO this must return a Stream! //let evs = build_event_transform(tr, inp)?; let trtb = tr.get_tr_time_binning(); - use futures_util::Stream; - use items_0::collect_s::Collectable; - use items_0::streamitem::RangeCompletableItem; - use items_0::streamitem::Sitemty; - use items_0::streamitem::StreamItem; - use std::pin::Pin; let a: Pin>> + Send>> = Box::pin(inp.0.map(|item| match item { Ok(item) => match item { StreamItem::DataItem(item) => match item {