WIP testing cbor output

This commit is contained in:
Dominik Werder
2023-12-19 16:30:06 +01:00
parent a8479b2c8d
commit b5ce2dd743
17 changed files with 495 additions and 116 deletions

296
crates/streams/src/cbor.rs Normal file
View File

@@ -0,0 +1,296 @@
use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_0::WithLen;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::Level;
use netpod::log::*;
use netpod::ScalarType;
use netpod::Shape;
use std::fmt;
use std::io::Cursor;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
trait ErrConv<T> {
fn ec(self) -> Result<T, Error>;
}
impl<T, K> ErrConv<T> for Result<T, ciborium::de::Error<K>>
where
K: fmt::Debug,
{
fn ec(self) -> Result<T, Error> {
self.map_err(|e| Error::from_string(format!("{e}")))
}
}
pub struct CborBytes(Bytes);
impl CborBytes {
pub fn into_inner(self) -> Bytes {
self.0
}
pub fn len(&self) -> u32 {
self.0.len() as _
}
}
impl WithLen for CborBytes {
fn len(&self) -> usize {
self.len() as usize
}
}
impl From<CborBytes> for Bytes {
fn from(value: CborBytes) -> Self {
value.0
}
}
pub type CborStream = Pin<Box<dyn Stream<Item = Result<CborBytes, Error>> + Send>>;
pub type SitemtyDynEventsStream =
Pin<Box<dyn Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send>>;
pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<CborBytes, Error>> {
let stream = stream.map(|x| match x {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => {
if false {
use items_0::AsAnyRef;
// TODO impl generically on EventsDim0 ?
if let Some(evs) = evs.as_any_ref().downcast_ref::<items_2::eventsdim0::EventsDim0<f64>>() {
let mut buf = Vec::new();
ciborium::into_writer(evs, &mut buf)
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
let bytes = Bytes::from(buf);
let _item = CborBytes(bytes);
// Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else {
let _item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item"));
// Ok(StreamItem::Log(item))
};
}
let buf = evs.to_cbor_vec_u8();
let bytes = Bytes::from(buf);
let item = CborBytes(bytes);
Ok(item)
}
RangeCompletableItem::RangeComplete => {
use ciborium::cbor;
let item = cbor!({
"rangeFinal" => true,
})
.map_err(Error::from_string)?;
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
let bytes = Bytes::from(buf);
let item = CborBytes(bytes);
Ok(item)
}
},
StreamItem::Log(item) => {
info!("{item:?}");
let item = CborBytes(Bytes::new());
Ok(item)
}
StreamItem::Stats(item) => {
info!("{item:?}");
let item = CborBytes(Bytes::new());
Ok(item)
}
},
Err(e) => {
use ciborium::cbor;
let item = cbor!({
"error" => e.to_string(),
})
.map_err(Error::from_string)?;
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
let bytes = Bytes::from(buf);
let item = CborBytes(bytes);
Ok(item)
}
});
stream
}
pub struct FramedBytesToSitemtyDynEventsStream<S> {
inp: S,
scalar_type: ScalarType,
shape: Shape,
buf: BytesMut,
}
impl<S> FramedBytesToSitemtyDynEventsStream<S> {
pub fn new(inp: S, scalar_type: ScalarType, shape: Shape) -> Self {
Self {
inp,
scalar_type,
shape,
buf: BytesMut::with_capacity(1024 * 64),
}
}
fn try_parse(&mut self) -> Result<Option<Sitemty<Box<dyn Events>>>, Error> {
// debug!("try_parse {}", self.buf.len());
if self.buf.len() < 4 {
return Ok(None);
}
let n = u32::from_le_bytes(self.buf[..4].try_into()?);
if n > 1024 * 1024 * 40 {
let e = Error::with_msg_no_trace(format!("frame too large {n}"));
error!("{e}");
return Err(e);
}
if self.buf.len() < 4 + n as usize {
// debug!("not enough {} {}", n, self.buf.len());
return Ok(None);
}
let buf = &self.buf[4..4 + n as usize];
let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(Error::from_string)?;
// debug!("decoded ciborium value {val:?}");
let item = if let Some(map) = val.as_map() {
if let Some(x) = map.get(0) {
if let Some(y) = x.0.as_text() {
if y == "rangeFinal" {
if let Some(y) = x.1.as_bool() {
if y {
Some(StreamItem::DataItem(
RangeCompletableItem::<Box<dyn Events>>::RangeComplete,
))
} else {
None
}
} else {
None
}
} else {
None
}
} else {
None
}
} else {
None
}
} else {
None
};
let item = if let Some(x) = item {
Some(x)
} else {
let item = decode_cbor_to_box_events(buf, &self.scalar_type, &self.shape)?;
Some(StreamItem::DataItem(RangeCompletableItem::Data(item)))
};
self.buf.advance(4 + n as usize);
if let Some(x) = item {
Ok(Some(Ok(x)))
} else {
let item = LogItem::from_node(0, Level::DEBUG, format!("decoded ciborium Value"));
Ok(Some(Ok(StreamItem::Log(item))))
}
}
}
impl<S> Stream for FramedBytesToSitemtyDynEventsStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
type Item = <SitemtyDynEventsStream as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break match self.try_parse() {
Ok(Some(x)) => Ready(Some(x)),
Ok(None) => match self.inp.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
Ok(x) => {
self.buf.put_slice(&x);
continue;
}
Err(e) => Ready(Some(Err(e))),
},
Ready(None) => {
if self.buf.len() > 0 {
warn!("remaining bytes in input buffer, input closed len {}", self.buf.len());
}
Ready(None)
}
Pending => Pending,
},
Err(e) => Ready(Some(Err(e))),
};
}
}
}
macro_rules! cbor_scalar {
($ty:ident, $buf:expr) => {{
type T = $ty;
type C = EventsDim0<T>;
let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?;
Box::new(item)
}};
}
macro_rules! cbor_wave {
($ty:ident, $buf:expr) => {{
type T = $ty;
type C = EventsDim1<T>;
let item: C = ciborium::from_reader(Cursor::new($buf)).ec()?;
Box::new(item)
}};
}
fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape) -> Result<Box<dyn Events>, Error> {
let item: Box<dyn Events> = match shape {
Shape::Scalar => match scalar_type {
ScalarType::U8 => cbor_scalar!(u8, buf),
ScalarType::U16 => cbor_scalar!(u16, buf),
ScalarType::U32 => cbor_scalar!(u32, buf),
ScalarType::U64 => cbor_scalar!(u64, buf),
ScalarType::I8 => cbor_scalar!(i8, buf),
ScalarType::I16 => cbor_scalar!(i16, buf),
ScalarType::I32 => cbor_scalar!(i32, buf),
ScalarType::I64 => cbor_scalar!(i64, buf),
ScalarType::F32 => cbor_scalar!(f32, buf),
ScalarType::F64 => cbor_scalar!(f64, buf),
_ => {
return Err(Error::from_string(format!(
"decode_cbor_to_box_events {:?} {:?}",
scalar_type, shape
)))
}
},
Shape::Wave(_) => match scalar_type {
ScalarType::U8 => cbor_wave!(u8, buf),
ScalarType::U16 => cbor_wave!(u16, buf),
ScalarType::I64 => cbor_wave!(i64, buf),
_ => {
return Err(Error::from_string(format!(
"decode_cbor_to_box_events {:?} {:?}",
scalar_type, shape
)))
}
},
Shape::Image(_, _) => todo!(),
};
Ok(item)
}

View File

@@ -0,0 +1,32 @@
use crate::cbor::CborBytes;
use futures_util::future;
use futures_util::Stream;
use futures_util::StreamExt;
pub fn non_empty<S, E>(inp: S) -> impl Stream<Item = Result<CborBytes, E>>
where
S: Stream<Item = Result<CborBytes, E>>,
{
inp.filter(|x| {
future::ready(match x {
Ok(x) => x.len() > 0,
Err(_) => true,
})
})
}
pub fn only_first_err<S, T, E>(inp: S) -> impl Stream<Item = Result<T, E>>
where
S: Stream<Item = Result<T, E>>,
{
inp.take_while({
let mut state = true;
move |x| {
let ret = state;
if x.is_err() {
state = false;
}
future::ready(ret)
}
})
}

View File

@@ -0,0 +1,27 @@
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures_util::future;
use futures_util::stream;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::WithLen;
pub fn length_framed<S, T, E>(inp: S) -> impl Stream<Item = Result<Bytes, E>>
where
S: Stream<Item = Result<T, E>>,
T: WithLen + Into<Bytes>,
{
inp.map(|x| match x {
Ok(x) => {
let n = x.len() as u32;
let mut buf1 = BytesMut::with_capacity(8);
buf1.put_u32_le(n);
[Some(Ok(buf1.freeze())), Some(Ok(x.into()))]
}
Err(e) => [Some(Err(e)), None],
})
.map(|x| stream::iter(x))
.flatten()
.filter_map(|x| future::ready(x))
}

View File

@@ -1,10 +1,13 @@
pub mod boxed;
pub mod cbor;
pub mod collect;
pub mod dtflags;
pub mod filechunkread;
pub mod firsterr;
pub mod frames;
pub mod generators;
pub mod itemclone;
pub mod lenframed;
pub mod needminbuffer;
pub mod plaineventscbor;
pub mod plaineventsjson;

View File

@@ -1,30 +1,14 @@
use crate::cbor::events_stream_to_cbor_stream;
use crate::cbor::CborStream;
use crate::firsterr::non_empty;
use crate::firsterr::only_first_err;
use crate::plaineventsstream::dyn_events_stream;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use bytes::Bytes;
use err::Error;
use futures_util::future;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
use netpod::log::Level;
use netpod::log::*;
use netpod::ChannelTypeConfigGen;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
pub struct CborBytes(Bytes);
impl CborBytes {
pub fn into_inner(self) -> Bytes {
self.0
}
}
pub type CborStream = Pin<Box<dyn Stream<Item = Result<CborBytes, Error>> + Send>>;
pub async fn plain_events_cbor(
evq: &PlainEventsQuery,
@@ -33,83 +17,8 @@ pub async fn plain_events_cbor(
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<CborStream, Error> {
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
let stream = stream
.map(|x| match x {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => {
if false {
use items_0::AsAnyRef;
// TODO impl generically on EventsDim0 ?
if let Some(evs) = evs.as_any_ref().downcast_ref::<items_2::eventsdim0::EventsDim0<f64>>() {
let mut buf = Vec::new();
ciborium::into_writer(evs, &mut buf)
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
let bytes = Bytes::from(buf);
let _item = CborBytes(bytes);
// Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else {
let _item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item"));
// Ok(StreamItem::Log(item))
};
}
let buf = evs.to_cbor_vec_u8();
let bytes = Bytes::from(buf);
let item = CborBytes(bytes);
Ok(item)
}
RangeCompletableItem::RangeComplete => {
use ciborium::cbor;
let item = cbor!({
"rangeFinal" => true,
})
.map_err(Error::from_string)?;
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
let bytes = Bytes::from(buf);
let item = CborBytes(bytes);
Ok(item)
}
},
StreamItem::Log(item) => {
info!("{item:?}");
let item = CborBytes(Bytes::new());
Ok(item)
}
StreamItem::Stats(item) => {
info!("{item:?}");
let item = CborBytes(Bytes::new());
Ok(item)
}
},
Err(e) => {
use ciborium::cbor;
let item = cbor!({
"error" => e.to_string(),
})
.map_err(Error::from_string)?;
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
let bytes = Bytes::from(buf);
let item = CborBytes(bytes);
Ok(item)
}
})
.filter(|x| {
future::ready(match x {
Ok(x) => x.0.len() > 0,
Err(_) => true,
})
})
.take_while({
let mut state = true;
move |x| {
let ret = state;
if x.is_err() {
state = false;
}
future::ready(ret)
}
});
let stream = events_stream_to_cbor_stream(stream);
let stream = non_empty(stream);
let stream = only_first_err(stream);
Ok(Box::pin(stream))
}

View File

@@ -1,10 +1,15 @@
use crate::cbor::FramedBytesToSitemtyDynEventsStream;
use crate::firsterr::only_first_err;
use crate::frames::inmem::BoxedBytesStream;
use crate::lenframed;
use crate::plaineventscbor::plain_events_cbor;
use crate::tcprawclient::OpenBoxedBytesStreams;
use crate::tcprawclient::TEST_BACKEND;
use err::Error;
use futures_util::future;
use futures_util::Future;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::ChConf;
@@ -23,22 +28,30 @@ fn merged_events_cbor() {
async fn merged_events_inner() -> Result<(), Error> {
let ctx = ReqCtx::for_test();
let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::F64, Shape::Scalar, "test-gen-i32-dim0-v00");
// TODO factor out the channel config lookup such that the test code can use a similar code path,
// except that we don't want to go over the network here.
let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::I32, Shape::Scalar, "test-gen-i32-dim0-v00");
let channel = SfDbChannel::from_name(ch_conf.backend(), ch_conf.name());
let range = SeriesRange::TimeRange(NanoRange::from_date_time(
"2023-12-18T05:10:00Z".parse().unwrap(),
"2023-12-18T05:10:10Z".parse().unwrap(),
"2023-12-18T05:12:00Z".parse().unwrap(),
));
let evq = PlainEventsQuery::new(channel, range);
let open_bytes = StreamOpener::new();
let open_bytes = Box::pin(open_bytes);
let mut res = plain_events_cbor(&evq, ch_conf.into(), &ctx, open_bytes).await.unwrap();
// TODO parse the cbor stream and assert
while let Some(x) = res.next().await {
let item = x?;
let bytes = item.into_inner();
eprintln!("bytes len {}", bytes.len());
}
let stream = plain_events_cbor(&evq, ch_conf.clone().into(), &ctx, open_bytes)
.await
.unwrap();
let stream = lenframed::length_framed(stream);
let stream =
FramedBytesToSitemtyDynEventsStream::new(stream, ch_conf.scalar_type().clone(), ch_conf.shape().clone());
let stream = only_first_err(stream);
stream
.for_each(|item| {
debug!("{item:?}");
future::ready(())
})
.await;
Ok(())
}

View File

@@ -163,12 +163,6 @@ pub fn build_full_transform_collectable(
// TODO this must return a Stream!
//let evs = build_event_transform(tr, inp)?;
let trtb = tr.get_tr_time_binning();
use futures_util::Stream;
use items_0::collect_s::Collectable;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use std::pin::Pin;
let a: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> = Box::pin(inp.0.map(|item| match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {