Add framed json stream
This commit is contained in:
@@ -48,6 +48,9 @@ where each `[JSON-frame]` looks like:
|
|||||||
[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0]
|
[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Note: "data" objects are currently identified by the presence of the `tss` key.
|
||||||
|
There can be other types of objects, like keepalive, log or statistics.
|
||||||
|
|
||||||
|
|
||||||
## Events as framed CBOR stream
|
## Events as framed CBOR stream
|
||||||
|
|
||||||
@@ -81,6 +84,5 @@ Most returned CBOR objects are data objects and look like this in equivalent jso
|
|||||||
```
|
```
|
||||||
where `tss` is the array of timestamps and `values` the corresponding array of values.
|
where `tss` is the array of timestamps and `values` the corresponding array of values.
|
||||||
|
|
||||||
Note: "data" CBOR objects are currently identified by the presence of the `tss` key. There can be
|
Note: "data" objects are currently identified by the presence of the `tss` key.
|
||||||
other types of CBOR objects, like log or statistics.
|
There can be other types of objects, like keepalive, log or statistics.
|
||||||
The next update will add a type-tag to discriminate them, but for now, look for the key `tss`.
|
|
||||||
|
|||||||
@@ -966,17 +966,31 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn to_json_vec_u8(&self) -> Vec<u8> {
|
fn to_json_vec_u8(&self) -> Vec<u8> {
|
||||||
let ret = EventsDim0ChunkOutput {
|
// TODO redesign with mut access, rename to `into_` and take the values out.
|
||||||
// TODO use &mut to swap the content
|
let mut tss = self.tss.clone();
|
||||||
tss: self.tss.clone(),
|
let mut pulses = self.pulses.clone();
|
||||||
pulses: self.pulses.clone(),
|
let mut values = self.values.clone();
|
||||||
values: self.values.clone(),
|
let tss_sl = tss.make_contiguous();
|
||||||
scalar_type: STY::scalar_type_name().into(),
|
let pulses_sl = pulses.make_contiguous();
|
||||||
|
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
|
||||||
|
let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl);
|
||||||
|
let values = mem::replace(&mut values, VecDeque::new());
|
||||||
|
let ret = EventsDim0CollectorOutput {
|
||||||
|
ts_anchor_sec,
|
||||||
|
ts_off_ms,
|
||||||
|
ts_off_ns,
|
||||||
|
pulse_anchor,
|
||||||
|
pulse_off,
|
||||||
|
values,
|
||||||
|
range_final: false,
|
||||||
|
timed_out: false,
|
||||||
|
continue_at: None,
|
||||||
};
|
};
|
||||||
serde_json::to_vec(&ret).unwrap()
|
serde_json::to_vec(&ret).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn to_cbor_vec_u8(&self) -> Vec<u8> {
|
fn to_cbor_vec_u8(&self) -> Vec<u8> {
|
||||||
|
// TODO redesign with mut access, rename to `into_` and take the values out.
|
||||||
let ret = EventsDim0ChunkOutput {
|
let ret = EventsDim0ChunkOutput {
|
||||||
// TODO use &mut to swap the content
|
// TODO use &mut to swap the content
|
||||||
tss: self.tss.clone(),
|
tss: self.tss.clone(),
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ edition = "2021"
|
|||||||
tokio = { version = "1.34", features = ["io-util", "net", "time", "sync", "fs"] }
|
tokio = { version = "1.34", features = ["io-util", "net", "time", "sync", "fs"] }
|
||||||
futures-util = "0.3.15"
|
futures-util = "0.3.15"
|
||||||
pin-project = "1.0.12"
|
pin-project = "1.0.12"
|
||||||
|
tokio-stream = "0.1"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
serde_cbor = "0.11.1"
|
serde_cbor = "0.11.1"
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ use std::io::Cursor;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::Context;
|
use std::task::Context;
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
const FRAME_HEAD_LEN: usize = 16;
|
const FRAME_HEAD_LEN: usize = 16;
|
||||||
const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 80;
|
const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 80;
|
||||||
@@ -70,7 +71,20 @@ pub type SitemtyDynEventsStream =
|
|||||||
Pin<Box<dyn Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send>>;
|
Pin<Box<dyn Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send>>;
|
||||||
|
|
||||||
pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<CborBytes, Error>> {
|
pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<CborBytes, Error>> {
|
||||||
let stream = stream.map(|x| match x {
|
let interval = tokio::time::interval(Duration::from_millis(4000));
|
||||||
|
let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(|x| match x {
|
||||||
|
Ok(x) => map_events(x),
|
||||||
|
Err(_) => make_keepalive(),
|
||||||
|
});
|
||||||
|
let prepend = {
|
||||||
|
let item = make_keepalive();
|
||||||
|
futures_util::stream::iter([item])
|
||||||
|
};
|
||||||
|
prepend.chain(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>) -> Result<CborBytes, Error> {
|
||||||
|
match x {
|
||||||
Ok(x) => match x {
|
Ok(x) => match x {
|
||||||
StreamItem::DataItem(x) => match x {
|
StreamItem::DataItem(x) => match x {
|
||||||
RangeCompletableItem::Data(evs) => {
|
RangeCompletableItem::Data(evs) => {
|
||||||
@@ -130,8 +144,20 @@ pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stre
|
|||||||
let item = CborBytes(bytes);
|
let item = CborBytes(bytes);
|
||||||
Ok(item)
|
Ok(item)
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
stream
|
}
|
||||||
|
|
||||||
|
fn make_keepalive() -> Result<CborBytes, Error> {
|
||||||
|
use ciborium::cbor;
|
||||||
|
let item = cbor!({
|
||||||
|
"type" => "keepalive",
|
||||||
|
})
|
||||||
|
.map_err(Error::from_string)?;
|
||||||
|
let mut buf = Vec::with_capacity(64);
|
||||||
|
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
|
||||||
|
let bytes = Bytes::from(buf);
|
||||||
|
let item = Ok(CborBytes(bytes));
|
||||||
|
item
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FramedBytesToSitemtyDynEventsStream<S> {
|
pub struct FramedBytesToSitemtyDynEventsStream<S> {
|
||||||
|
|||||||
@@ -5,9 +5,11 @@ use futures_util::Stream;
|
|||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use items_0::streamitem::RangeCompletableItem;
|
use items_0::streamitem::RangeCompletableItem;
|
||||||
use items_0::streamitem::StreamItem;
|
use items_0::streamitem::StreamItem;
|
||||||
|
use items_0::Events;
|
||||||
use items_0::WithLen;
|
use items_0::WithLen;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
pub struct JsonBytes(Bytes);
|
pub struct JsonBytes(Bytes);
|
||||||
|
|
||||||
@@ -36,7 +38,20 @@ impl From<JsonBytes> for Bytes {
|
|||||||
pub type JsonStream = Pin<Box<dyn Stream<Item = Result<JsonBytes, Error>> + Send>>;
|
pub type JsonStream = Pin<Box<dyn Stream<Item = Result<JsonBytes, Error>> + Send>>;
|
||||||
|
|
||||||
pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<JsonBytes, Error>> {
|
pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<JsonBytes, Error>> {
|
||||||
let stream = stream.map(|x| match x {
|
let interval = tokio::time::interval(Duration::from_millis(4000));
|
||||||
|
let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(|x| match x {
|
||||||
|
Ok(x) => map_events(x),
|
||||||
|
Err(_) => make_keepalive(),
|
||||||
|
});
|
||||||
|
let prepend = {
|
||||||
|
let item = make_keepalive();
|
||||||
|
futures_util::stream::iter([item])
|
||||||
|
};
|
||||||
|
prepend.chain(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_events(x: Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>) -> Result<JsonBytes, Error> {
|
||||||
|
match x {
|
||||||
Ok(x) => match x {
|
Ok(x) => match x {
|
||||||
StreamItem::DataItem(x) => match x {
|
StreamItem::DataItem(x) => match x {
|
||||||
RangeCompletableItem::Data(evs) => {
|
RangeCompletableItem::Data(evs) => {
|
||||||
@@ -75,6 +90,15 @@ pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stre
|
|||||||
let item = JsonBytes(bytes);
|
let item = JsonBytes(bytes);
|
||||||
Ok(item)
|
Ok(item)
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
stream
|
}
|
||||||
|
|
||||||
|
fn make_keepalive() -> Result<JsonBytes, Error> {
|
||||||
|
let item = serde_json::json!({
|
||||||
|
"type": "keepalive",
|
||||||
|
});
|
||||||
|
let buf = serde_json::to_vec(&item).unwrap();
|
||||||
|
let bytes = Bytes::from(buf);
|
||||||
|
let item = Ok(JsonBytes(bytes));
|
||||||
|
item
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user