From 71f5c3c763da0e243f19ab5b75d07473bcfe84c8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 Dec 2024 12:15:29 +0100 Subject: [PATCH] Support container output format --- src/cbor_stream.rs | 13 ++--- src/collect.rs | 7 ++- src/events/convertforbinning.rs | 16 +++--- src/frames/inmem.rs | 96 ++++++--------------------------- src/json_stream.rs | 12 ++--- src/lenframe.rs | 4 +- src/plaineventsjson.rs | 12 +++-- src/plaineventsstream.rs | 29 ++++------ src/rangefilter2.rs | 2 +- src/timebinnedjson.rs | 36 +++++++------ 10 files changed, 81 insertions(+), 146 deletions(-) diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 024ce76..502ad95 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -7,6 +7,7 @@ use bytes::Bytes; use bytes::BytesMut; use futures_util::Stream; use futures_util::StreamExt; +use items_0::apitypes::ToUserFacingApiType; use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::LogItem; @@ -14,7 +15,6 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; -use items_0::WithLen; use items_2::channelevents::ChannelEvents; use items_2::jsonbytes::CborBytes; use netpod::log::Level; @@ -66,15 +66,16 @@ pub fn events_stream_to_cbor_stream( stream } -fn map_events(x: Sitemty) -> Result { +fn map_events(x: Sitemty) -> Result +where + T: ToUserFacingApiType, +{ match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { - trace!("map_events Data evs len {}", evs.len()); - use items_0::apitypes::ToUserFacingApiType; - let val = evs.to_user_facing_api_type(); - let val = val.to_cbor_value()?; + let val = evs.into_user_facing_api_type(); + let val = val.into_serializable(); let mut buf = Vec::with_capacity(64); ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); diff --git a/src/collect.rs b/src/collect.rs index b7d6a8e..b444f7b 100644 --- a/src/collect.rs +++ b/src/collect.rs @@ -1,4 +1,5 @@ use crate::streamtimeout::StreamTimeout2; +use core::fmt; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -40,7 +41,11 @@ where } } -pub enum CollectResult { +#[derive(Debug)] +pub enum CollectResult +where + T: fmt::Debug, +{ Timeout, Some(T), } diff --git a/src/events/convertforbinning.rs b/src/events/convertforbinning.rs index 71963bd..d8fa9cf 100644 --- a/src/events/convertforbinning.rs +++ b/src/events/convertforbinning.rs @@ -34,7 +34,7 @@ impl Stream for ConvertForBinning { .downcast_ref::>() { let mut dst = ContainerEvents::new(); - for (&ts, val) in evs.iter_zip() { + for (ts, val) in evs.iter_zip() { dst.push_back(ts, val.ix); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); @@ -43,7 +43,7 @@ impl Stream for ConvertForBinning { evs.as_any_ref().downcast_ref::>() { let mut dst = ContainerEvents::new(); - for (&ts, val) in evs.iter_zip() { + for (ts, val) in evs.iter_zip() { dst.push_back(ts, val as u8); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); @@ -52,7 +52,7 @@ impl Stream for ConvertForBinning { evs.as_any_ref().downcast_ref::>() { let mut dst = ContainerEvents::new(); - for (&ts, _) in evs.iter_zip() { + for (ts, _) in evs.iter_zip() { dst.push_back(ts, 1); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); @@ -96,7 +96,7 @@ impl Stream for ConvertForTesting { let s = String::from_utf8_lossy(&buf); if s.contains("u8") { let mut dst = Cont::new(); - for (&ts, val) in evs.iter_zip() { + for (ts, val) in evs.iter_zip() { let v = (val * 1e6) as u8; dst.push_back(ts, v); } @@ -104,7 +104,7 @@ impl Stream for ConvertForTesting { Ready(Some(item)) } else if s.contains("i16") { let mut dst = Cont::new(); - for (&ts, val) in evs.iter_zip() { + for (ts, val) in evs.iter_zip() { let v = (val * 1e6) as i16 - 50; dst.push_back(ts, v); } @@ -112,7 +112,7 @@ impl Stream for ConvertForTesting { Ready(Some(item)) } else if s.contains("bool") { let mut dst = Cont::new(); - for (&ts, val) in evs.iter_zip() { + for (ts, val) in evs.iter_zip() { let g = u64::from_ne_bytes(val.to_ne_bytes()); let val = g % 2 == 0; dst.push_back(ts, val); @@ -121,7 +121,7 @@ impl Stream for ConvertForTesting { Ready(Some(item)) } else if s.contains("enum") { let mut dst = Cont::new(); - for (&ts, val) in evs.iter_zip() { + for (ts, val) in evs.iter_zip() { let buf = val.to_ne_bytes(); let h = buf[0] ^ buf[1] @@ -138,7 +138,7 @@ impl Stream for ConvertForTesting { Ready(Some(item)) } else if s.contains("string") { let mut dst = Cont::new(); - for (&ts, val) in evs.iter_zip() { + for (ts, val) in evs.iter_zip() { dst.push_back(ts, val.to_string()); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); diff --git a/src/frames/inmem.rs b/src/frames/inmem.rs index 4f48b59..ba688f0 100644 --- a/src/frames/inmem.rs +++ b/src/frames/inmem.rs @@ -1,3 +1,4 @@ +use crate::log::*; use crate::slidebuf::SlideBuf; use bytes::Bytes; use futures_util::pin_mut; @@ -8,11 +9,8 @@ use items_0::streamitem::SitemErrTy; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::TERM_FRAME_TYPE_ID; -use items_2::framable::INMEM_FRAME_FOOT; use items_2::framable::INMEM_FRAME_HEAD; -use items_2::framable::INMEM_FRAME_MAGIC; use items_2::inmem::InMemoryFrame; -use netpod::log::*; use netpod::ByteSize; use std::pin::Pin; use std::task::Context; @@ -31,6 +29,7 @@ pub enum Error { TryFromSlice(#[from] std::array::TryFromSliceError), BadCrc, EnoughInputNothingParsed, + InMemParse(#[from] items_2::inmem::Error), } pub type BoxedBytesStream = Pin> + Send>>; @@ -76,9 +75,6 @@ where fn poll_upstream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { trace2!("poll_upstream"); use Poll::*; - // use tokio::io::AsyncRead; - // use tokio::io::ReadBuf; - // let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min.saturating_sub(self.buf.len()))?); let inp = &mut self.inp; pin_mut!(inp); match inp.poll_next(cx) { @@ -94,16 +90,6 @@ where Ready(None) => Ready(Ok(0)), Pending => Pending, } - // match AsyncRead::poll_read(inp, cx, &mut buf) { - // Ready(Ok(())) => { - // let n = buf.filled().len(); - // self.buf.wadv(n)?; - // trace2!("recv bytes {}", n); - // Ready(Ok(n)) - // } - // Ready(Err(e)) => Ready(Err(e.into())), - // Pending => Pending, - // } } // Try to consume bytes to parse a frame. @@ -114,70 +100,22 @@ where if buf.len() < self.need_min { return Err(Error::LessThanNeedMin); } - if buf.len() < INMEM_FRAME_HEAD { - return Err(Error::LessThanHeader); + use items_2::inmem::ParseResult; + match InMemoryFrame::parse(buf) { + Ok(x) => match x { + ParseResult::NotEnoughData(n) => { + self.need_min = n; + Ok(None) + } + ParseResult::Parsed(lentot, val) => { + self.buf.adv(lentot)?; + self.need_min = INMEM_FRAME_HEAD; + self.inp_bytes_consumed += lentot as u64; + Ok(Some(val)) + } + }, + Err(e) => Err(e.into()), } - let magic = u32::from_le_bytes(buf[0..4].try_into()?); - let encid = u32::from_le_bytes(buf[4..8].try_into()?); - let tyid = u32::from_le_bytes(buf[8..12].try_into()?); - let len = u32::from_le_bytes(buf[12..16].try_into()?); - let payload_crc_exp = u32::from_le_bytes(buf[16..20].try_into()?); - if magic != INMEM_FRAME_MAGIC { - let n = buf.len().min(64); - let u = String::from_utf8_lossy(&buf[0..n]); - let msg = format!( - "InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}", - magic, u - ); - error!("{msg}"); - return Err(Error::BadMagic(magic)); - } - if len > 1024 * 1024 * 50 { - let msg = format!( - "InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}", - len, self.inp_bytes_consumed - ); - error!("{msg}"); - return Err(Error::HugeFrame(len)); - } - let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize; - if buf.len() < lentot { - // TODO count cases in production - self.need_min = lentot; - return Ok(None); - } - let p1 = INMEM_FRAME_HEAD + len as usize; - let mut h = crc32fast::Hasher::new(); - h.update(&buf[..p1]); - let frame_crc = h.finalize(); - let mut h = crc32fast::Hasher::new(); - h.update(&buf[INMEM_FRAME_HEAD..p1]); - let payload_crc = h.finalize(); - let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?); - let payload_crc_match = payload_crc_exp == payload_crc; - let frame_crc_match = frame_crc_ind == frame_crc; - if !frame_crc_match || !payload_crc_match { - let _ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]); - let msg = format!( - "InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}", - payload_crc_match, frame_crc_match, - ); - error!("{msg}"); - let e = Error::BadCrc; - return Err(e); - } - self.inp_bytes_consumed += lentot as u64; - // TODO metrics - //trace!("parsed frame well len {}", len); - let ret = InMemoryFrame { - len, - tyid, - encid, - buf: Bytes::from(buf[INMEM_FRAME_HEAD..p1].to_vec()), - }; - self.buf.adv(lentot)?; - self.need_min = INMEM_FRAME_HEAD; - Ok(Some(ret)) } } diff --git a/src/json_stream.rs b/src/json_stream.rs index a957429..a8ad3eb 100644 --- a/src/json_stream.rs +++ b/src/json_stream.rs @@ -3,17 +3,12 @@ use crate::streamtimeout::StreamTimeout2; use crate::streamtimeout::TimeoutableStream; use futures_util::Stream; use futures_util::StreamExt; -use items_0::collect_s::ToJsonValue; +use items_0::apitypes::ToUserFacingApiType; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Events; -use items_0::WithLen; -use items_2::binning::container_events::ContainerEvents; -use items_2::channelevents::ChannelEvents; use items_2::jsonbytes::JsonBytes; use netpod::log::*; -use netpod::EnumVariant; use std::pin::Pin; use std::time::Duration; @@ -54,13 +49,14 @@ pub fn events_stream_to_json_stream( fn map_events(x: Sitemty) -> Result where - T: ToJsonValue, + T: ToUserFacingApiType, { match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { - let val = evs.to_json_value()?; + let val = evs.into_user_facing_api_type(); + let val = val.into_serializable_json(); let s = serde_json::to_string(&val)?; let item = JsonBytes::new(s); Ok(item) diff --git a/src/lenframe.rs b/src/lenframe.rs index 92e054a..2e1bdea 100644 --- a/src/lenframe.rs +++ b/src/lenframe.rs @@ -28,7 +28,7 @@ where stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())]) } Err(e) => { - error!("{e}"); + error!("{}", e); stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())]) } }) @@ -58,7 +58,7 @@ where stream::iter([Ok::<_, E>(b2), Ok(s), Ok(String::from("\n"))]) } Err(e) => { - error!("{e}"); + error!("{}", e); stream::iter([Ok(String::new()), Ok(String::new()), Ok(String::new())]) } }) diff --git a/src/plaineventsjson.rs b/src/plaineventsjson.rs index 43339f5..0750002 100644 --- a/src/plaineventsjson.rs +++ b/src/plaineventsjson.rs @@ -11,6 +11,7 @@ use crate::tcprawclient::OpenBoxedBytesStreamsBox; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; use items_0::on_sitemty_data; +use items_2::jsonbytes::JsonBytes; use netpod::log::*; use netpod::ChannelTypeConfigGen; use netpod::Cluster; @@ -36,7 +37,7 @@ pub async fn plain_events_json( _cluster: &Cluster, open_bytes: OpenBoxedBytesStreamsBox, timeout_provider: Box, -) -> Result, Error> { +) -> Result, Error> { debug!("plain_events_json evquery {:?}", evq); let deadline = Instant::now() + evq.timeout_content_or_default(); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; @@ -55,12 +56,13 @@ pub async fn plain_events_json( timeout_provider, ) .await?; - debug!("plain_events_json collected"); + warn!("plain_events_json collected {:?}", collected); if let CollectResult::Some(x) = collected { - let x = x.to_user_facing_api_type_box(); - let jsval = x.to_json_value()?; + let x = x.into_user_facing_api_type_box(); + let val = x.into_serializable_json(); + let jsval = serde_json::to_string(&val)?; debug!("plain_events_json json serialized"); - Ok(CollectResult::Some(jsval)) + Ok(CollectResult::Some(JsonBytes::new(jsval))) } else { debug!("plain_events_json timeout"); Ok(CollectResult::Timeout) diff --git a/src/plaineventsstream.rs b/src/plaineventsstream.rs index b22b66e..f010b5a 100644 --- a/src/plaineventsstream.rs +++ b/src/plaineventsstream.rs @@ -124,6 +124,7 @@ where { debug!("make wasm transform"); use httpclient::url::Url; + use items_2::binning::container_events::ContainerEvents; use wasmer::Value; use wasmer::WasmSlice; let t = httpclient::http_get( @@ -157,46 +158,35 @@ where if true { let r1 = evs .as_any_mut() - .downcast_mut::>() + .downcast_mut::>() .is_some(); let r2 = evs - .as_mut() .as_any_mut() - .downcast_mut::>() + .downcast_mut::>>() .is_some(); let r3 = evs - .as_any_mut() - .downcast_mut::>>() - .is_some(); - let r4 = evs - .as_mut() - .as_any_mut() - .downcast_mut::>>() - .is_some(); - let r5 = evs .as_mut() .as_any_mut() .downcast_mut::() .is_some(); - let r6 = evs + let r4 = evs .as_mut() .as_any_mut() .downcast_mut::>() .is_some(); - debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}"); + debug!("wasm castings: {r1} {r2} {r3} {r4}"); } if let Some(evs) = evs.as_any_mut().downcast_mut::() { match evs { ChannelEvents::Events(evs) => { - if let Some(evs) = evs - .as_any_mut() - .downcast_mut::>() + if let Some(evs) = + evs.as_any_mut().downcast_mut::>() { use items_0::WithLen; if evs.len() == 0 { - debug!("wasm empty EventsDim0"); + debug!("wasm empty"); } else { - debug!("wasm see EventsDim0"); + debug!("wasm see"); let max_len_needed = 16000; let dummy1 = instance.exports.get_function("dummy1").unwrap(); let s = evs.values.as_mut_slices(); @@ -252,7 +242,6 @@ where }; Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) }); - // Box::new(item) as Box item }); let ret: Pin>> + Send>> = Box::pin(stream); diff --git a/src/rangefilter2.rs b/src/rangefilter2.rs index 8543859..3dfc077 100644 --- a/src/rangefilter2.rs +++ b/src/rangefilter2.rs @@ -247,7 +247,7 @@ where } Ok(None) => continue, Err(e) => { - error!("sees: {e}"); + error!("sees: {}", e); self.inp_done = true; Ready(Some(sitem_err_from_string(e))) } diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 92e8f49..5bda731 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -12,6 +12,7 @@ use crate::timebin::cached::reader::EventsReadProvider; use futures_util::future::BoxFuture; use futures_util::Stream; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::collect_s::CollectableDyn; use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; @@ -320,7 +321,7 @@ pub async fn timebinned_json( cache_read_provider: Arc, events_read_provider: Arc, timeout_provider: Box, -) -> Result, Error> { +) -> Result, Error> { let deadline = Instant::now() + query .timeout_content() @@ -353,9 +354,10 @@ pub async fn timebinned_json( let collres = collected.await?; match collres { CollectResult::Some(collres) => { - let x = collres.to_user_facing_api_type_box(); - let jsval = x.to_json_value()?; - Ok(CollectResult::Some(jsval)) + let x = collres.into_user_facing_api_type_box(); + let val = x.into_serializable(); + let jsval = serde_json::to_string(&val)?; + Ok(CollectResult::Some(JsonBytes::new(jsval))) } CollectResult::Timeout => Ok(CollectResult::Timeout), } @@ -363,16 +365,17 @@ pub async fn timebinned_json( fn take_collector_result( coll: &mut Box, -) -> Option { +) -> Option { match coll.result() { Ok(collres) => { - let x = collres.to_user_facing_api_type_box(); - match x.to_json_value() { - Ok(val) => Some(val), - Err(e) => Some(serde_json::Value::String(format!("{e}"))), + let x = collres.into_user_facing_api_type_box(); + let val = x.into_serializable(); + match serde_json::to_string(&val) { + Ok(jsval) => Some(JsonBytes::new(jsval)), + Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")), } } - Err(e) => Some(serde_json::Value::String(format!("{e}"))), + Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")), } } @@ -474,11 +477,12 @@ pub async fn timebinned_json_framed( }); let stream = stream.filter_map(|x| futures_util::future::ready(x)); // TODO skip the intermediate conversion to js value, go directly to string data - let stream = stream.map(|x| match x { - Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())), - Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg( - e, - ))), - }); + // let stream = stream.map(|x| match x { + // Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())), + // Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg( + // e, + // ))), + // }); + let stream = stream.map_err(|e| crate::json_stream::Error::Msg(e.to_string())); Ok(Box::pin(stream)) }