Add json-framed encoding, docs, refactor
This commit is contained in:
@@ -23,6 +23,9 @@ use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
const FRAME_HEAD_LEN: usize = 16;
|
||||
const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 80;
|
||||
|
||||
trait ErrConv<T> {
|
||||
fn ec(self) -> Result<T, Error>;
|
||||
}
|
||||
@@ -62,6 +65,7 @@ impl From<CborBytes> for Bytes {
|
||||
|
||||
pub type CborStream = Pin<Box<dyn Stream<Item = Result<CborBytes, Error>> + Send>>;
|
||||
|
||||
// TODO move this type decl because it is not specific to cbor
|
||||
pub type SitemtyDynEventsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send>>;
|
||||
|
||||
@@ -143,29 +147,36 @@ impl<S> FramedBytesToSitemtyDynEventsStream<S> {
|
||||
inp,
|
||||
scalar_type,
|
||||
shape,
|
||||
buf: BytesMut::with_capacity(1024 * 64),
|
||||
buf: BytesMut::with_capacity(1024 * 256),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_parse(&mut self) -> Result<Option<Sitemty<Box<dyn Events>>>, Error> {
|
||||
// debug!("try_parse {}", self.buf.len());
|
||||
if self.buf.len() < 4 {
|
||||
if self.buf.len() < FRAME_HEAD_LEN {
|
||||
return Ok(None);
|
||||
}
|
||||
let n = u32::from_le_bytes(self.buf[..4].try_into()?);
|
||||
if n > 1024 * 1024 * 40 {
|
||||
if n > FRAME_PAYLOAD_MAX {
|
||||
let e = Error::with_msg_no_trace(format!("frame too large {n}"));
|
||||
error!("{e}");
|
||||
return Err(e);
|
||||
}
|
||||
if self.buf.len() < 4 + n as usize {
|
||||
let frame_len = FRAME_HEAD_LEN + n as usize;
|
||||
let adv = (frame_len + 7) / 8 * 8;
|
||||
assert!(adv % 8 == 0);
|
||||
assert!(adv >= frame_len);
|
||||
assert!(adv < 8 + frame_len);
|
||||
if self.buf.len() < adv {
|
||||
// debug!("not enough {} {}", n, self.buf.len());
|
||||
return Ok(None);
|
||||
}
|
||||
let buf = &self.buf[4..4 + n as usize];
|
||||
let buf = &self.buf[FRAME_HEAD_LEN..frame_len];
|
||||
let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(Error::from_string)?;
|
||||
// debug!("decoded ciborium value {val:?}");
|
||||
let item = if let Some(map) = val.as_map() {
|
||||
let keys: Vec<&str> = map.iter().map(|k| k.0.as_text().unwrap_or("(none)")).collect();
|
||||
debug!("keys {keys:?}");
|
||||
if let Some(x) = map.get(0) {
|
||||
if let Some(y) = x.0.as_text() {
|
||||
if y == "rangeFinal" {
|
||||
@@ -196,9 +207,10 @@ impl<S> FramedBytesToSitemtyDynEventsStream<S> {
|
||||
Some(x)
|
||||
} else {
|
||||
let item = decode_cbor_to_box_events(buf, &self.scalar_type, &self.shape)?;
|
||||
debug!("decoded boxed events len {}", item.len());
|
||||
Some(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
};
|
||||
self.buf.advance(4 + n as usize);
|
||||
self.buf.advance(adv);
|
||||
if let Some(x) = item {
|
||||
Ok(Some(Ok(x)))
|
||||
} else {
|
||||
|
||||
@@ -2,8 +2,22 @@ use crate::cbor::CborBytes;
|
||||
use futures_util::future;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::WithLen;
|
||||
|
||||
pub fn non_empty<S, E>(inp: S) -> impl Stream<Item = Result<CborBytes, E>>
|
||||
pub fn non_empty<S, T, E>(inp: S) -> impl Stream<Item = Result<T, E>>
|
||||
where
|
||||
S: Stream<Item = Result<T, E>>,
|
||||
T: WithLen,
|
||||
{
|
||||
inp.filter(|x| {
|
||||
future::ready(match x {
|
||||
Ok(x) => x.len() > 0,
|
||||
Err(_) => true,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn non_empty_nongen<S, E>(inp: S) -> impl Stream<Item = Result<CborBytes, E>>
|
||||
where
|
||||
S: Stream<Item = Result<CborBytes, E>>,
|
||||
{
|
||||
|
||||
@@ -420,3 +420,114 @@ impl Stream for GenerateF64V00 {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GenerateWaveI16V00 {
|
||||
ivl: u64,
|
||||
ts: u64,
|
||||
dts: u64,
|
||||
tsend: u64,
|
||||
node_ix: u64,
|
||||
timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
do_throttle: bool,
|
||||
done: bool,
|
||||
done_range_final: bool,
|
||||
}
|
||||
|
||||
impl GenerateWaveI16V00 {
|
||||
pub fn self_name() -> &'static str {
|
||||
std::any::type_name::<Self>()
|
||||
}
|
||||
|
||||
pub fn new(node_ix: u64, node_count: u64, range: SeriesRange, one_before_range: bool) -> Self {
|
||||
let range = match range {
|
||||
SeriesRange::TimeRange(k) => k,
|
||||
SeriesRange::PulseRange(_) => todo!(),
|
||||
};
|
||||
let ivl = MS * 100;
|
||||
let dts = ivl * node_count as u64;
|
||||
let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl;
|
||||
let tsend = range.end;
|
||||
debug!(
|
||||
"{}::new ivl {} dts {} ts {} one_before_range {}",
|
||||
Self::self_name(),
|
||||
ivl,
|
||||
dts,
|
||||
ts,
|
||||
one_before_range
|
||||
);
|
||||
Self {
|
||||
ivl,
|
||||
ts,
|
||||
dts,
|
||||
tsend,
|
||||
node_ix,
|
||||
timeout: None,
|
||||
do_throttle: false,
|
||||
done: false,
|
||||
done_range_final: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
|
||||
type T = i16;
|
||||
let mut item = EventsDim1::empty();
|
||||
let mut ts = self.ts;
|
||||
loop {
|
||||
if self.ts >= self.tsend || item.byte_estimate() > 1024 * 20 {
|
||||
break;
|
||||
}
|
||||
let pulse = ts;
|
||||
let ampl = ((ts / self.ivl) as f32).sin() + 2.;
|
||||
let mut value = Vec::new();
|
||||
let pi = std::f32::consts::PI;
|
||||
for i in 0..21 {
|
||||
let x = ((-pi + (2. * pi / 20.) * i as f32).cos() + 1.1) * ampl;
|
||||
value.push(x as T);
|
||||
}
|
||||
if false {
|
||||
info!(
|
||||
"v01 node {} made event ts {} pulse {} value {:?}",
|
||||
self.node_ix, ts, pulse, value
|
||||
);
|
||||
}
|
||||
item.push(ts, pulse, value);
|
||||
ts += self.dts;
|
||||
}
|
||||
self.ts = ts;
|
||||
trace!("generated len {}", item.len());
|
||||
let w = ChannelEvents::Events(Box::new(item) as _);
|
||||
let w = sitem_data(w);
|
||||
w
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for GenerateWaveI16V00 {
|
||||
type Item = Sitemty<ChannelEvents>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break if self.done {
|
||||
Ready(None)
|
||||
} else if self.ts >= self.tsend {
|
||||
self.done = true;
|
||||
self.done_range_final = true;
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
} else if !self.do_throttle {
|
||||
// To use the generator without throttling, use this scope
|
||||
Ready(Some(self.make_batch()))
|
||||
} else if let Some(fut) = self.timeout.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(()) => {
|
||||
self.timeout = None;
|
||||
Ready(Some(self.make_batch()))
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else {
|
||||
self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
|
||||
continue;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
80
crates/streams/src/json_stream.rs
Normal file
80
crates/streams/src/json_stream.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use crate::cbor::SitemtyDynEventsStream;
|
||||
use bytes::Bytes;
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::WithLen;
|
||||
use netpod::log::*;
|
||||
use std::pin::Pin;
|
||||
|
||||
pub struct JsonBytes(Bytes);
|
||||
|
||||
impl JsonBytes {
|
||||
pub fn into_inner(self) -> Bytes {
|
||||
self.0
|
||||
}
|
||||
|
||||
pub fn len(&self) -> u32 {
|
||||
self.0.len() as _
|
||||
}
|
||||
}
|
||||
|
||||
impl WithLen for JsonBytes {
|
||||
fn len(&self) -> usize {
|
||||
self.len() as usize
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JsonBytes> for Bytes {
|
||||
fn from(value: JsonBytes) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
|
||||
pub type JsonStream = Pin<Box<dyn Stream<Item = Result<JsonBytes, Error>> + Send>>;
|
||||
|
||||
pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<JsonBytes, Error>> {
|
||||
let stream = stream.map(|x| match x {
|
||||
Ok(x) => match x {
|
||||
StreamItem::DataItem(x) => match x {
|
||||
RangeCompletableItem::Data(evs) => {
|
||||
let buf = evs.to_json_vec_u8();
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = JsonBytes(bytes);
|
||||
Ok(item)
|
||||
}
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
let item = serde_json::json!({
|
||||
"rangeFinal": true,
|
||||
});
|
||||
let buf = serde_json::to_vec(&item)?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = JsonBytes(bytes);
|
||||
Ok(item)
|
||||
}
|
||||
},
|
||||
StreamItem::Log(item) => {
|
||||
info!("{item:?}");
|
||||
let item = JsonBytes(Bytes::new());
|
||||
Ok(item)
|
||||
}
|
||||
StreamItem::Stats(item) => {
|
||||
info!("{item:?}");
|
||||
let item = JsonBytes(Bytes::new());
|
||||
Ok(item)
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let item = serde_json::json!({
|
||||
"error": e.to_string(),
|
||||
});
|
||||
let buf = serde_json::to_vec(&item)?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = JsonBytes(bytes);
|
||||
Ok(item)
|
||||
}
|
||||
});
|
||||
stream
|
||||
}
|
||||
@@ -7,6 +7,7 @@ pub mod firsterr;
|
||||
pub mod frames;
|
||||
pub mod generators;
|
||||
pub mod itemclone;
|
||||
pub mod json_stream;
|
||||
pub mod lenframed;
|
||||
pub mod needminbuffer;
|
||||
pub mod plaineventscbor;
|
||||
|
||||
@@ -10,12 +10,13 @@ use netpod::ChannelTypeConfigGen;
|
||||
use netpod::ReqCtx;
|
||||
use query::api4::events::PlainEventsQuery;
|
||||
|
||||
pub async fn plain_events_cbor(
|
||||
pub async fn plain_events_cbor_stream(
|
||||
evq: &PlainEventsQuery,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<CborStream, Error> {
|
||||
trace!("build stream");
|
||||
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
|
||||
let stream = events_stream_to_cbor_stream(stream);
|
||||
let stream = non_empty(stream);
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
use crate::collect::Collect;
|
||||
use crate::firsterr::non_empty;
|
||||
use crate::firsterr::only_first_err;
|
||||
use crate::json_stream::events_stream_to_json_stream;
|
||||
use crate::json_stream::JsonStream;
|
||||
use crate::plaineventsstream::dyn_events_stream;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use err::Error;
|
||||
@@ -51,3 +55,17 @@ pub async fn plain_events_json(
|
||||
info!("plain_events_json json serialized");
|
||||
Ok(jsval)
|
||||
}
|
||||
|
||||
pub async fn plain_events_json_stream(
|
||||
evq: &PlainEventsQuery,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<JsonStream, Error> {
|
||||
trace!("build stream");
|
||||
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
|
||||
let stream = events_stream_to_json_stream(stream);
|
||||
let stream = non_empty(stream);
|
||||
let stream = only_first_err(stream);
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ pub async fn dyn_events_stream(
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<DynEventsStream, Error> {
|
||||
trace!("dyn_events_stream begin");
|
||||
let subq = make_sub_query(
|
||||
ch_conf,
|
||||
evq.range().clone(),
|
||||
@@ -78,6 +79,21 @@ pub async fn dyn_events_stream(
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(wasm_transform))]
|
||||
async fn transform_wasm<INP>(
|
||||
stream: INP,
|
||||
_wasmname: &str,
|
||||
_ctx: &ReqCtx,
|
||||
) -> Result<impl Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send, Error>
|
||||
where
|
||||
INP: Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send + 'static,
|
||||
{
|
||||
let ret: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>> = Box::pin(stream);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
#[cfg(DISABLED)]
|
||||
#[cfg(wasm_transform)]
|
||||
async fn transform_wasm<INP>(
|
||||
stream: INP,
|
||||
wasmname: &str,
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::cbor::FramedBytesToSitemtyDynEventsStream;
|
||||
use crate::firsterr::only_first_err;
|
||||
use crate::frames::inmem::BoxedBytesStream;
|
||||
use crate::lenframed;
|
||||
use crate::plaineventscbor::plain_events_cbor;
|
||||
use crate::plaineventscbor::plain_events_cbor_stream;
|
||||
use crate::tcprawclient::OpenBoxedBytesStreams;
|
||||
use crate::tcprawclient::TEST_BACKEND;
|
||||
use err::Error;
|
||||
@@ -39,7 +39,7 @@ async fn merged_events_inner() -> Result<(), Error> {
|
||||
let evq = PlainEventsQuery::new(channel, range);
|
||||
let open_bytes = StreamOpener::new();
|
||||
let open_bytes = Box::pin(open_bytes);
|
||||
let stream = plain_events_cbor(&evq, ch_conf.clone().into(), &ctx, open_bytes)
|
||||
let stream = plain_events_cbor_stream(&evq, ch_conf.clone().into(), &ctx, open_bytes)
|
||||
.await
|
||||
.unwrap();
|
||||
let stream = lenframed::length_framed(stream);
|
||||
|
||||
@@ -77,6 +77,8 @@ async fn timebinnable_stream(
|
||||
})
|
||||
});
|
||||
|
||||
#[cfg(DISABLED)]
|
||||
#[cfg(wasm_transform)]
|
||||
let stream = if let Some(wasmname) = wasm1 {
|
||||
debug!("make wasm transform");
|
||||
use httpclient::url::Url;
|
||||
|
||||
Reference in New Issue
Block a user