From 45421415d0d80fe85ade2a66ea615a007983187b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 12 Dec 2023 16:23:26 +0100 Subject: [PATCH] CBOR chunked download --- .cargo/cargo-lock | 40 ++- crates/daqbuffer/Cargo.toml | 5 +- crates/daqbuffer/src/bin/daqbuffer.rs | 3 - crates/httpret/src/api4/events.rs | 51 +-- crates/httpret/src/proxy.rs | 18 +- crates/httpret/src/proxy/api4.rs | 1 + crates/httpret/src/proxy/api4/events.rs | 101 ++++++ crates/items_0/Cargo.toml | 2 +- crates/items_0/src/items_0.rs | 5 + crates/items_0/src/scalar_ops.rs | 31 +- crates/items_2/Cargo.toml | 3 +- crates/items_2/src/channelevents.rs | 45 +++ crates/items_2/src/eventsdim0.rs | 23 ++ crates/items_2/src/eventsdim1.rs | 24 ++ crates/items_2/src/eventsxbindim0.rs | 4 + crates/items_2/src/frame.rs | 18 +- crates/netpod/src/netpod.rs | 1 + crates/streams/Cargo.toml | 2 +- crates/streams/src/frames/eventsfromframes.rs | 12 +- crates/streams/src/lib.rs | 1 + crates/streams/src/plaineventscbor.rs | 114 +++++++ crates/streams/src/plaineventsjson.rs | 306 ++++++++++-------- 22 files changed, 611 insertions(+), 199 deletions(-) create mode 100644 crates/httpret/src/proxy/api4/events.rs create mode 100644 crates/streams/src/plaineventscbor.rs diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 12eb82e..763bf58 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -393,6 +393,33 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "ciborium" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656" + +[[package]] +name = "ciborium-ll" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clap" version = "4.4.11" @@ -732,7 +759,7 @@ dependencies = [ [[package]] name = "daqbuffer" -version = "0.5.0-alpha.0" +version = "0.5.0-alpha.1" dependencies = [ "bytes", "chrono", @@ -997,9 +1024,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.3.31" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" +checksum = "a3286168faae03a0e583f6fde17c02c8b8bba2dcc2061d0f7817066e5b0af706" dependencies = [ "serde", ] @@ -1730,6 +1757,7 @@ dependencies = [ "bitshuffle", "bytes", "chrono", + "ciborium", "crc32fast", "erased-serde", "err", @@ -3019,8 +3047,8 @@ dependencies = [ "byteorder", "bytes", "chrono", + "ciborium", "crc32fast", - "erased-serde", "err", "futures-util", "httpclient", @@ -4039,9 +4067,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.26" +version = "0.5.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67b5f0a4e7a27a64c651977932b9dc5667ca7fc31ac44b03ed37a0cf42fdfff" +checksum = "6c830786f7720c2fd27a1a0e27a709dbd3c4d009b56d098fc742d4f4eab91fe2" dependencies = [ "memchr", ] diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 
e118e22..c83116e 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,13 +1,12 @@ [package] name = "daqbuffer" -version = "0.5.0-alpha.0" +version = "0.5.0-alpha.1" authors = ["Dominik Werder "] edition = "2021" [dependencies] -futures-util = "0.3.14" +futures-util = "0.3.29" bytes = "1.5.0" -#dashmap = "3" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index 3ba65da..ec459e0 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -5,12 +5,9 @@ use clap::Parser; use daqbuffer::cli::ClientType; use daqbuffer::cli::Opts; use daqbuffer::cli::SubCmd; -use disk::AggQuerySingleChannel; -use disk::SfDbChConf; use err::Error; use netpod::log::*; use netpod::query::CacheUsage; -use netpod::DtNano; use netpod::NodeConfig; use netpod::NodeConfigCached; use netpod::ProxyConfig; diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 52a9780..109f73d 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -3,8 +3,12 @@ use crate::err::Error; use crate::response; use crate::response_err; use crate::ToPublicResponse; +use bytes::Bytes; +use bytes::BytesMut; +use futures_util::future; use futures_util::stream; -use futures_util::TryStreamExt; +use futures_util::StreamExt; +use http::header; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -19,8 +23,8 @@ use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ACCEPT_ALL; +use netpod::APP_CBOR; use netpod::APP_JSON; -use netpod::APP_OCTET; use query::api4::events::PlainEventsQuery; use url::Url; @@ -58,32 +62,41 @@ async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) - let accept_def = APP_JSON; let accept = req .headers() - .get(http::header::ACCEPT) + .get(header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); let url = req_uri_to_url(req.uri())?; if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { Ok(plain_events_json(url, req, ctx, node_config).await?) - } else if accept == APP_OCTET { - Ok(plain_events_binary(url, req, ctx, node_config).await?) + } else if accept == APP_CBOR { + Ok(plain_events_cbor(url, req, ctx, node_config).await?) } else { - let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; + let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept: {}", accept))?; Ok(ret) } } -async fn plain_events_binary( - url: Url, - req: Requ, - ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result { - debug!("{:?}", req); - let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; - let ch_conf = chconf_from_events_quorum(&query, ctx, node_config).await?; - info!("plain_events_binary chconf_from_events_quorum: {ch_conf:?}"); - let s = stream::iter([Ok::<_, Error>(String::from("TODO_PREBINNED_BINARY_STREAM"))]); - let s = s.map_err(Error::from); - let ret = response(StatusCode::OK).body(body_stream(s))?; +async fn plain_events_cbor(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { + let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; + let ch_conf = chconf_from_events_quorum(&evq, ctx, ncc) + .await? 
+ .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; + info!("plain_events_cbor chconf_from_events_quorum: {ch_conf:?} {req:?}"); + let stream = streams::plaineventscbor::plain_events_cbor(&evq, ch_conf, ctx, ncc).await?; + use future::ready; + let stream = stream + .flat_map(|x| match x { + Ok(y) => { + use bytes::BufMut; + let buf = y.into_inner(); + let mut b2 = BytesMut::with_capacity(8); + b2.put_u32_le(buf.len() as u32); + stream::iter([Ok::<_, Error>(b2.freeze()), Ok(buf)]) + } + // TODO handle other cases + _ => stream::iter([Ok(Bytes::new()), Ok(Bytes::new())]), + }) + .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }); + let ret = response(StatusCode::OK).body(body_stream(stream))?; Ok(ret) } diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 1550f60..d76fdaa 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -190,8 +190,8 @@ async fn proxy_http_service_inner( Ok(backends(req, proxy_config).await?) } else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) { h.handle(req, ctx, &proxy_config).await - } else if path == "/api/4/events" { - Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + } else if let Some(h) = api4::events::EventsHandler::handler(&req) { + h.handle(req, ctx, &proxy_config).await } else if path == "/api/4/status/connection/events" { Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/status/channel/events" { @@ -550,7 +550,7 @@ where } Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))), }) - .fold_ok(vec![], |mut a, x| { + .fold_ok(Vec::new(), |mut a, x| { a.push(x); a })?; @@ -631,3 +631,15 @@ fn get_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Resu } return Err(Error::with_msg(format!("host not found for backend {:?}", backend))); } + +fn get_random_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Result { + let url = get_query_host_for_backend(backend, proxy_config)?; + // TODO remove special code, make it part of configuration + if url.contains("sf-daqbuf-23.psi.ch") { + let id = 21 + rand::random::() % 13; + let url = url.replace("-23.", &format!("-{id}.")); + Ok(url) + } else { + Ok(url) + } +} diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 5bb1c7c..5619ad9 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -1,4 +1,5 @@ pub mod caioclookup; +pub mod events; use crate::bodystream::ToPublicResponse; use crate::err::Error; diff --git a/crates/httpret/src/proxy/api4/events.rs b/crates/httpret/src/proxy/api4/events.rs new file mode 100644 index 0000000..801d3cf --- /dev/null +++ b/crates/httpret/src/proxy/api4/events.rs @@ -0,0 +1,101 @@ +use crate::bodystream::response; +use crate::err::Error; +use crate::proxy::get_query_host_for_backend; +use crate::ReqCtx; +use http::header; +use http::HeaderValue; +use http::Method; +use http::Request; +use http::StatusCode; +use http::Uri; +use httpclient::body_bytes; +use httpclient::body_empty; +use httpclient::body_stream; +use httpclient::connect_client; +use httpclient::read_body_bytes; +use httpclient::Requ; +use httpclient::StreamIncoming; +use httpclient::StreamResponse; +use netpod::get_url_query_pairs; +use netpod::log::*; +use netpod::query::api1::Api1Query; +use netpod::req_uri_to_url; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::ProxyConfig; +use netpod::ACCEPT_ALL; +use netpod::APP_CBOR; +use 
netpod::APP_JSON; +use netpod::X_DAQBUF_REQID; +use query::api4::events::PlainEventsQuery; + +pub struct EventsHandler {} + +impl EventsHandler { + pub fn path() -> &'static str { + "/api/4/events" + } + + pub fn handler(req: &Requ) -> Option { + if req.uri().path() == Self::path() { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result { + if req.method() != Method::GET { + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); + } + + let def = HeaderValue::from_static("*/*"); + let accept = req.headers().get(header::ACCEPT).unwrap_or(&def); + let accept = accept.to_str()?; + + if accept.contains(APP_CBOR) { + self.handle_cbor(req, ctx, proxy_config).await + } else if accept.contains(APP_JSON) { + return Ok(crate::proxy::proxy_single_backend_query::(req, ctx, proxy_config).await?); + } else if accept.contains(ACCEPT_ALL) { + todo!() + } else { + return Err(Error::with_msg_no_trace(format!("bad accept {:?}", req.headers()))); + } + } + + async fn handle_cbor(&self, req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result { + let (head, _body) = req.into_parts(); + let url = req_uri_to_url(&head.uri)?; + let pairs = get_url_query_pairs(&url); + let evq = PlainEventsQuery::from_pairs(&pairs)?; + info!("{evq:?}"); + // Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + + // get_random_query_host_for_backend + let query_host = get_query_host_for_backend(evq.backend(), proxy_config)?; + + // TODO this ignores the fragment + let url_str = format!("{}{}", query_host, head.uri.path_and_query().unwrap()); + info!("try to contact {url_str}"); + + let uri: Uri = url_str.parse()?; + let req = Request::builder() + .method(Method::GET) + .header(header::HOST, uri.host().unwrap()) + .header(header::ACCEPT, APP_CBOR) + .header(ctx.header_name(), ctx.header_value()) + .uri(&uri) + .body(body_empty())?; + let mut client = connect_client(&uri).await?; + let res = client.send_request(req).await?; + let (head, body) = res.into_parts(); + if head.status != StatusCode::OK { + error!("backend returned error: {head:?}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + } else { + info!("backend returned OK"); + Ok(response(StatusCode::OK).body(body_stream(StreamIncoming::new(body)))?) 
+ } + } +} diff --git a/crates/items_0/Cargo.toml b/crates/items_0/Cargo.toml index 4384537..88f4bda 100644 --- a/crates/items_0/Cargo.toml +++ b/crates/items_0/Cargo.toml @@ -9,7 +9,7 @@ path = "src/items_0.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } -erased-serde = "0.3" +erased-serde = "0.4" serde_json = "1.0" bincode = "1.3.3" bytes = "1.2.1" diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index a17598a..ed552cb 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -143,6 +143,7 @@ pub trait Events: fn pulses(&self) -> &VecDeque; fn frame_type_id(&self) -> u32; fn to_min_max_avg(&mut self) -> Box; + fn to_cbor_vec_u8(&self) -> Vec; } impl WithLen for Box { @@ -261,4 +262,8 @@ impl Events for Box { fn to_min_max_avg(&mut self) -> Box { Events::to_min_max_avg(self.as_mut()) } + + fn to_cbor_vec_u8(&self) -> Vec { + Events::to_cbor_vec_u8(self.as_ref()) + } } diff --git a/crates/items_0/src/scalar_ops.rs b/crates/items_0/src/scalar_ops.rs index 50e2db2..b005ff6 100644 --- a/crates/items_0/src/scalar_ops.rs +++ b/crates/items_0/src/scalar_ops.rs @@ -63,6 +63,7 @@ impl AsPrimF32 for String { pub trait ScalarOps: fmt::Debug + Clone + PartialOrd + PartialEq + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static { + fn scalar_type_name() -> &'static str; fn zero_b() -> Self; fn equal_slack(&self, rhs: &Self) -> bool; fn add(&mut self, rhs: &Self); @@ -73,8 +74,12 @@ pub trait ScalarOps: } macro_rules! impl_scalar_ops { - ($ty:ident, $zero:expr, $equal_slack:ident, $mac_add:ident, $mac_div:ident) => { + ($ty:ident, $zero:expr, $equal_slack:ident, $mac_add:ident, $mac_div:ident, $sty_name:expr) => { impl ScalarOps for $ty { + fn scalar_type_name() -> &'static str { + $sty_name + } + fn zero_b() -> Self { $zero } @@ -200,15 +205,15 @@ macro_rules! 
div_string { }; } -impl_scalar_ops!(u8, 0, equal_int, add_int, div_int); -impl_scalar_ops!(u16, 0, equal_int, add_int, div_int); -impl_scalar_ops!(u32, 0, equal_int, add_int, div_int); -impl_scalar_ops!(u64, 0, equal_int, add_int, div_int); -impl_scalar_ops!(i8, 0, equal_int, add_int, div_int); -impl_scalar_ops!(i16, 0, equal_int, add_int, div_int); -impl_scalar_ops!(i32, 0, equal_int, add_int, div_int); -impl_scalar_ops!(i64, 0, equal_int, add_int, div_int); -impl_scalar_ops!(f32, 0., equal_f32, add_int, div_int); -impl_scalar_ops!(f64, 0., equal_f64, add_int, div_int); -impl_scalar_ops!(bool, false, equal_bool, add_bool, div_bool); -impl_scalar_ops!(String, String::new(), equal_string, add_string, div_string); +impl_scalar_ops!(u8, 0, equal_int, add_int, div_int, "u8"); +impl_scalar_ops!(u16, 0, equal_int, add_int, div_int, "u16"); +impl_scalar_ops!(u32, 0, equal_int, add_int, div_int, "u32"); +impl_scalar_ops!(u64, 0, equal_int, add_int, div_int, "u64"); +impl_scalar_ops!(i8, 0, equal_int, add_int, div_int, "i8"); +impl_scalar_ops!(i16, 0, equal_int, add_int, div_int, "i16"); +impl_scalar_ops!(i32, 0, equal_int, add_int, div_int, "i32"); +impl_scalar_ops!(i64, 0, equal_int, add_int, div_int, "i64"); +impl_scalar_ops!(f32, 0., equal_f32, add_int, div_int, "f32"); +impl_scalar_ops!(f64, 0., equal_f64, add_int, div_int, "f64"); +impl_scalar_ops!(bool, false, equal_bool, add_bool, div_bool, "bool"); +impl_scalar_ops!(String, String::new(), equal_string, add_string, div_string, "string"); diff --git a/crates/items_2/Cargo.toml b/crates/items_2/Cargo.toml index 4dbd349..2ad7425 100644 --- a/crates/items_2/Cargo.toml +++ b/crates/items_2/Cargo.toml @@ -12,9 +12,10 @@ doctest = false serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.2" +ciborium = "0.2.1" rmp-serde = "1.1.1" postcard = { version = "1.0.0", features = ["use-std"] } -erased-serde = "0.3" +erased-serde = "0.4" bytes = "1.2.1" num-traits = "0.2.15" chrono = { version = "0.4.19", features = ["serde"] } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index c2850ac..57e6780 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -319,6 +319,41 @@ mod serde_channel_events { } } else if cty == EventsDim1::::serde_id() { match nty { + u8::SUB => { + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + u16::SUB => { + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + u32::SUB => { + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + u64::SUB => { + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + i8::SUB => { + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + i16::SUB => { + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + i32::SUB => { + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } i64::SUB => { let obj: EventsDim1 = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; @@ -879,6 +914,16 @@ impl Events for ChannelEvents { ChannelEvents::Status(item) => 
Box::new(ChannelEvents::Status(item.take())), } } + + fn to_cbor_vec_u8(&self) -> Vec { + match self { + ChannelEvents::Events(item) => item.to_cbor_vec_u8(), + ChannelEvents::Status(item) => { + error!("TODO convert status to cbor"); + Vec::new() + } + } + } } impl Collectable for ChannelEvents { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index d71617b..6413f4d 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -226,6 +226,16 @@ where } } +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsDim0ChunkOutput { + tss: VecDeque, + pulses: VecDeque, + values: VecDeque, + scalar_type: String, +} + +impl EventsDim0ChunkOutput {} + #[derive(Debug)] pub struct EventsDim0Collector { vals: EventsDim0, @@ -925,6 +935,19 @@ impl Events for EventsDim0 { }; Box::new(dst) } + + fn to_cbor_vec_u8(&self) -> Vec { + let ret = EventsDim0ChunkOutput { + // TODO use &mut to swap the content + tss: self.tss.clone(), + pulses: self.pulses.clone(), + values: self.values.clone(), + scalar_type: STY::scalar_type_name().into(), + }; + let mut buf = Vec::new(); + ciborium::into_writer(&ret, &mut buf).unwrap(); + buf + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 3a548ec..b173def 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -1,6 +1,7 @@ use crate::binsdim0::BinsDim0; use crate::eventsxbindim0::EventsXbinDim0; use crate::framable::FrameType; +use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -184,6 +185,16 @@ where } } +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsDim1ChunkOutput { + tss: VecDeque, + pulses: VecDeque, + values: VecDeque>, + scalar_type: String, +} + +impl EventsDim1ChunkOutput {} + #[derive(Debug)] pub struct EventsDim1Collector { vals: EventsDim1, @@ -913,6 +924,19 @@ impl Events for EventsDim1 { }; Box::new(item) } + + fn to_cbor_vec_u8(&self) -> Vec { + let ret = EventsDim1ChunkOutput { + // TODO use &mut to swap the content + tss: self.tss.clone(), + pulses: self.pulses.clone(), + values: self.values.clone(), + scalar_type: STY::scalar_type_name().into(), + }; + let mut buf = Vec::new(); + ciborium::into_writer(&ret, &mut buf).unwrap(); + buf + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index c3c1a18..d0b6d88 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -369,6 +369,10 @@ impl Events for EventsXbinDim0 { }; Box::new(dst) } + + fn to_cbor_vec_u8(&self) -> Vec { + todo!() + } } #[derive(Debug)] diff --git a/crates/items_2/src/frame.rs b/crates/items_2/src/frame.rs index 03b52a0..5517e67 100644 --- a/crates/items_2/src/frame.rs +++ b/crates/items_2/src/frame.rs @@ -99,10 +99,12 @@ where T: erased_serde::Serialize, { let mut out = Vec::new(); - let mut ser1 = rmp_serde::Serializer::new(&mut out).with_struct_map(); - let mut ser2 = ::erase(&mut ser1); - item.erased_serialize(&mut ser2) - .map_err(|e| Error::from(format!("{e}")))?; + { + let mut ser1 = rmp_serde::Serializer::new(&mut out).with_struct_map(); + let mut ser2 = ::erase(&mut ser1); + item.erased_serialize(&mut ser2) + .map_err(|e| Error::from(format!("{e}")))?; + } Ok(out) } @@ -128,9 +130,11 @@ where let mut ser1 = postcard::Serializer { output: postcard::ser_flavors::AllocVec::new(), }; - let mut ser2 = ::erase(&mut ser1); - item.erased_serialize(&mut 
ser2) - .map_err(|e| Error::from(format!("{e}")))?; + { + let mut ser2 = ::erase(&mut ser1); + item.erased_serialize(&mut ser2) + } + .map_err(|e| Error::from(format!("{e}")))?; let ret = ser1.output.finalize().map_err(|e| format!("{e}").into()); ret } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index c471b7b..4de8b8d 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -49,6 +49,7 @@ use url::Url; pub const APP_JSON: &str = "application/json"; pub const APP_JSON_LINES: &str = "application/jsonlines"; pub const APP_OCTET: &str = "application/octet-stream"; +pub const APP_CBOR: &str = "application/cbor"; pub const ACCEPT_ALL: &str = "*/*"; pub const X_DAQBUF_REQID: &str = "x-daqbuffer-request-id"; diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 7ab8312..61657cf 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -11,7 +11,7 @@ pin-project = "1.0.12" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" -erased-serde = "0.3.23" +ciborium = "0.2.1" bytes = "1.3" arrayref = "0.3.6" crc32fast = "1.3.2" diff --git a/crates/streams/src/frames/eventsfromframes.rs b/crates/streams/src/frames/eventsfromframes.rs index 8f1e43c..5ac9e16 100644 --- a/crates/streams/src/frames/eventsfromframes.rs +++ b/crates/streams/src/frames/eventsfromframes.rs @@ -2,6 +2,7 @@ use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::framable::FrameTypeInnerStatic; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::frame::decode_frame; @@ -64,11 +65,20 @@ where StreamItem::DataItem(frame) => match decode_frame::>(&frame) { Ok(item) => match item { Ok(item) => match item { + StreamItem::DataItem(item2) => match item2 { + RangeCompletableItem::Data(item3) => { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item3))))) + } + RangeCompletableItem::RangeComplete => { + debug!("EventsFromFrames RangeComplete"); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } + }, StreamItem::Log(k) => { //info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg); Ready(Some(Ok(StreamItem::Log(k)))) } - item => Ready(Some(Ok(item))), + StreamItem::Stats(k) => Ready(Some(Ok(StreamItem::Stats(k)))), }, Err(e) => { error!("rcvd err: {}", e); diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index 6c1bf29..30438d7 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -6,6 +6,7 @@ pub mod frames; pub mod generators; pub mod itemclone; pub mod needminbuffer; +pub mod plaineventscbor; pub mod plaineventsjson; pub mod print_on_done; pub mod rangefilter2; diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs new file mode 100644 index 0000000..bce9f33 --- /dev/null +++ b/crates/streams/src/plaineventscbor.rs @@ -0,0 +1,114 @@ +use crate::plaineventsjson::dyn_events_stream; +use bytes::Bytes; +use err::Error; +use futures_util::future; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::LogItem; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::StreamItem; +use netpod::log::Level; +use netpod::log::*; +use netpod::ChannelTypeConfigGen; +use netpod::NodeConfigCached; +use netpod::ReqCtx; +use query::api4::events::PlainEventsQuery; +use std::pin::Pin; + +pub struct CborBytes(Bytes); + +impl CborBytes { + pub fn into_inner(self) -> Bytes { 
+ self.0 + } +} + +pub type CborStream = Pin> + Send>>; + +pub async fn plain_events_cbor( + evq: &PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, + ctx: &ReqCtx, + ncc: &NodeConfigCached, +) -> Result { + let stream = dyn_events_stream(evq, ch_conf, ctx, &ncc.node_config.cluster).await?; + let stream = stream + .map(|x| match x { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(evs) => { + if false { + use items_0::AsAnyRef; + // TODO impl generically on EventsDim0 ? + if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + let mut buf = Vec::new(); + ciborium::into_writer(evs, &mut buf) + .map_err(|e| Error::with_msg_no_trace(format!("{e}")))?; + let bytes = Bytes::from(buf); + let item = CborBytes(bytes); + // Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } else { + let item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item")); + // Ok(StreamItem::Log(item)) + }; + } + let buf = evs.to_cbor_vec_u8(); + let bytes = Bytes::from(buf); + let item = CborBytes(bytes); + Ok(item) + } + RangeCompletableItem::RangeComplete => { + use ciborium::cbor; + let item = cbor!({ + "rangeFinal" => true, + }) + .map_err(Error::from_string)?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + let bytes = Bytes::from(buf); + let item = CborBytes(bytes); + Ok(item) + } + }, + StreamItem::Log(item) => { + info!("{item:?}"); + let item = CborBytes(Bytes::new()); + Ok(item) + } + StreamItem::Stats(item) => { + info!("{item:?}"); + let item = CborBytes(Bytes::new()); + Ok(item) + } + }, + Err(e) => { + use ciborium::cbor; + let item = cbor!({ + "error" => e.to_string(), + }) + .map_err(Error::from_string)?; + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?; + let bytes = Bytes::from(buf); + let item = CborBytes(bytes); + Ok(item) + } + }) + .filter(|x| { + future::ready(match x { + Ok(x) => x.0.len() > 0, + Err(_) => true, + }) + }) + .take_while({ + let mut state = true; + move |x| { + let ret = state; + if x.is_err() { + state = false; + } + future::ready(ret) + } + }); + Ok(Box::pin(stream)) +} diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 44fdc3e..272bd8c 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -4,9 +4,13 @@ use crate::transform::build_merged_event_transform; use crate::transform::EventsToTimeBinnable; use crate::transform::TimeBinnableToCollectable; use err::Error; +use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::Collectable; use items_0::on_sitemty_data; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; @@ -20,6 +24,7 @@ use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; +use std::pin::Pin; use std::time::Instant; pub async fn plain_events_json( @@ -29,14 +34,40 @@ pub async fn plain_events_json( cluster: &Cluster, ) -> Result { info!("plain_events_json evquery {:?}", evq); + let deadline = Instant::now() + evq.timeout(); + + let stream = dyn_events_stream(evq, ch_conf, ctx, cluster).await?; + + let stream = stream.map(move |k| { + on_sitemty_data!(k, |k| { + let k: Box = Box::new(k); 
+ Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + }) + }); + + //let stream = PlainEventStream::new(stream); + //let stream = EventsToTimeBinnable::new(stream); + //let stream = TimeBinnableToCollectable::new(stream); + let stream = Box::pin(stream); + let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?; + let jsval = serde_json::to_value(&collected)?; + Ok(jsval) +} + +pub type DynEventsStream = Pin>> + Send>>; + +pub async fn dyn_events_stream( + evq: &PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, + ctx: &ReqCtx, + cluster: &Cluster, +) -> Result { let mut select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone()); if let Some(x) = evq.test_do_wasm() { select.set_wasm1(x.into()); } let settings = EventsSubQuerySettings::from(evq); let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into()); - // TODO remove magic constant - let deadline = Instant::now() + evq.timeout(); let mut tr = build_merged_event_transform(evq.transform())?; // TODO make sure the empty container arrives over the network. let inps = open_event_data_streams::(subq, ctx, cluster).await?; @@ -65,144 +96,137 @@ pub async fn plain_events_json( }) }); - let stream = if let Some(wasmname) = evq.test_do_wasm() { - debug!("make wasm transform"); - use httpclient::url::Url; - use wasmer::Value; - use wasmer::WasmSlice; - let t = httpclient::http_get( - Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(), - "*/*", - ctx, - ) - .await - .unwrap(); - let wasm = t.body; - // let wasm = include_bytes!("dummy.wasm"); - let mut store = wasmer::Store::default(); - let module = wasmer::Module::new(&store, wasm).unwrap(); - // TODO assert that memory is large enough - let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap(); - let import_object = wasmer::imports! { - "env" => { - "memory" => memory.clone(), - } - }; - let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); - let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap(); - let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap(); - let buffer_ptr = buffer_ptr[0].i32().unwrap(); - let stream = stream.map(move |x| { - let memory = memory.clone(); - let item = on_sitemty_data!(x, |mut evs: Box| { - let x = { - use items_0::AsAnyMut; - if true { - let r1 = evs - .as_any_mut() - .downcast_mut::>() - .is_some(); - let r2 = evs - .as_mut() - .as_any_mut() - .downcast_mut::>() - .is_some(); - let r3 = evs - .as_any_mut() - .downcast_mut::>>() - .is_some(); - let r4 = evs - .as_mut() - .as_any_mut() - .downcast_mut::>>() - .is_some(); - let r5 = evs.as_mut().as_any_mut().downcast_mut::().is_some(); - let r6 = evs.as_mut().as_any_mut().downcast_mut::>().is_some(); - debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}"); - } - if let Some(evs) = evs.as_any_mut().downcast_mut::() { - match evs { - ChannelEvents::Events(evs) => { - if let Some(evs) = - evs.as_any_mut().downcast_mut::>() - { - use items_0::WithLen; - if evs.len() == 0 { - debug!("wasm empty EventsDim0"); - } else { - debug!("wasm see EventsDim0"); - let max_len_needed = 16000; - let dummy1 = instance.exports.get_function("dummy1").unwrap(); - let s = evs.values.as_mut_slices(); - for sl in [s.0, s.1] { - if sl.len() > max_len_needed as _ { - // TODO cause error - panic!(); - } - let wmemoff = buffer_ptr as u64; - let view = memory.view(&store); - // TODO is the offset bytes or elements? 
- let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); - // debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size()); - wsl.write_slice(&sl).unwrap(); - let ptr = wsl.as_ptr32(); - debug!("ptr {:?} offset {}", ptr, ptr.offset()); - let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)]; - let res = dummy1.call(&mut store, ¶ms).unwrap(); - match res[0] { - Value::I32(x) => { - debug!("wasm dummy1 returned: {x:?}"); - if x != 1 { - error!("unexpected return value {res:?}"); - } - } - _ => { - error!("unexpected return type {res:?}"); - } - } - // Init the slice again because we need to drop ownership for the function call. - let view = memory.view(&store); - let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); - wsl.read_slice(sl).unwrap(); - } - } - } else { - debug!("wasm not EventsDim0"); - } - } - ChannelEvents::Status(_) => {} - } - } else { - debug!("wasm not ChannelEvents"); - } - evs - }; - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) - }); - // Box::new(item) as Box - item - }); - use futures_util::Stream; - use items_0::streamitem::Sitemty; - use std::pin::Pin; - Box::pin(stream) as Pin>> + Send>> + if let Some(wasmname) = evq.test_do_wasm() { + let stream = transform_wasm(stream, wasmname, ctx).await?; + Ok(Box::pin(stream)) } else { - let stream = stream.map(|x| x); - Box::pin(stream) - }; - - let stream = stream.map(move |k| { - on_sitemty_data!(k, |k| { - let k: Box = Box::new(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - }) - }); - - //let stream = PlainEventStream::new(stream); - //let stream = EventsToTimeBinnable::new(stream); - //let stream = TimeBinnableToCollectable::new(stream); - let stream = Box::pin(stream); - let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?; - let jsval = serde_json::to_value(&collected)?; - Ok(jsval) + // let stream = stream.map(|x| x); + Ok(Box::pin(stream)) + } +} + +async fn transform_wasm( + stream: INP, + wasmname: &str, + ctx: &ReqCtx, +) -> Result>>, Error>> + Send, Error> +where + INP: Stream>>, Error>> + Send + 'static, +{ + debug!("make wasm transform"); + use httpclient::url::Url; + use wasmer::Value; + use wasmer::WasmSlice; + let t = httpclient::http_get( + Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(), + "*/*", + ctx, + ) + .await + .unwrap(); + let wasm = t.body; + // let wasm = include_bytes!("dummy.wasm"); + let mut store = wasmer::Store::default(); + let module = wasmer::Module::new(&store, wasm).unwrap(); + // TODO assert that memory is large enough + let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap(); + let import_object = wasmer::imports! 
{ + "env" => { + "memory" => memory.clone(), + } + }; + let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); + let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap(); + let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap(); + let buffer_ptr = buffer_ptr[0].i32().unwrap(); + let stream = stream.map(move |x| { + let memory = memory.clone(); + let item = on_sitemty_data!(x, |mut evs: Box| { + let x = { + use items_0::AsAnyMut; + if true { + let r1 = evs + .as_any_mut() + .downcast_mut::>() + .is_some(); + let r2 = evs + .as_mut() + .as_any_mut() + .downcast_mut::>() + .is_some(); + let r3 = evs + .as_any_mut() + .downcast_mut::>>() + .is_some(); + let r4 = evs + .as_mut() + .as_any_mut() + .downcast_mut::>>() + .is_some(); + let r5 = evs.as_mut().as_any_mut().downcast_mut::().is_some(); + let r6 = evs.as_mut().as_any_mut().downcast_mut::>().is_some(); + debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}"); + } + if let Some(evs) = evs.as_any_mut().downcast_mut::() { + match evs { + ChannelEvents::Events(evs) => { + if let Some(evs) = evs.as_any_mut().downcast_mut::>() { + use items_0::WithLen; + if evs.len() == 0 { + debug!("wasm empty EventsDim0"); + } else { + debug!("wasm see EventsDim0"); + let max_len_needed = 16000; + let dummy1 = instance.exports.get_function("dummy1").unwrap(); + let s = evs.values.as_mut_slices(); + for sl in [s.0, s.1] { + if sl.len() > max_len_needed as _ { + // TODO cause error + panic!(); + } + let wmemoff = buffer_ptr as u64; + let view = memory.view(&store); + // TODO is the offset bytes or elements? + let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + // debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size()); + wsl.write_slice(&sl).unwrap(); + let ptr = wsl.as_ptr32(); + debug!("ptr {:?} offset {}", ptr, ptr.offset()); + let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)]; + let res = dummy1.call(&mut store, ¶ms).unwrap(); + match res[0] { + Value::I32(x) => { + debug!("wasm dummy1 returned: {x:?}"); + if x != 1 { + error!("unexpected return value {res:?}"); + } + } + _ => { + error!("unexpected return type {res:?}"); + } + } + // Init the slice again because we need to drop ownership for the function call. + let view = memory.view(&store); + let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + wsl.read_slice(sl).unwrap(); + } + } + } else { + debug!("wasm not EventsDim0"); + } + } + ChannelEvents::Status(_) => {} + } + } else { + debug!("wasm not ChannelEvents"); + } + evs + }; + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) + }); + // Box::new(item) as Box + item + }); + let ret: Pin>> + Send>> = Box::pin(stream); + Ok(ret) }