diff --git a/apidoc/src/events.md b/apidoc/src/events.md index e6303d2..e45a878 100644 --- a/apidoc/src/events.md +++ b/apidoc/src/events.md @@ -48,6 +48,9 @@ where each `[JSON-frame]` looks like: [padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0] ``` +Note: "data" objects are currently identified by the presence of the `tss` key. +There can be other types of objects, like keepalive, log or statistics. + ## Events as framed CBOR stream @@ -81,6 +84,5 @@ Most returned CBOR objects are data objects and look like this in equivalent jso ``` where `tss` is the array of timestamps and `values` the corresponding array of values. -Note: "data" CBOR objects are currently identified by the presence of the `tss` key. There can be -other types of CBOR objects, like log or statistics. -The next update will add a type-tag to discriminate them, but for now, look for the key `tss`. +Note: "data" objects are currently identified by the presence of the `tss` key. +There can be other types of objects, like keepalive, log or statistics. diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index e3d864d..93743c7 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -966,17 +966,31 @@ impl Events for EventsDim0 { } fn to_json_vec_u8(&self) -> Vec { - let ret = EventsDim0ChunkOutput { - // TODO use &mut to swap the content - tss: self.tss.clone(), - pulses: self.pulses.clone(), - values: self.values.clone(), - scalar_type: STY::scalar_type_name().into(), + // TODO redesign with mut access, rename to `into_` and take the values out. + let mut tss = self.tss.clone(); + let mut pulses = self.pulses.clone(); + let mut values = self.values.clone(); + let tss_sl = tss.make_contiguous(); + let pulses_sl = pulses.make_contiguous(); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); + let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl); + let values = mem::replace(&mut values, VecDeque::new()); + let ret = EventsDim0CollectorOutput { + ts_anchor_sec, + ts_off_ms, + ts_off_ns, + pulse_anchor, + pulse_off, + values, + range_final: false, + timed_out: false, + continue_at: None, }; serde_json::to_vec(&ret).unwrap() } fn to_cbor_vec_u8(&self) -> Vec { + // TODO redesign with mut access, rename to `into_` and take the values out. let ret = EventsDim0ChunkOutput { // TODO use &mut to swap the content tss: self.tss.clone(), diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 8d98fd5..89fa60b 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" tokio = { version = "1.34", features = ["io-util", "net", "time", "sync", "fs"] } futures-util = "0.3.15" pin-project = "1.0.12" +tokio-stream = "0.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" diff --git a/crates/streams/src/cbor_stream.rs b/crates/streams/src/cbor_stream.rs index 9301c75..03b0aa2 100644 --- a/crates/streams/src/cbor_stream.rs +++ b/crates/streams/src/cbor_stream.rs @@ -22,6 +22,7 @@ use std::io::Cursor; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use std::time::Duration; const FRAME_HEAD_LEN: usize = 16; const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 80; @@ -70,7 +71,20 @@ pub type SitemtyDynEventsStream = Pin>>, Error>> + Send>>; pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream> { - let stream = stream.map(|x| match x { + let interval = tokio::time::interval(Duration::from_millis(4000)); + let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(|x| match x { + Ok(x) => map_events(x), + Err(_) => make_keepalive(), + }); + let prepend = { + let item = make_keepalive(); + futures_util::stream::iter([item]) + }; + prepend.chain(stream) +} + +fn map_events(x: Result>>, Error>) -> Result { + match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { @@ -130,8 +144,20 @@ pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stre let item = CborBytes(bytes); Ok(item) } - }); - stream + } +} + +fn make_keepalive() -> Result { + use ciborium::cbor; + let item = cbor!({ + "type" => "keepalive", + }) + .map_err(Error::from_string)?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + let bytes = Bytes::from(buf); + let item = Ok(CborBytes(bytes)); + item } pub struct FramedBytesToSitemtyDynEventsStream { diff --git a/crates/streams/src/json_stream.rs b/crates/streams/src/json_stream.rs index c4d2d51..e1b7989 100644 --- a/crates/streams/src/json_stream.rs +++ b/crates/streams/src/json_stream.rs @@ -5,9 +5,11 @@ use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; +use items_0::Events; use items_0::WithLen; use netpod::log::*; use std::pin::Pin; +use std::time::Duration; pub struct JsonBytes(Bytes); @@ -36,7 +38,20 @@ impl From for Bytes { pub type JsonStream = Pin> + Send>>; pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stream> { - let stream = stream.map(|x| match x { + let interval = tokio::time::interval(Duration::from_millis(4000)); + let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(|x| match x { + Ok(x) => map_events(x), + Err(_) => make_keepalive(), + }); + let prepend = { + let item = make_keepalive(); + futures_util::stream::iter([item]) + }; + prepend.chain(stream) +} + +fn map_events(x: Result>>, Error>) -> Result { + match x { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { @@ -75,6 +90,15 @@ pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stre let item = JsonBytes(bytes); Ok(item) } - }); - stream + } +} + +fn make_keepalive() -> Result { + let item = serde_json::json!({ + "type": "keepalive", + }); + let buf = serde_json::to_vec(&item).unwrap(); + let bytes = Bytes::from(buf); + let item = Ok(JsonBytes(bytes)); + item }