Support container output format

Author: Dominik Werder
Date: 2024-12-04 12:15:29 +01:00
Parent: 34e82d9584
Commit: 71f5c3c763
10 changed files with 81 additions and 146 deletions

View File

@@ -7,6 +7,7 @@ use bytes::Bytes;
use bytes::BytesMut;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::apitypes::ToUserFacingApiType;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::sitem_err_from_string;
use items_0::streamitem::LogItem;
@@ -14,7 +15,6 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
use items_2::jsonbytes::CborBytes;
use netpod::log::Level;
@@ -66,15 +66,16 @@ pub fn events_stream_to_cbor_stream(
stream
}
fn map_events(x: Sitemty<ChannelEvents>) -> Result<CborBytes, Error> {
fn map_events<T>(x: Sitemty<T>) -> Result<CborBytes, Error>
where
T: ToUserFacingApiType,
{
match x {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => {
trace!("map_events Data evs len {}", evs.len());
use items_0::apitypes::ToUserFacingApiType;
let val = evs.to_user_facing_api_type();
let val = val.to_cbor_value()?;
let val = evs.into_user_facing_api_type();
let val = val.into_serializable();
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
let bytes = Bytes::from(buf);
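
Editorial note: the hunks above change the CBOR path from a ChannelEvents-specific map_events to one that is generic over items_0::apitypes::ToUserFacingApiType and serializes whatever that conversion yields. As a rough illustration of that shape (not part of the commit), here is a standalone sketch with a hypothetical local trait standing in for the items_0 one, assuming ciborium and serde as dependencies:

// Hypothetical stand-in for the items_0 trait; its real definition is not shown in this diff.
trait IntoUserFacingApiType {
    // The converted form only needs to be serializable.
    type Out: serde::Serialize;
    fn into_user_facing_api_type(self) -> Self::Out;
}

// Toy event container standing in for ChannelEvents / ContainerEvents.
struct Events {
    tss: Vec<u64>,
    values: Vec<f64>,
}

impl IntoUserFacingApiType for Events {
    type Out = Vec<(u64, f64)>;
    fn into_user_facing_api_type(self) -> Self::Out {
        self.tss.into_iter().zip(self.values).collect()
    }
}

// Generic over T instead of being hard-wired to one concrete event type,
// so the same stream-to-CBOR plumbing can carry different container types.
fn map_events_to_cbor<T>(evs: T) -> Result<Vec<u8>, String>
where
    T: IntoUserFacingApiType,
{
    let val = evs.into_user_facing_api_type();
    let mut buf = Vec::with_capacity(64);
    ciborium::into_writer(&val, &mut buf).map_err(|e| e.to_string())?;
    Ok(buf)
}

fn main() {
    let evs = Events {
        tss: vec![10, 20],
        values: vec![1.5, 2.5],
    };
    let cbor = map_events_to_cbor(evs).unwrap();
    println!("encoded {} CBOR bytes", cbor.len());
}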

View File

@@ -1,4 +1,5 @@
use crate::streamtimeout::StreamTimeout2;
use core::fmt;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -40,7 +41,11 @@ where
}
}
pub enum CollectResult<T> {
#[derive(Debug)]
pub enum CollectResult<T>
where
T: fmt::Debug,
{
Timeout,
Some(T),
}
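
Editorial note: this hunk only derives Debug for CollectResult and adds an explicit T: fmt::Debug bound, which is what later lets collected results be logged with {:?}. A minimal standalone copy of the enum shape:

use core::fmt;

// The explicit where-clause makes Debug a requirement of every instantiation
// of the type, in addition to the bound the derive places on its impl.
#[derive(Debug)]
pub enum CollectResult<T>
where
    T: fmt::Debug,
{
    Timeout,
    Some(T),
}

fn main() {
    let a: CollectResult<u32> = CollectResult::Some(7);
    let b: CollectResult<u32> = CollectResult::Timeout;
    // Both variants can now be printed with {:?}, e.g. by the
    // warn!("plain_events_json collected {:?}", collected) call later in this commit.
    println!("{:?} {:?}", a, b);
}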

View File

@@ -34,7 +34,7 @@ impl Stream for ConvertForBinning {
.downcast_ref::<ContainerEvents<EnumVariant>>()
{
let mut dst = ContainerEvents::new();
for (&ts, val) in evs.iter_zip() {
for (ts, val) in evs.iter_zip() {
dst.push_back(ts, val.ix);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
@@ -43,7 +43,7 @@ impl Stream for ConvertForBinning {
evs.as_any_ref().downcast_ref::<ContainerEvents<bool>>()
{
let mut dst = ContainerEvents::new();
for (&ts, val) in evs.iter_zip() {
for (ts, val) in evs.iter_zip() {
dst.push_back(ts, val as u8);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
@@ -52,7 +52,7 @@ impl Stream for ConvertForBinning {
evs.as_any_ref().downcast_ref::<ContainerEvents<String>>()
{
let mut dst = ContainerEvents::new();
for (&ts, _) in evs.iter_zip() {
for (ts, _) in evs.iter_zip() {
dst.push_back(ts, 1);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
@@ -96,7 +96,7 @@ impl Stream for ConvertForTesting {
let s = String::from_utf8_lossy(&buf);
if s.contains("u8") {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
for (ts, val) in evs.iter_zip() {
let v = (val * 1e6) as u8;
dst.push_back(ts, v);
}
@@ -104,7 +104,7 @@ impl Stream for ConvertForTesting {
Ready(Some(item))
} else if s.contains("i16") {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
for (ts, val) in evs.iter_zip() {
let v = (val * 1e6) as i16 - 50;
dst.push_back(ts, v);
}
@@ -112,7 +112,7 @@ impl Stream for ConvertForTesting {
Ready(Some(item))
} else if s.contains("bool") {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
for (ts, val) in evs.iter_zip() {
let g = u64::from_ne_bytes(val.to_ne_bytes());
let val = g % 2 == 0;
dst.push_back(ts, val);
@@ -121,7 +121,7 @@ impl Stream for ConvertForTesting {
Ready(Some(item))
} else if s.contains("enum") {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
for (ts, val) in evs.iter_zip() {
let buf = val.to_ne_bytes();
let h = buf[0]
^ buf[1]
@@ -138,7 +138,7 @@ impl Stream for ConvertForTesting {
Ready(Some(item))
} else if s.contains("string") {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
for (ts, val) in evs.iter_zip() {
dst.push_back(ts, val.to_string());
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
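
Editorial note: every loop above changes from for (&ts, val) to for (ts, val), which suggests iter_zip on the new ContainerEvents yields owned (timestamp, value) pairs rather than references. The exact iterator is not part of this diff; the standalone sketch below shows one plausible shape under that assumption, with hypothetical local types:

use std::collections::VecDeque;

// Hypothetical stand-in for the ContainerEvents used above: one column of
// timestamps and one column of values, with iter_zip yielding owned pairs,
// which is why the loops no longer destructure with `&ts`.
struct ContainerEvents<T> {
    tss: VecDeque<u64>,
    values: VecDeque<T>,
}

impl<T: Clone> ContainerEvents<T> {
    fn new() -> Self {
        Self {
            tss: VecDeque::new(),
            values: VecDeque::new(),
        }
    }

    fn push_back(&mut self, ts: u64, value: T) {
        self.tss.push_back(ts);
        self.values.push_back(value);
    }

    // Owned pairs: `.copied()` for the u64 timestamps, `.cloned()` for the values.
    fn iter_zip(&self) -> impl Iterator<Item = (u64, T)> + '_ {
        self.tss.iter().copied().zip(self.values.iter().cloned())
    }
}

fn main() {
    let mut src = ContainerEvents::new();
    src.push_back(1, true);
    src.push_back(2, false);
    // Same shape as the bool -> u8 conversion loop above.
    let mut dst = ContainerEvents::new();
    for (ts, val) in src.iter_zip() {
        dst.push_back(ts, val as u8);
    }
    let out: Vec<(u64, u8)> = dst.iter_zip().collect();
    assert_eq!(out, vec![(1, 1), (2, 0)]);
}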

View File

@@ -1,3 +1,4 @@
use crate::log::*;
use crate::slidebuf::SlideBuf;
use bytes::Bytes;
use futures_util::pin_mut;
@@ -8,11 +9,8 @@ use items_0::streamitem::SitemErrTy;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::streamitem::TERM_FRAME_TYPE_ID;
use items_2::framable::INMEM_FRAME_FOOT;
use items_2::framable::INMEM_FRAME_HEAD;
use items_2::framable::INMEM_FRAME_MAGIC;
use items_2::inmem::InMemoryFrame;
use netpod::log::*;
use netpod::ByteSize;
use std::pin::Pin;
use std::task::Context;
@@ -31,6 +29,7 @@ pub enum Error {
TryFromSlice(#[from] std::array::TryFromSliceError),
BadCrc,
EnoughInputNothingParsed,
InMemParse(#[from] items_2::inmem::Error),
}
pub type BoxedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, SitemErrTy>> + Send>>;
@@ -76,9 +75,6 @@ where
fn poll_upstream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<usize, Error>> {
trace2!("poll_upstream");
use Poll::*;
// use tokio::io::AsyncRead;
// use tokio::io::ReadBuf;
// let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min.saturating_sub(self.buf.len()))?);
let inp = &mut self.inp;
pin_mut!(inp);
match inp.poll_next(cx) {
@@ -94,16 +90,6 @@ where
Ready(None) => Ready(Ok(0)),
Pending => Pending,
}
// match AsyncRead::poll_read(inp, cx, &mut buf) {
// Ready(Ok(())) => {
// let n = buf.filled().len();
// self.buf.wadv(n)?;
// trace2!("recv bytes {}", n);
// Ready(Ok(n))
// }
// Ready(Err(e)) => Ready(Err(e.into())),
// Pending => Pending,
// }
}
// Try to consume bytes to parse a frame.
@@ -114,70 +100,22 @@ where
if buf.len() < self.need_min {
return Err(Error::LessThanNeedMin);
}
if buf.len() < INMEM_FRAME_HEAD {
return Err(Error::LessThanHeader);
use items_2::inmem::ParseResult;
match InMemoryFrame::parse(buf) {
Ok(x) => match x {
ParseResult::NotEnoughData(n) => {
self.need_min = n;
Ok(None)
}
ParseResult::Parsed(lentot, val) => {
self.buf.adv(lentot)?;
self.need_min = INMEM_FRAME_HEAD;
self.inp_bytes_consumed += lentot as u64;
Ok(Some(val))
}
},
Err(e) => Err(e.into()),
}
let magic = u32::from_le_bytes(buf[0..4].try_into()?);
let encid = u32::from_le_bytes(buf[4..8].try_into()?);
let tyid = u32::from_le_bytes(buf[8..12].try_into()?);
let len = u32::from_le_bytes(buf[12..16].try_into()?);
let payload_crc_exp = u32::from_le_bytes(buf[16..20].try_into()?);
if magic != INMEM_FRAME_MAGIC {
let n = buf.len().min(64);
let u = String::from_utf8_lossy(&buf[0..n]);
let msg = format!(
"InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}",
magic, u
);
error!("{msg}");
return Err(Error::BadMagic(magic));
}
if len > 1024 * 1024 * 50 {
let msg = format!(
"InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
len, self.inp_bytes_consumed
);
error!("{msg}");
return Err(Error::HugeFrame(len));
}
let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize;
if buf.len() < lentot {
// TODO count cases in production
self.need_min = lentot;
return Ok(None);
}
let p1 = INMEM_FRAME_HEAD + len as usize;
let mut h = crc32fast::Hasher::new();
h.update(&buf[..p1]);
let frame_crc = h.finalize();
let mut h = crc32fast::Hasher::new();
h.update(&buf[INMEM_FRAME_HEAD..p1]);
let payload_crc = h.finalize();
let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?);
let payload_crc_match = payload_crc_exp == payload_crc;
let frame_crc_match = frame_crc_ind == frame_crc;
if !frame_crc_match || !payload_crc_match {
let _ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
let msg = format!(
"InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}",
payload_crc_match, frame_crc_match,
);
error!("{msg}");
let e = Error::BadCrc;
return Err(e);
}
self.inp_bytes_consumed += lentot as u64;
// TODO metrics
//trace!("parsed frame well len {}", len);
let ret = InMemoryFrame {
len,
tyid,
encid,
buf: Bytes::from(buf[INMEM_FRAME_HEAD..p1].to_vec()),
};
self.buf.adv(lentot)?;
self.need_min = INMEM_FRAME_HEAD;
Ok(Some(ret))
}
}
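
Editorial note: the large removal above replaces the inline magic/length/CRC parsing with a single call to items_2::inmem::InMemoryFrame::parse, which reports either ParseResult::NotEnoughData(n) (so the reader can raise need_min) or ParseResult::Parsed(lentot, frame) (so the reader can advance its buffer). The real implementation lives in items_2 and is not shown in this diff; the standalone sketch below only illustrates the general parse-result pattern, with made-up constants, a layout loosely following the removed code (magic, encid, tyid, len, payload crc), and the CRC checks omitted:

// Illustrative constants only; not the real items_2 values.
const FRAME_MAGIC: u32 = 0xF5F4_F3F2;
const FRAME_HEAD: usize = 20;
const FRAME_FOOT: usize = 4;

#[derive(Debug)]
enum ParseResult {
    // Caller should buffer at least this many bytes before retrying.
    NotEnoughData(usize),
    // Total bytes consumed plus the decoded frame.
    Parsed(usize, Frame),
}

#[derive(Debug)]
struct Frame {
    tyid: u32,
    encid: u32,
    payload: Vec<u8>,
}

#[derive(Debug)]
enum ParseError {
    BadMagic(u32),
    HugeFrame(u32),
}

fn parse(buf: &[u8]) -> Result<ParseResult, ParseError> {
    if buf.len() < FRAME_HEAD {
        return Ok(ParseResult::NotEnoughData(FRAME_HEAD));
    }
    let magic = u32::from_le_bytes(buf[0..4].try_into().unwrap());
    let encid = u32::from_le_bytes(buf[4..8].try_into().unwrap());
    let tyid = u32::from_le_bytes(buf[8..12].try_into().unwrap());
    let len = u32::from_le_bytes(buf[12..16].try_into().unwrap());
    if magic != FRAME_MAGIC {
        return Err(ParseError::BadMagic(magic));
    }
    if len > 1024 * 1024 * 50 {
        return Err(ParseError::HugeFrame(len));
    }
    let lentot = FRAME_HEAD + len as usize + FRAME_FOOT;
    if buf.len() < lentot {
        return Ok(ParseResult::NotEnoughData(lentot));
    }
    let payload = buf[FRAME_HEAD..FRAME_HEAD + len as usize].to_vec();
    Ok(ParseResult::Parsed(lentot, Frame { tyid, encid, payload }))
}

fn main() {
    // Build one frame: header, 3 payload bytes, dummy footer.
    let mut buf = Vec::new();
    buf.extend_from_slice(&FRAME_MAGIC.to_le_bytes());
    buf.extend_from_slice(&1u32.to_le_bytes()); // encid
    buf.extend_from_slice(&2u32.to_le_bytes()); // tyid
    buf.extend_from_slice(&3u32.to_le_bytes()); // len
    buf.extend_from_slice(&0u32.to_le_bytes()); // payload crc (ignored in this sketch)
    buf.extend_from_slice(&[7, 8, 9]);
    buf.extend_from_slice(&0u32.to_le_bytes()); // frame crc (ignored in this sketch)
    match parse(&buf).unwrap() {
        ParseResult::Parsed(n, frame) => println!("consumed {} bytes: {:?}", n, frame),
        ParseResult::NotEnoughData(n) => println!("need {} bytes", n),
    }
}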

View File

@@ -3,17 +3,12 @@ use crate::streamtimeout::StreamTimeout2;
use crate::streamtimeout::TimeoutableStream;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::collect_s::ToJsonValue;
use items_0::apitypes::ToUserFacingApiType;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_0::WithLen;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use items_2::jsonbytes::JsonBytes;
use netpod::log::*;
use netpod::EnumVariant;
use std::pin::Pin;
use std::time::Duration;
@@ -54,13 +49,14 @@ pub fn events_stream_to_json_stream(
fn map_events<T>(x: Sitemty<T>) -> Result<JsonBytes, Error>
where
T: ToJsonValue,
T: ToUserFacingApiType,
{
match x {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => {
let val = evs.to_json_value()?;
let val = evs.into_user_facing_api_type();
let val = val.into_serializable_json();
let s = serde_json::to_string(&val)?;
let item = JsonBytes::new(s);
Ok(item)

View File

@@ -28,7 +28,7 @@ where
stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())])
}
Err(e) => {
error!("{e}");
error!("{}", e);
stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())])
}
})
@@ -58,7 +58,7 @@ where
stream::iter([Ok::<_, E>(b2), Ok(s), Ok(String::from("\n"))])
}
Err(e) => {
error!("{e}");
error!("{}", e);
stream::iter([Ok(String::new()), Ok(String::new()), Ok(String::new())])
}
})

View File

@@ -11,6 +11,7 @@ use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use futures_util::StreamExt;
use items_0::collect_s::CollectableDyn;
use items_0::on_sitemty_data;
use items_2::jsonbytes::JsonBytes;
use netpod::log::*;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
@@ -36,7 +37,7 @@ pub async fn plain_events_json(
_cluster: &Cluster,
open_bytes: OpenBoxedBytesStreamsBox,
timeout_provider: Box<dyn StreamTimeout2>,
) -> Result<CollectResult<JsonValue>, Error> {
) -> Result<CollectResult<JsonBytes>, Error> {
debug!("plain_events_json evquery {:?}", evq);
let deadline = Instant::now() + evq.timeout_content_or_default();
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
@@ -55,12 +56,13 @@ pub async fn plain_events_json(
timeout_provider,
)
.await?;
debug!("plain_events_json collected");
warn!("plain_events_json collected {:?}", collected);
if let CollectResult::Some(x) = collected {
let x = x.to_user_facing_api_type_box();
let jsval = x.to_json_value()?;
let x = x.into_user_facing_api_type_box();
let val = x.into_serializable_json();
let jsval = serde_json::to_string(&val)?;
debug!("plain_events_json json serialized");
Ok(CollectResult::Some(jsval))
Ok(CollectResult::Some(JsonBytes::new(jsval)))
} else {
debug!("plain_events_json timeout");
Ok(CollectResult::Timeout)
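
Editorial note: plain_events_json now returns CollectResult<JsonBytes>, serializing the collected result once with serde_json::to_string and wrapping the string, instead of handing back a serde_json::Value. A standalone sketch of that return shape, with a local JsonBytes newtype standing in for items_2::jsonbytes::JsonBytes, the user-facing-type conversion elided, and serde_json assumed as a dependency:

// Local stand-in for items_2::jsonbytes::JsonBytes; only JsonBytes::new is visible above.
#[derive(Debug)]
struct JsonBytes(String);

impl JsonBytes {
    fn new<S: Into<String>>(s: S) -> Self {
        Self(s.into())
    }
}

#[derive(Debug)]
enum CollectResult<T: std::fmt::Debug> {
    Timeout,
    Some(T),
}

// Takes the collected result in an already-serializable form and serializes
// it once, so downstream code only ever sees ready-made JSON text.
fn finish(
    collected: CollectResult<Vec<(u64, f64)>>,
) -> Result<CollectResult<JsonBytes>, serde_json::Error> {
    match collected {
        CollectResult::Some(val) => {
            let s = serde_json::to_string(&val)?;
            Ok(CollectResult::Some(JsonBytes::new(s)))
        }
        CollectResult::Timeout => Ok(CollectResult::Timeout),
    }
}

fn main() {
    let collected = CollectResult::Some(vec![(10u64, 1.5), (20, 2.5)]);
    println!("{:?}", finish(collected).unwrap());
    let timed_out: CollectResult<Vec<(u64, f64)>> = CollectResult::Timeout;
    println!("{:?}", finish(timed_out).unwrap());
}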

View File

@@ -124,6 +124,7 @@ where
{
debug!("make wasm transform");
use httpclient::url::Url;
use items_2::binning::container_events::ContainerEvents;
use wasmer::Value;
use wasmer::WasmSlice;
let t = httpclient::http_get(
@@ -157,46 +158,35 @@ where
if true {
let r1 = evs
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
.downcast_mut::<ContainerEvents<f64>>()
.is_some();
let r2 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
.downcast_mut::<Box<ContainerEvents<f64>>>()
.is_some();
let r3 = evs
.as_any_mut()
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
.is_some();
let r4 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
.is_some();
let r5 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<ChannelEvents>()
.is_some();
let r6 = evs
let r4 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<Box<ChannelEvents>>()
.is_some();
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
debug!("wasm castings: {r1} {r2} {r3} {r4}");
}
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
match evs {
ChannelEvents::Events(evs) => {
if let Some(evs) = evs
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
if let Some(evs) =
evs.as_any_mut().downcast_mut::<ContainerEvents<f64>>()
{
use items_0::WithLen;
if evs.len() == 0 {
debug!("wasm empty EventsDim0<f64>");
debug!("wasm empty");
} else {
debug!("wasm see EventsDim0<f64>");
debug!("wasm see");
let max_len_needed = 16000;
let dummy1 = instance.exports.get_function("dummy1").unwrap();
let s = evs.values.as_mut_slices();
@@ -252,7 +242,6 @@ where
};
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
});
// Box::new(item) as Box<dyn Framable + Send>
item
});
let ret: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>> = Box::pin(stream);
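
Editorial note: the wasm transform keeps probing the boxed events with as_any_mut().downcast_mut::<...>(), now targeting ContainerEvents<f64> instead of EventsDim0<f64>. A standalone sketch of the Any-based downcast pattern itself, with local stand-in types:

use std::any::Any;

// Local stand-ins for the items_0 / items_2 types: a boxed `dyn Events`
// exposes itself as `&mut dyn Any`, and the caller tries concrete types
// until one matches.
trait Events: Any {
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

struct ContainerEvents<T> {
    values: Vec<T>,
}

impl<T: 'static> Events for ContainerEvents<T> {
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

fn main() {
    let mut evs: Box<dyn Events> = Box::new(ContainerEvents {
        values: vec![1.0f64, 2.0],
    });
    // Probe a few concrete types, as the r1..r4 checks above do.
    let r1 = evs.as_any_mut().downcast_mut::<ContainerEvents<f64>>().is_some();
    let r2 = evs.as_any_mut().downcast_mut::<ContainerEvents<f32>>().is_some();
    println!("castings: {r1} {r2}");
    // Once the concrete type is known, the data can be mutated in place,
    // matching the evs.values.as_mut_slices() access above.
    if let Some(evs) = evs.as_any_mut().downcast_mut::<ContainerEvents<f64>>() {
        for v in evs.values.iter_mut() {
            *v *= 2.0;
        }
        println!("doubled: {:?}", evs.values);
    }
}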

View File

@@ -247,7 +247,7 @@ where
}
Ok(None) => continue,
Err(e) => {
error!("sees: {e}");
error!("sees: {}", e);
self.inp_done = true;
Ready(Some(sitem_err_from_string(e)))
}

View File

@@ -12,6 +12,7 @@ use crate::timebin::cached::reader::EventsReadProvider;
use futures_util::future::BoxFuture;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::collect_s::CollectableDyn;
use items_0::on_sitemty_data;
use items_0::streamitem::RangeCompletableItem;
@@ -320,7 +321,7 @@ pub async fn timebinned_json(
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
timeout_provider: Box<dyn StreamTimeout2>,
) -> Result<CollectResult<JsonValue>, Error> {
) -> Result<CollectResult<JsonBytes>, Error> {
let deadline = Instant::now()
+ query
.timeout_content()
@@ -353,9 +354,10 @@ pub async fn timebinned_json(
let collres = collected.await?;
match collres {
CollectResult::Some(collres) => {
let x = collres.to_user_facing_api_type_box();
let jsval = x.to_json_value()?;
Ok(CollectResult::Some(jsval))
let x = collres.into_user_facing_api_type_box();
let val = x.into_serializable();
let jsval = serde_json::to_string(&val)?;
Ok(CollectResult::Some(JsonBytes::new(jsval)))
}
CollectResult::Timeout => Ok(CollectResult::Timeout),
}
@@ -363,16 +365,17 @@ pub async fn timebinned_json(
fn take_collector_result(
coll: &mut Box<dyn items_0::collect_s::CollectorDyn>,
) -> Option<serde_json::Value> {
) -> Option<JsonBytes> {
match coll.result() {
Ok(collres) => {
let x = collres.to_user_facing_api_type_box();
match x.to_json_value() {
Ok(val) => Some(val),
Err(e) => Some(serde_json::Value::String(format!("{e}"))),
let x = collres.into_user_facing_api_type_box();
let val = x.into_serializable();
match serde_json::to_string(&val) {
Ok(jsval) => Some(JsonBytes::new(jsval)),
Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")),
}
}
Err(e) => Some(serde_json::Value::String(format!("{e}"))),
Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")),
}
}
@@ -474,11 +477,12 @@ pub async fn timebinned_json_framed(
});
let stream = stream.filter_map(|x| futures_util::future::ready(x));
// TODO skip the intermediate conversion to js value, go directly to string data
let stream = stream.map(|x| match x {
Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())),
Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(
e,
))),
});
// let stream = stream.map(|x| match x {
// Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())),
// Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(
// e,
// ))),
// });
let stream = stream.map_err(|e| crate::json_stream::Error::Msg(e.to_string()));
Ok(Box::pin(stream))
}
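
Editorial note: in the framed variant, the per-item serialization to JsonBytes is commented out and only the error type is adapted, via the newly imported futures_util::TryStreamExt and its map_err. A standalone sketch of map_err on a fallible stream, with placeholder item and error types, assuming futures-util and futures-executor as dependencies:

use futures_util::stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;

// Placeholder types standing in for the upstream error and crate::json_stream::Error.
#[derive(Debug)]
struct UpstreamError(String);

#[derive(Debug)]
enum JsonStreamError {
    Msg(String),
}

fn main() {
    let items = vec![
        Ok::<u32, UpstreamError>(1),
        Err(UpstreamError("boom".into())),
        Ok(3),
    ];
    let stream = stream::iter(items);
    // TryStreamExt::map_err adapts only the error half of each Result item,
    // mirroring stream.map_err(|e| crate::json_stream::Error::Msg(e.to_string()))
    // above (with the message taken from the placeholder field here).
    let stream = stream.map_err(|e| JsonStreamError::Msg(e.0));
    let collected: Vec<Result<u32, JsonStreamError>> =
        futures_executor::block_on(stream.collect());
    println!("{:?}", collected);
}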