From a8479b2c8d7a13f978cbefd7a0d8da9be1cf167b Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Mon, 18 Dec 2023 15:53:33 +0100
Subject: [PATCH] WIP adding cbor stream test

---
 crates/httpret/src/api4/binned.rs       |  16 ++--
 crates/httpret/src/api4/events.rs       |  15 ++-
 crates/netpod/src/netpod.rs             |   6 ++
 crates/netpod/src/range/evrange.rs      |   4 +-
 crates/nodenet/src/client.rs            |  91 ++++++++++++++++++
 crates/nodenet/src/conn.rs              |  58 +-----------
 crates/nodenet/src/lib.rs               |   1 +
 crates/streams/src/frames/inmem.rs      |   2 +
 crates/streams/src/generators.rs        | 105 +++++++++++++++++++++
 crates/streams/src/plaineventscbor.rs   |   5 +-
 crates/streams/src/plaineventsjson.rs   |   6 +-
 crates/streams/src/plaineventsstream.rs |  33 ++++---
 crates/streams/src/tcprawclient.rs      | 120 ++++++++++--------
 crates/streams/src/test.rs              |   6 +-
 crates/streams/src/test/events.rs       |  68 ++++++++++++++
 crates/streams/src/timebinnedjson.rs    |  46 +++++----
 16 files changed, 407 insertions(+), 175 deletions(-)
 create mode 100644 crates/nodenet/src/client.rs
 create mode 100644 crates/streams/src/test/events.rs

diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs
index ff21844..b00ab9d 100644
--- a/crates/httpret/src/api4/binned.rs
+++ b/crates/httpret/src/api4/binned.rs
@@ -16,16 +16,12 @@ use netpod::timeunits::SEC;
 use netpod::FromUrl;
 use netpod::NodeConfigCached;
 use netpod::ReqCtx;
+use nodenet::client::OpenBoxedBytesViaHttp;
 use query::api4::binned::BinnedQuery;
 use tracing::Instrument;
 use url::Url;
 
-async fn binned_json(
-    url: Url,
-    req: Requ,
-    ctx: &ReqCtx,
-    node_config: &NodeConfigCached,
-) -> Result {
+async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result {
     debug!("{:?}", req);
     let reqid = crate::status_board()
         .map_err(|e| Error::with_msg_no_trace(e.to_string()))?
@@ -37,7 +33,7 @@ async fn binned_json(
         e.add_public_msg(msg)
     })?;
     // TODO handle None case better and return 404
-    let ch_conf = ch_conf_from_binned(&query, ctx, node_config)
+    let ch_conf = ch_conf_from_binned(&query, ctx, ncc)
         .await?
         .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
     let span1 = span!(
@@ -46,12 +42,14 @@ async fn binned_json(
         reqid,
         beg = query.range().beg_u64() / SEC,
         end = query.range().end_u64() / SEC,
-        ch = query.channel().name().clone(),
+        ch = query.channel().name(),
     );
     span1.in_scope(|| {
         debug!("begin");
     });
-    let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, node_config.node_config.cluster.clone())
+    let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
+    let open_bytes = Box::pin(open_bytes);
+    let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes)
         .instrument(span1)
         .await?;
     let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;
diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs
index 109f73d..35cc6c2 100644
--- a/crates/httpret/src/api4/events.rs
+++ b/crates/httpret/src/api4/events.rs
@@ -25,6 +25,7 @@ use netpod::ReqCtx;
 use netpod::ACCEPT_ALL;
 use netpod::APP_CBOR;
 use netpod::APP_JSON;
+use nodenet::client::OpenBoxedBytesViaHttp;
 use query::api4::events::PlainEventsQuery;
 use url::Url;
 
@@ -81,7 +82,8 @@ async fn plain_events_cbor(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCa
         .await?
         .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
     info!("plain_events_cbor chconf_from_events_quorum: {ch_conf:?} {req:?}");
-    let stream = streams::plaineventscbor::plain_events_cbor(&evq, ch_conf, ctx, ncc).await?;
+    let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
+    let stream = streams::plaineventscbor::plain_events_cbor(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?;
     use future::ready;
     let stream = stream
         .flat_map(|x| match x {
@@ -116,8 +118,15 @@ async fn plain_events_json(
         .map_err(Error::from)?
         .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
     info!("plain_events_json chconf_from_events_quorum: {ch_conf:?}");
-    let item =
-        streams::plaineventsjson::plain_events_json(&query, ch_conf, ctx, &node_config.node_config.cluster).await;
+    let open_bytes = OpenBoxedBytesViaHttp::new(node_config.node_config.cluster.clone());
+    let item = streams::plaineventsjson::plain_events_json(
+        &query,
+        ch_conf,
+        ctx,
+        &node_config.node_config.cluster,
+        Box::pin(open_bytes),
+    )
+    .await;
     let item = match item {
         Ok(item) => item,
         Err(e) => {
diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs
index 4de8b8d..64aa9a3 100644
--- a/crates/netpod/src/netpod.rs
+++ b/crates/netpod/src/netpod.rs
@@ -2946,6 +2946,12 @@ impl From for ChannelTypeConfigGen {
     }
 }
 
+impl From for ChannelTypeConfigGen {
+    fn from(value: ChConf) -> Self {
+        Self::Scylla(value)
+    }
+}
+
 pub fn f32_close(a: f32, b: f32) -> bool {
     if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) {
         true
diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs
index d55614d..73a9625 100644
--- a/crates/netpod/src/range/evrange.rs
+++ b/crates/netpod/src/range/evrange.rs
@@ -53,8 +53,8 @@ impl fmt::Debug for NanoRange {
 impl NanoRange {
     pub fn from_date_time(beg: DateTime, end: DateTime) -> Self {
         Self {
-            beg: beg.timestamp_nanos() as u64,
-            end: end.timestamp_nanos() as u64,
+            beg: beg.timestamp_nanos_opt().unwrap_or(0) as u64,
+            end: end.timestamp_nanos_opt().unwrap_or(0) as u64,
         }
     }
 
diff --git a/crates/nodenet/src/client.rs b/crates/nodenet/src/client.rs
new file mode 100644
index 0000000..8d4ac52
--- /dev/null
+++ b/crates/nodenet/src/client.rs
@@ -0,0 +1,91 @@
+use err::Error;
+use futures_util::Future;
+use http::header;
+use http::Method;
+use http::Request;
+use httpclient::body_bytes;
+use httpclient::http;
+use httpclient::hyper::StatusCode;
+use httpclient::hyper::Uri;
+use items_0::streamitem::sitem_data;
+use items_2::framable::Framable;
+use netpod::log::*;
+use netpod::Cluster;
+use netpod::ReqCtx;
+use netpod::APP_OCTET;
+use query::api4::events::EventsSubQuery;
+use std::pin::Pin;
+use streams::frames::inmem::BoxedBytesStream;
+use streams::tcprawclient::make_node_command_frame;
+use streams::tcprawclient::OpenBoxedBytesStreams;
+
+async fn open_bytes_data_streams_http(
+    subq: EventsSubQuery,
+    ctx: ReqCtx,
+    cluster: Cluster,
+) -> Result, Error> {
+    let frame1 = make_node_command_frame(subq.clone())?;
+    let mut streams = Vec::new();
+    for node in &cluster.nodes {
+        let item = sitem_data(frame1.clone());
+        let buf = item.make_frame()?;
+
+        let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
+        debug!("open_event_data_streams_http post {url}");
+        let uri: Uri = url.as_str().parse().unwrap();
+        let req = Request::builder()
+            .method(Method::POST)
+            .uri(&uri)
+            .header(header::HOST, uri.host().unwrap())
+            .header(header::ACCEPT, APP_OCTET)
+            .header(ctx.header_name(), ctx.header_value())
.body(body_bytes(buf)) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let mut client = httpclient::connect_client(req.uri()).await?; + let res = client + .send_request(req) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + if res.status() != StatusCode::OK { + error!("Server error {:?}", res); + let (head, body) = res.into_parts(); + let buf = httpclient::read_body_bytes(body).await?; + let s = String::from_utf8_lossy(&buf); + return Err(Error::with_msg(format!( + concat!( + "Server error {:?}\n", + "---------------------- message from http body:\n", + "{}\n", + "---------------------- end of http body", + ), + head, s + ))); + } + let (_head, body) = res.into_parts(); + let stream = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream; + debug!("open_event_data_streams_http done {url}"); + streams.push(Box::pin(stream) as _); + } + Ok(streams) +} + +pub struct OpenBoxedBytesViaHttp { + cluster: Cluster, +} + +impl OpenBoxedBytesViaHttp { + pub fn new(cluster: Cluster) -> Self { + Self { cluster } + } +} + +impl OpenBoxedBytesStreams for OpenBoxedBytesViaHttp { + fn open( + &self, + subq: EventsSubQuery, + ctx: ReqCtx, + ) -> Pin, Error>> + Send>> { + let fut = open_bytes_data_streams_http(subq, ctx, self.cluster.clone()); + Box::pin(fut) + } +} diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 837906c..6aa3150 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -27,11 +27,10 @@ use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; use std::net::SocketAddr; use std::pin::Pin; +use streams::frames::inmem::BoxedBytesStream; use streams::frames::inmem::InMemoryFrameStream; use streams::frames::inmem::TcpReadAsBytes; -use streams::generators::GenerateF64V00; -use streams::generators::GenerateI32V00; -use streams::generators::GenerateI32V01; +use streams::tcprawclient::TEST_BACKEND; use streams::transform::build_event_transform; use taskrun::tokio; use tokio::io::AsyncWriteExt; @@ -39,8 +38,6 @@ use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; -const TEST_BACKEND: &str = "testbackend-00"; - #[cfg(test)] mod test; @@ -81,51 +78,9 @@ async fn make_channel_events_stream_data( ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { if subq.backend() == TEST_BACKEND { - debug!("use test backend data {}", TEST_BACKEND); let node_count = ncc.node_config.cluster.nodes.len() as u64; let node_ix = ncc.ix as u64; - let chn = subq.name(); - let range = subq.range().clone(); - let one_before = subq.transform().need_one_before_range(); - if chn == "test-gen-i32-dim0-v00" { - Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before))) - } else if chn == "test-gen-i32-dim0-v01" { - Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range, one_before))) - } else if chn == "test-gen-f64-dim1-v00" { - Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range, one_before))) - } else { - let na: Vec<_> = chn.split("-").collect(); - if na.len() != 3 { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) - } else { - if na[0] != "inmem" { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) - } else { - let _range = subq.range().clone(); - if na[1] == "d0" { - if na[2] == "i32" { - //generator::generate_i32(node_ix, node_count, range) - panic!() - } else if na[2] == "f32" { - 
//generator::generate_f32(node_ix, node_count, range) - panic!() - } else { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) - } - } else { - Err(Error::with_msg_no_trace(format!( - "make_channel_events_stream_data can not understand test channel name: {chn:?}" - ))) - } - } - } - } + streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix) } else if let Some(scyconf) = &ncc.node_config.cluster.scylla { let cfg = subq.ch_conf().to_scylla()?; scylla_channel_event_stream(subq, cfg, scyconf, ncc).await @@ -154,12 +109,10 @@ async fn make_channel_events_stream( Ok(ret) } -pub type BytesStreamBox = Pin> + Send>>; - pub async fn create_response_bytes_stream( evq: EventsSubQuery, ncc: &NodeConfigCached, -) -> Result { +) -> Result { debug!( "create_response_bytes_stream {:?} {:?}", evq.ch_conf().scalar_type(), @@ -180,9 +133,8 @@ pub async fn create_response_bytes_stream( let ret = Box::pin(stream); Ok(ret) } else { - let stream = make_channel_events_stream(evq.clone(), reqctx, ncc).await?; let mut tr = build_event_transform(evq.transform())?; - + let stream = make_channel_events_stream(evq, reqctx, ncc).await?; let stream = stream.map(move |x| { on_sitemty_data!(x, |x: ChannelEvents| { match x { diff --git a/crates/nodenet/src/lib.rs b/crates/nodenet/src/lib.rs index a0011c1..82e939f 100644 --- a/crates/nodenet/src/lib.rs +++ b/crates/nodenet/src/lib.rs @@ -1,4 +1,5 @@ pub mod channelconfig; +pub mod client; pub mod configquorum; pub mod conn; pub mod scylla; diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs index ae78d91..26698e4 100644 --- a/crates/streams/src/frames/inmem.rs +++ b/crates/streams/src/frames/inmem.rs @@ -16,6 +16,8 @@ use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; +pub type BoxedBytesStream = Pin> + Send>>; + #[allow(unused)] macro_rules! 
trace2 { ($($arg:tt)*) => (); diff --git a/crates/streams/src/generators.rs b/crates/streams/src/generators.rs index d2899c3..042bc82 100644 --- a/crates/streams/src/generators.rs +++ b/crates/streams/src/generators.rs @@ -1,7 +1,12 @@ +use crate::frames::inmem::BoxedBytesStream; +use crate::transform::build_event_transform; +use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::StreamExt; use items_0::container::ByteEstimate; +use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; @@ -10,18 +15,118 @@ use items_0::Appendable; use items_0::Empty; use items_0::WithLen; use items_2::channelevents::ChannelEvents; +use items_2::empty::empty_events_dyn_ev; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; +use items_2::framable::Framable; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::DAY; use netpod::timeunits::MS; +use query::api4::events::EventsSubQuery; use std::f64::consts::PI; use std::pin::Pin; use std::task::Context; use std::task::Poll; use std::time::Duration; +pub fn make_test_channel_events_bytes_stream( + subq: EventsSubQuery, + node_count: u64, + node_ix: u64, +) -> Result { + if subq.is_event_blobs() { + let e = Error::with_msg_no_trace("evq.is_event_blobs() not supported in this generator"); + error!("{e}"); + Err(e) + } else { + let mut tr = build_event_transform(subq.transform())?; + let stream = make_test_channel_events_stream_data(subq, node_count, node_ix)?; + let stream = stream.map(move |x| { + on_sitemty_data!(x, |x: ChannelEvents| { + match x { + ChannelEvents::Events(evs) => { + let evs = tr.0.transform(evs); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( + evs, + )))) + } + ChannelEvents::Status(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Status(x), + ))), + } + }) + }); + let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); + let ret = Box::pin(stream); + Ok(ret) + } +} + +// is also used from nodenet::conn +pub fn make_test_channel_events_stream_data( + subq: EventsSubQuery, + node_count: u64, + node_ix: u64, +) -> Result> + Send>>, Error> { + let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?; + let empty = sitem_data(ChannelEvents::Events(empty)); + let stream = make_test_channel_events_stream_data_inner(subq, node_count, node_ix)?; + let ret = futures_util::stream::iter([empty]).chain(stream); + let ret = Box::pin(ret); + Ok(ret) +} + +fn make_test_channel_events_stream_data_inner( + subq: EventsSubQuery, + node_count: u64, + node_ix: u64, +) -> Result> + Send>>, Error> { + debug!("use test backend data"); + let chn = subq.name(); + let range = subq.range().clone(); + let one_before = subq.transform().need_one_before_range(); + if chn == "test-gen-i32-dim0-v00" { + Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before))) + } else if chn == "test-gen-i32-dim0-v01" { + Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range, one_before))) + } else if chn == "test-gen-f64-dim1-v00" { + Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range, one_before))) + } else { + let na: Vec<_> = chn.split("-").collect(); + if na.len() != 3 { + Err(Error::with_msg_no_trace(format!( + "make_channel_events_stream_data can not understand test channel name: {chn:?}" + ))) + } else { + if na[0] != "inmem" { + 
Err(Error::with_msg_no_trace(format!( + "make_channel_events_stream_data can not understand test channel name: {chn:?}" + ))) + } else { + let _range = subq.range().clone(); + if na[1] == "d0" { + if na[2] == "i32" { + //generator::generate_i32(node_ix, node_count, range) + panic!() + } else if na[2] == "f32" { + //generator::generate_f32(node_ix, node_count, range) + panic!() + } else { + Err(Error::with_msg_no_trace(format!( + "make_channel_events_stream_data can not understand test channel name: {chn:?}" + ))) + } + } else { + Err(Error::with_msg_no_trace(format!( + "make_channel_events_stream_data can not understand test channel name: {chn:?}" + ))) + } + } + } + } +} + pub struct GenerateI32V00 { ts: u64, dts: u64, diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs index beb24e1..9df16f6 100644 --- a/crates/streams/src/plaineventscbor.rs +++ b/crates/streams/src/plaineventscbor.rs @@ -1,4 +1,5 @@ use crate::plaineventsstream::dyn_events_stream; +use crate::tcprawclient::OpenBoxedBytesStreamsBox; use bytes::Bytes; use err::Error; use futures_util::future; @@ -29,9 +30,9 @@ pub async fn plain_events_cbor( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - ncc: &NodeConfigCached, + open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { - let stream = dyn_events_stream(evq, ch_conf, ctx, &ncc.node_config.cluster).await?; + let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; let stream = stream .map(|x| match x { Ok(x) => match x { diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 3798ff2..188bf9d 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -1,5 +1,6 @@ use crate::collect::Collect; use crate::plaineventsstream::dyn_events_stream; +use crate::tcprawclient::OpenBoxedBytesStreamsBox; use err::Error; use futures_util::StreamExt; use items_0::collect_s::Collectable; @@ -16,12 +17,13 @@ pub async fn plain_events_json( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - cluster: &Cluster, + _cluster: &Cluster, + open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { info!("plain_events_json evquery {:?}", evq); let deadline = Instant::now() + evq.timeout(); - let stream = dyn_events_stream(evq, ch_conf, ctx, cluster).await?; + let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; let stream = stream.map(move |k| { on_sitemty_data!(k, |k| { diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index 6ad07e9..b0cdd38 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -1,4 +1,6 @@ -use crate::tcprawclient::open_event_data_streams; +use crate::tcprawclient::container_stream_from_bytes_stream; +use crate::tcprawclient::make_sub_query; +use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::transform::build_merged_event_transform; use err::Error; use futures_util::Stream; @@ -12,11 +14,7 @@ use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use netpod::log::*; use netpod::ChannelTypeConfigGen; -use netpod::Cluster; use netpod::ReqCtx; -use query::api4::events::EventsSubQuery; -use query::api4::events::EventsSubQuerySelect; -use query::api4::events::EventsSubQuerySettings; use query::api4::events::PlainEventsQuery; use std::pin::Pin; @@ -26,17 +24,26 @@ pub async fn dyn_events_stream( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - cluster: &Cluster, + 
open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { - let mut select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone()); - if let Some(x) = evq.test_do_wasm() { - select.set_wasm1(x.into()); - } - let settings = EventsSubQuerySettings::from(evq); - let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into()); + let subq = make_sub_query( + ch_conf, + evq.range().clone(), + evq.transform().clone(), + evq.test_do_wasm(), + evq, + ctx, + ); + let inmem_bufcap = subq.inmem_bufcap(); let mut tr = build_merged_event_transform(evq.transform())?; + let bytes_streams = open_bytes.open(subq, ctx.clone()).await?; + let mut inps = Vec::new(); + for s in bytes_streams { + let s = container_stream_from_bytes_stream::(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?; + let s = Box::pin(s) as Pin> + Send>>; + inps.push(s); + } // TODO make sure the empty container arrives over the network. - let inps = open_event_data_streams::(subq, ctx, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index d7012b7..7b5d290 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -4,9 +4,11 @@ //! to request such data from nodes. use crate::frames::eventsfromframes::EventsFromFrames; +use crate::frames::inmem::BoxedBytesStream; use crate::frames::inmem::InMemoryFrameStream; use crate::frames::inmem::TcpReadAsBytes; use err::Error; +use futures_util::Future; use futures_util::Stream; use http::Uri; use httpclient::body_bytes; @@ -19,18 +21,36 @@ use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::make_term_frame; use netpod::log::*; +use netpod::range::evrange::SeriesRange; +use netpod::ByteSize; +use netpod::ChannelTypeConfigGen; use netpod::Cluster; use netpod::Node; use netpod::ReqCtx; use netpod::APP_OCTET; use query::api4::events::EventsSubQuery; +use query::api4::events::EventsSubQuerySelect; +use query::api4::events::EventsSubQuerySettings; use query::api4::events::Frame1Parts; +use query::transform::TransformQuery; use serde::de::DeserializeOwned; use std::fmt; use std::pin::Pin; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; +pub const TEST_BACKEND: &str = "testbackend-00"; + +pub trait OpenBoxedBytesStreams { + fn open( + &self, + subq: EventsSubQuery, + ctx: ReqCtx, + ) -> Pin, Error>> + Send>>; +} + +pub type OpenBoxedBytesStreamsBox = Pin>; + pub fn make_node_command_frame(query: EventsSubQuery) -> Result { let obj = Frame1Parts::new(query); let ret = serde_json::to_string(&obj)?; @@ -53,7 +73,8 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp( netout.write_all(&buf).await?; netout.flush().await?; netout.forget(); - let frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), subq.inmem_bufcap()); + let inp = Box::pin(TcpReadAsBytes::new(netin)) as BoxedBytesStream; + let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); let frames = Box::pin(frames); let items = EventsFromFrames::new(frames, addr); Ok(Box::pin(items)) @@ -106,7 +127,8 @@ pub async fn x_processed_event_blobs_stream_from_node_http( ))); } let (_head, body) = res.into_parts(); - let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap()); + let inp = Box::pin(httpclient::IncomingStream::new(body)) as 
BoxedBytesStream; + let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); let frames = Box::pin(frames); let stream = EventsFromFrames::new(frames, url.to_string()); debug!("open_event_data_streams_http done {url}"); @@ -129,6 +151,7 @@ pub async fn x_processed_event_blobs_stream_from_node( pub type BoxedStream = Pin> + Send>>; +#[allow(unused)] async fn open_event_data_streams_tcp(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> where // TODO group bounds in new trait @@ -150,7 +173,8 @@ where netout.flush().await?; netout.forget(); // TODO for images, we need larger buffer capacity - let frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), subq.inmem_bufcap()); + let inp = Box::pin(TcpReadAsBytes::new(netin)) as BoxedBytesStream; + let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); let frames = Box::pin(frames); let stream = EventsFromFrames::::new(frames, addr); streams.push(Box::pin(stream) as _); @@ -158,79 +182,37 @@ where Ok(streams) } -async fn open_event_data_streams_http( - subq: EventsSubQuery, - ctx: &ReqCtx, - cluster: &Cluster, -) -> Result>, Error> +pub fn container_stream_from_bytes_stream( + inp: BoxedBytesStream, + bufcap: ByteSize, + dbgdesc: String, +) -> Result>, Error> where - // TODO group bounds in new trait T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, { - let frame1 = make_node_command_frame(subq.clone())?; - let mut streams = Vec::new(); - for node in &cluster.nodes { - use http::header; - use http::Method; - use http::Request; - use httpclient::hyper::StatusCode; - - let item = sitem_data(frame1.clone()); - let buf = item.make_frame()?; - - let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); - debug!("open_event_data_streams_http post {url}"); - let uri: Uri = url.as_str().parse().unwrap(); - let req = Request::builder() - .method(Method::POST) - .uri(&uri) - .header(header::HOST, uri.host().unwrap()) - .header(header::ACCEPT, APP_OCTET) - .header(ctx.header_name(), ctx.header_value()) - .body(body_bytes(buf)) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let mut client = httpclient::connect_client(req.uri()).await?; - let res = client - .send_request(req) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - if res.status() != StatusCode::OK { - error!("Server error {:?}", res); - let (head, body) = res.into_parts(); - let buf = httpclient::read_body_bytes(body).await?; - let s = String::from_utf8_lossy(&buf); - return Err(Error::with_msg(format!( - concat!( - "Server error {:?}\n", - "---------------------- message from http body:\n", - "{}\n", - "---------------------- end of http body", - ), - head, s - ))); - } - let (_head, body) = res.into_parts(); - let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap()); - let frames = Box::pin(frames); - let stream = EventsFromFrames::::new(frames, url.to_string()); - debug!("open_event_data_streams_http done {url}"); - streams.push(Box::pin(stream) as _); - } - Ok(streams) + let frames = InMemoryFrameStream::new(inp, bufcap); + // TODO let EventsFromFrames accept also non-boxed input? 
+ let frames = Box::pin(frames); + let stream = EventsFromFrames::::new(frames, dbgdesc); + Ok(stream) } -pub async fn open_event_data_streams( - subq: EventsSubQuery, +pub fn make_sub_query( + ch_conf: ChannelTypeConfigGen, + range: SeriesRange, + transform: TransformQuery, + test_do_wasm: Option<&str>, + sub: SUB, ctx: &ReqCtx, - cluster: &Cluster, -) -> Result>, Error> +) -> EventsSubQuery where - // TODO group bounds in new trait - T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, + SUB: Into, { - if true { - open_event_data_streams_http(subq, ctx, cluster).await - } else { - open_event_data_streams_tcp(subq, cluster).await + let mut select = EventsSubQuerySelect::new(ch_conf, range, transform); + if let Some(wasm1) = test_do_wasm { + select.set_wasm1(wasm1.into()); } + let settings = sub.into(); + let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into()); + subq } diff --git a/crates/streams/src/test.rs b/crates/streams/src/test.rs index fc4df14..d24c5b9 100644 --- a/crates/streams/src/test.rs +++ b/crates/streams/src/test.rs @@ -1,6 +1,8 @@ #[cfg(test)] mod collect; #[cfg(test)] +mod events; +#[cfg(test)] mod timebin; use err::Error; @@ -25,7 +27,7 @@ fn inmem_test_events_d0_i32_00() -> BoxedEventStream { evs.push(SEC * 4, 4, 10004); let cev = ChannelEvents::Events(Box::new(evs)); let item = sitem_data(cev); - let stream = stream::iter(vec![item]); + let stream = stream::iter([item]); Box::pin(stream) } @@ -34,7 +36,7 @@ fn inmem_test_events_d0_i32_01() -> BoxedEventStream { evs.push(SEC * 2, 2, 10002); let cev = ChannelEvents::Events(Box::new(evs)); let item = sitem_data(cev); - let stream = stream::iter(vec![item]); + let stream = stream::iter([item]); Box::pin(stream) } diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs new file mode 100644 index 0000000..f99f862 --- /dev/null +++ b/crates/streams/src/test/events.rs @@ -0,0 +1,68 @@ +use crate::frames::inmem::BoxedBytesStream; +use crate::plaineventscbor::plain_events_cbor; +use crate::tcprawclient::OpenBoxedBytesStreams; +use crate::tcprawclient::TEST_BACKEND; +use err::Error; +use futures_util::Future; +use futures_util::StreamExt; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; +use netpod::ChConf; +use netpod::ReqCtx; +use netpod::ScalarType; +use netpod::SfDbChannel; +use netpod::Shape; +use query::api4::events::EventsSubQuery; +use query::api4::events::PlainEventsQuery; +use std::pin::Pin; + +#[test] +fn merged_events_cbor() { + crate::test::runfut(merged_events_inner()).unwrap(); +} + +async fn merged_events_inner() -> Result<(), Error> { + let ctx = ReqCtx::for_test(); + let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::F64, Shape::Scalar, "test-gen-i32-dim0-v00"); + let channel = SfDbChannel::from_name(ch_conf.backend(), ch_conf.name()); + let range = SeriesRange::TimeRange(NanoRange::from_date_time( + "2023-12-18T05:10:00Z".parse().unwrap(), + "2023-12-18T05:10:10Z".parse().unwrap(), + )); + let evq = PlainEventsQuery::new(channel, range); + let open_bytes = StreamOpener::new(); + let open_bytes = Box::pin(open_bytes); + let mut res = plain_events_cbor(&evq, ch_conf.into(), &ctx, open_bytes).await.unwrap(); + // TODO parse the cbor stream and assert + while let Some(x) = res.next().await { + let item = x?; + let bytes = item.into_inner(); + eprintln!("bytes len {}", bytes.len()); + } + Ok(()) +} + +struct StreamOpener {} + +impl StreamOpener { + fn new() -> Self { + Self {} + } +} + +impl 
OpenBoxedBytesStreams for StreamOpener { + fn open( + &self, + subq: EventsSubQuery, + _ctx: ReqCtx, + ) -> Pin, Error>> + Send>> { + Box::pin(stream_opener(subq)) + } +} + +async fn stream_opener(subq: EventsSubQuery) -> Result, Error> { + let mut streams = Vec::new(); + let stream = crate::generators::make_test_channel_events_bytes_stream(subq, 1, 0)?; + streams.push(stream); + Ok(streams) +} diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 43a6837..7bbec9e 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -1,6 +1,8 @@ use crate::collect::Collect; use crate::rangefilter2::RangeFilter2; -use crate::tcprawclient::open_event_data_streams; +use crate::tcprawclient::container_stream_from_bytes_stream; +use crate::tcprawclient::make_sub_query; +use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::TimeBinnedStream; use crate::transform::build_merged_event_transform; use crate::transform::EventsToTimeBinnable; @@ -25,9 +27,6 @@ use netpod::ChannelTypeConfigGen; use netpod::Cluster; use netpod::ReqCtx; use query::api4::binned::BinnedQuery; -use query::api4::events::EventsSubQuery; -use query::api4::events::EventsSubQuerySelect; -use query::api4::events::EventsSubQuerySettings; use serde_json::Value as JsonValue; use std::pin::Pin; use std::time::Instant; @@ -43,16 +42,26 @@ async fn timebinnable_stream( one_before_range: bool, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - cluster: Cluster, + open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { - let mut select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone()); - if let Some(wasm1) = query.test_do_wasm() { - select.set_wasm1(wasm1.into()); - } - let settings = EventsSubQuerySettings::from(&query); - let subq = EventsSubQuery::from_parts(select.clone(), settings, ctx.reqid().into()); + let subq = make_sub_query( + ch_conf, + range.clone().into(), + query.transform().clone(), + query.test_do_wasm(), + &query, + ctx, + ); + let inmem_bufcap = subq.inmem_bufcap(); + let wasm1 = subq.wasm1().map(ToString::to_string); let mut tr = build_merged_event_transform(subq.transform())?; - let inps = open_event_data_streams::(subq, ctx, &cluster).await?; + let bytes_streams = open_bytes.open(subq, ctx.clone()).await?; + let mut inps = Vec::new(); + for s in bytes_streams { + let s = container_stream_from_bytes_stream::(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?; + let s = Box::pin(s) as Pin> + Send>>; + inps.push(s); + } // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. 
let stream = Merger::new(inps, query.merger_out_len_max()); @@ -68,7 +77,7 @@ async fn timebinnable_stream( }) }); - let stream = if let Some(wasmname) = select.wasm1() { + let stream = if let Some(wasmname) = wasm1 { debug!("make wasm transform"); use httpclient::url::Url; use wasmer::Value; @@ -185,9 +194,6 @@ async fn timebinnable_stream( // Box::new(item) as Box item }); - use futures_util::Stream; - use items_0::streamitem::Sitemty; - use std::pin::Pin; Box::pin(stream) as Pin>> + Send>> } else { let stream = stream.map(|x| x); @@ -212,14 +218,14 @@ async fn timebinned_stream( binned_range: BinnedRangeEnum, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - cluster: Cluster, + open_bytes: OpenBoxedBytesStreamsBox, ) -> Result>> + Send>>, Error> { let range = binned_range.binned_range_time().to_nano_range(); let do_time_weight = true; let one_before_range = true; - let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, cluster).await?; + let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, open_bytes).await?; let stream: Pin> = stream.0; let stream = Box::pin(stream); // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. @@ -246,12 +252,12 @@ pub async fn timebinned_json( query: BinnedQuery, ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, - cluster: Cluster, + open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let collect_max = 10000; - let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, cluster).await?; + let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, open_bytes).await?; let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected);