WIP adding cbor stream test

This commit is contained in:
Dominik Werder
2023-12-18 15:53:33 +01:00
parent 31afee0893
commit a8479b2c8d
16 changed files with 407 additions and 175 deletions

View File

@@ -16,16 +16,12 @@ use netpod::timeunits::SEC;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use nodenet::client::OpenBoxedBytesViaHttp;
use query::api4::binned::BinnedQuery;
use tracing::Instrument;
use url::Url;
async fn binned_json(
url: Url,
req: Requ,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
debug!("{:?}", req);
let reqid = crate::status_board()
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
@@ -37,7 +33,7 @@ async fn binned_json(
e.add_public_msg(msg)
})?;
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&query, ctx, node_config)
let ch_conf = ch_conf_from_binned(&query, ctx, ncc)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
let span1 = span!(
@@ -46,12 +42,14 @@ async fn binned_json(
reqid,
beg = query.range().beg_u64() / SEC,
end = query.range().end_u64() / SEC,
ch = query.channel().name().clone(),
ch = query.channel().name(),
);
span1.in_scope(|| {
debug!("begin");
});
let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, node_config.node_config.cluster.clone())
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let open_bytes = Box::pin(open_bytes);
let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes)
.instrument(span1)
.await?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;

View File

@@ -25,6 +25,7 @@ use netpod::ReqCtx;
use netpod::ACCEPT_ALL;
use netpod::APP_CBOR;
use netpod::APP_JSON;
use nodenet::client::OpenBoxedBytesViaHttp;
use query::api4::events::PlainEventsQuery;
use url::Url;
@@ -81,7 +82,8 @@ async fn plain_events_cbor(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCa
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
info!("plain_events_cbor chconf_from_events_quorum: {ch_conf:?} {req:?}");
let stream = streams::plaineventscbor::plain_events_cbor(&evq, ch_conf, ctx, ncc).await?;
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let stream = streams::plaineventscbor::plain_events_cbor(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?;
use future::ready;
let stream = stream
.flat_map(|x| match x {
@@ -116,8 +118,15 @@ async fn plain_events_json(
.map_err(Error::from)?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
info!("plain_events_json chconf_from_events_quorum: {ch_conf:?}");
let item =
streams::plaineventsjson::plain_events_json(&query, ch_conf, ctx, &node_config.node_config.cluster).await;
let open_bytes = OpenBoxedBytesViaHttp::new(node_config.node_config.cluster.clone());
let item = streams::plaineventsjson::plain_events_json(
&query,
ch_conf,
ctx,
&node_config.node_config.cluster,
Box::pin(open_bytes),
)
.await;
let item = match item {
Ok(item) => item,
Err(e) => {

View File

@@ -2946,6 +2946,12 @@ impl From<SfChFetchInfo> for ChannelTypeConfigGen {
}
}
impl From<ChConf> for ChannelTypeConfigGen {
fn from(value: ChConf) -> Self {
Self::Scylla(value)
}
}
pub fn f32_close(a: f32, b: f32) -> bool {
if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) {
true

View File

@@ -53,8 +53,8 @@ impl fmt::Debug for NanoRange {
impl NanoRange {
pub fn from_date_time(beg: DateTime<Utc>, end: DateTime<Utc>) -> Self {
Self {
beg: beg.timestamp_nanos() as u64,
end: end.timestamp_nanos() as u64,
beg: beg.timestamp_nanos_opt().unwrap_or(0) as u64,
end: end.timestamp_nanos_opt().unwrap_or(0) as u64,
}
}

View File

@@ -0,0 +1,91 @@
use err::Error;
use futures_util::Future;
use http::header;
use http::Method;
use http::Request;
use httpclient::body_bytes;
use httpclient::http;
use httpclient::hyper::StatusCode;
use httpclient::hyper::Uri;
use items_0::streamitem::sitem_data;
use items_2::framable::Framable;
use netpod::log::*;
use netpod::Cluster;
use netpod::ReqCtx;
use netpod::APP_OCTET;
use query::api4::events::EventsSubQuery;
use std::pin::Pin;
use streams::frames::inmem::BoxedBytesStream;
use streams::tcprawclient::make_node_command_frame;
use streams::tcprawclient::OpenBoxedBytesStreams;
/// Opens one raw byte stream per cluster node.
///
/// Serializes `subq` into a node command frame and POSTs it to each node's
/// private `/api/4/private/eventdata/frames` endpoint. The HTTP response body
/// of each node is returned as a boxed byte stream, in cluster node order.
///
/// # Errors
/// Fails on frame serialization, connection setup, request send, and as soon
/// as any node answers with a non-OK status; in the latter case the body text
/// of the error response is included in the returned error message.
async fn open_bytes_data_streams_http(
    subq: EventsSubQuery,
    ctx: ReqCtx,
    cluster: Cluster,
) -> Result<Vec<BoxedBytesStream>, Error> {
    let frame1 = make_node_command_frame(subq.clone())?;
    let mut streams = Vec::new();
    for node in &cluster.nodes {
        // Every node receives its own copy of the same command frame.
        let item = sitem_data(frame1.clone());
        let buf = item.make_frame()?;
        let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
        debug!("open_event_data_streams_http post {url}");
        let uri: Uri = url.as_str().parse().unwrap();
        let req = Request::builder()
            .method(Method::POST)
            .uri(&uri)
            .header(header::HOST, uri.host().unwrap())
            .header(header::ACCEPT, APP_OCTET)
            // Propagate the request context (request id header) to the peer node.
            .header(ctx.header_name(), ctx.header_value())
            .body(body_bytes(buf))
            .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
        let mut client = httpclient::connect_client(req.uri()).await?;
        let res = client
            .send_request(req)
            .await
            .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
        if res.status() != StatusCode::OK {
            error!("Server error {:?}", res);
            let (head, body) = res.into_parts();
            let buf = httpclient::read_body_bytes(body).await?;
            let s = String::from_utf8_lossy(&buf);
            return Err(Error::with_msg(format!(
                concat!(
                    "Server error {:?}\n",
                    "---------------------- message from http body:\n",
                    "{}\n",
                    "---------------------- end of http body",
                ),
                head, s
            )));
        }
        let (_head, body) = res.into_parts();
        // The incoming body is already a pinned, boxed byte stream; push it
        // directly instead of wrapping it in a redundant second `Box::pin`.
        let stream = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream;
        debug!("open_event_data_streams_http done {url}");
        streams.push(stream);
    }
    Ok(streams)
}
/// Opener which fetches per-node event data byte streams over HTTP.
pub struct OpenBoxedBytesViaHttp {
    cluster: Cluster,
}

impl OpenBoxedBytesViaHttp {
    /// Creates an opener targeting the nodes of the given cluster.
    pub fn new(cluster: Cluster) -> Self {
        OpenBoxedBytesViaHttp { cluster }
    }
}
impl OpenBoxedBytesStreams for OpenBoxedBytesViaHttp {
    /// Opens the byte streams by issuing one HTTP request per cluster node.
    fn open(
        &self,
        subq: EventsSubQuery,
        ctx: ReqCtx,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<BoxedBytesStream>, Error>> + Send>> {
        Box::pin(open_bytes_data_streams_http(subq, ctx, self.cluster.clone()))
    }
}

View File

@@ -27,11 +27,10 @@ use query::api4::events::EventsSubQuery;
use query::api4::events::Frame1Parts;
use std::net::SocketAddr;
use std::pin::Pin;
use streams::frames::inmem::BoxedBytesStream;
use streams::frames::inmem::InMemoryFrameStream;
use streams::frames::inmem::TcpReadAsBytes;
use streams::generators::GenerateF64V00;
use streams::generators::GenerateI32V00;
use streams::generators::GenerateI32V01;
use streams::tcprawclient::TEST_BACKEND;
use streams::transform::build_event_transform;
use taskrun::tokio;
use tokio::io::AsyncWriteExt;
@@ -39,8 +38,6 @@ use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream;
use tracing::Instrument;
const TEST_BACKEND: &str = "testbackend-00";
#[cfg(test)]
mod test;
@@ -81,51 +78,9 @@ async fn make_channel_events_stream_data(
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
if subq.backend() == TEST_BACKEND {
debug!("use test backend data {}", TEST_BACKEND);
let node_count = ncc.node_config.cluster.nodes.len() as u64;
let node_ix = ncc.ix as u64;
let chn = subq.name();
let range = subq.range().clone();
let one_before = subq.transform().need_one_before_range();
if chn == "test-gen-i32-dim0-v00" {
Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before)))
} else if chn == "test-gen-i32-dim0-v01" {
Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range, one_before)))
} else if chn == "test-gen-f64-dim1-v00" {
Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range, one_before)))
} else {
let na: Vec<_> = chn.split("-").collect();
if na.len() != 3 {
Err(Error::with_msg_no_trace(format!(
"make_channel_events_stream_data can not understand test channel name: {chn:?}"
)))
} else {
if na[0] != "inmem" {
Err(Error::with_msg_no_trace(format!(
"make_channel_events_stream_data can not understand test channel name: {chn:?}"
)))
} else {
let _range = subq.range().clone();
if na[1] == "d0" {
if na[2] == "i32" {
//generator::generate_i32(node_ix, node_count, range)
panic!()
} else if na[2] == "f32" {
//generator::generate_f32(node_ix, node_count, range)
panic!()
} else {
Err(Error::with_msg_no_trace(format!(
"make_channel_events_stream_data can not understand test channel name: {chn:?}"
)))
}
} else {
Err(Error::with_msg_no_trace(format!(
"make_channel_events_stream_data can not understand test channel name: {chn:?}"
)))
}
}
}
}
streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix)
} else if let Some(scyconf) = &ncc.node_config.cluster.scylla {
let cfg = subq.ch_conf().to_scylla()?;
scylla_channel_event_stream(subq, cfg, scyconf, ncc).await
@@ -154,12 +109,10 @@ async fn make_channel_events_stream(
Ok(ret)
}
pub type BytesStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
pub async fn create_response_bytes_stream(
evq: EventsSubQuery,
ncc: &NodeConfigCached,
) -> Result<BytesStreamBox, Error> {
) -> Result<BoxedBytesStream, Error> {
debug!(
"create_response_bytes_stream {:?} {:?}",
evq.ch_conf().scalar_type(),
@@ -180,9 +133,8 @@ pub async fn create_response_bytes_stream(
let ret = Box::pin(stream);
Ok(ret)
} else {
let stream = make_channel_events_stream(evq.clone(), reqctx, ncc).await?;
let mut tr = build_event_transform(evq.transform())?;
let stream = make_channel_events_stream(evq, reqctx, ncc).await?;
let stream = stream.map(move |x| {
on_sitemty_data!(x, |x: ChannelEvents| {
match x {

View File

@@ -1,4 +1,5 @@
pub mod channelconfig;
pub mod client;
pub mod configquorum;
pub mod conn;
pub mod scylla;

View File

@@ -16,6 +16,8 @@ use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
/// A pinned, boxed, `Send` stream of byte buffers (or errors).
pub type BoxedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => ();

View File

@@ -1,7 +1,12 @@
use crate::frames::inmem::BoxedBytesStream;
use crate::transform::build_event_transform;
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::container::ByteEstimate;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
@@ -10,18 +15,118 @@ use items_0::Appendable;
use items_0::Empty;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
use items_2::empty::empty_events_dyn_ev;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use items_2::framable::Framable;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::DAY;
use netpod::timeunits::MS;
use query::api4::events::EventsSubQuery;
use std::f64::consts::PI;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
/// Builds an in-process byte stream of framed test channel events.
///
/// Applies the sub-query's event transform to the generated events, then
/// serializes every stream item into a frame and emits it as frozen bytes.
/// Event-blob queries are rejected because this generator only produces
/// decoded events.
pub fn make_test_channel_events_bytes_stream(
subq: EventsSubQuery,
node_count: u64,
node_ix: u64,
) -> Result<BoxedBytesStream, Error> {
if subq.is_event_blobs() {
let e = Error::with_msg_no_trace("evq.is_event_blobs() not supported in this generator");
error!("{e}");
Err(e)
} else {
// Build the per-event transform up front; it is moved into the map closure.
let mut tr = build_event_transform(subq.transform())?;
let stream = make_test_channel_events_stream_data(subq, node_count, node_ix)?;
let stream = stream.map(move |x| {
on_sitemty_data!(x, |x: ChannelEvents| {
match x {
ChannelEvents::Events(evs) => {
// Only event payloads are transformed; status items pass through unchanged.
let evs = tr.0.transform(evs);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
evs,
))))
}
ChannelEvents::Status(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Status(x),
))),
}
})
});
// Frame each item and hand out the immutable bytes.
let stream = stream.map(|x| x.make_frame().map(|x| x.freeze()));
let ret = Box::pin(stream);
Ok(ret)
}
}
// Also used from nodenet::conn.
/// Builds the test event stream for a channel, prefixed with one empty
/// events container carrying the channel's scalar type and shape so that
/// downstream consumers learn the type even for ranges without events.
pub fn make_test_channel_events_stream_data(
    subq: EventsSubQuery,
    node_count: u64,
    node_ix: u64,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
    let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?;
    let head = futures_util::stream::iter([sitem_data(ChannelEvents::Events(empty))]);
    let tail = make_test_channel_events_stream_data_inner(subq, node_count, node_ix)?;
    Ok(Box::pin(head.chain(tail)))
}
/// Builds the raw (untransformed) event stream for one test channel.
///
/// Recognized generator channels:
/// - `test-gen-i32-dim0-v00` / `test-gen-i32-dim0-v01`: scalar i32 events
/// - `test-gen-f64-dim1-v00`: dim-1 f64 events
///
/// Names of the form `inmem-d0-i32` / `inmem-d0-f32` are recognized but not
/// yet implemented (they panic, as before); every other name is an error.
fn make_test_channel_events_stream_data_inner(
    subq: EventsSubQuery,
    node_count: u64,
    node_ix: u64,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
    debug!("use test backend data");
    let chn = subq.name();
    let range = subq.range().clone();
    let one_before = subq.transform().need_one_before_range();
    if chn == "test-gen-i32-dim0-v00" {
        return Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before)));
    }
    if chn == "test-gen-i32-dim0-v01" {
        return Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range, one_before)));
    }
    if chn == "test-gen-f64-dim1-v00" {
        return Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range, one_before)));
    }
    // Fallback: parse names of the shape "inmem-<dim>-<type>".
    let na: Vec<_> = chn.split("-").collect();
    if na.len() == 3 && na[0] == "inmem" && na[1] == "d0" {
        if na[2] == "i32" {
            // TODO generator::generate_i32(node_ix, node_count, range)
            panic!()
        } else if na[2] == "f32" {
            // TODO generator::generate_f32(node_ix, node_count, range)
            panic!()
        }
    }
    // Single shared error path for every unrecognized test channel name
    // (the previous version duplicated this construction four times).
    Err(Error::with_msg_no_trace(format!(
        "make_channel_events_stream_data can not understand test channel name: {chn:?}"
    )))
}
pub struct GenerateI32V00 {
ts: u64,
dts: u64,

View File

@@ -1,4 +1,5 @@
use crate::plaineventsstream::dyn_events_stream;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use bytes::Bytes;
use err::Error;
use futures_util::future;
@@ -29,9 +30,9 @@ pub async fn plain_events_cbor(
evq: &PlainEventsQuery,
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
ncc: &NodeConfigCached,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<CborStream, Error> {
let stream = dyn_events_stream(evq, ch_conf, ctx, &ncc.node_config.cluster).await?;
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
let stream = stream
.map(|x| match x {
Ok(x) => match x {

View File

@@ -1,5 +1,6 @@
use crate::collect::Collect;
use crate::plaineventsstream::dyn_events_stream;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use err::Error;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
@@ -16,12 +17,13 @@ pub async fn plain_events_json(
evq: &PlainEventsQuery,
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
cluster: &Cluster,
_cluster: &Cluster,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<JsonValue, Error> {
info!("plain_events_json evquery {:?}", evq);
let deadline = Instant::now() + evq.timeout();
let stream = dyn_events_stream(evq, ch_conf, ctx, cluster).await?;
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
let stream = stream.map(move |k| {
on_sitemty_data!(k, |k| {

View File

@@ -1,4 +1,6 @@
use crate::tcprawclient::open_event_data_streams;
use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::transform::build_merged_event_transform;
use err::Error;
use futures_util::Stream;
@@ -12,11 +14,7 @@ use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use netpod::log::*;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use netpod::ReqCtx;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
@@ -26,17 +24,26 @@ pub async fn dyn_events_stream(
evq: &PlainEventsQuery,
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
cluster: &Cluster,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<DynEventsStream, Error> {
let mut select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone());
if let Some(x) = evq.test_do_wasm() {
select.set_wasm1(x.into());
}
let settings = EventsSubQuerySettings::from(evq);
let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into());
let subq = make_sub_query(
ch_conf,
evq.range().clone(),
evq.transform().clone(),
evq.test_do_wasm(),
evq,
ctx,
);
let inmem_bufcap = subq.inmem_bufcap();
let mut tr = build_merged_event_transform(evq.transform())?;
let bytes_streams = open_bytes.open(subq, ctx.clone()).await?;
let mut inps = Vec::new();
for s in bytes_streams {
let s = container_stream_from_bytes_stream::<ChannelEvents>(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?;
let s = Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
inps.push(s);
}
// TODO make sure the empty container arrives over the network.
let inps = open_event_data_streams::<ChannelEvents>(subq, ctx, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());

View File

@@ -4,9 +4,11 @@
//! to request such data from nodes.
use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::BoxedBytesStream;
use crate::frames::inmem::InMemoryFrameStream;
use crate::frames::inmem::TcpReadAsBytes;
use err::Error;
use futures_util::Future;
use futures_util::Stream;
use http::Uri;
use httpclient::body_bytes;
@@ -19,18 +21,36 @@ use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::make_term_frame;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::ByteSize;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use netpod::Node;
use netpod::ReqCtx;
use netpod::APP_OCTET;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use query::api4::events::Frame1Parts;
use query::transform::TransformQuery;
use serde::de::DeserializeOwned;
use std::fmt;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
/// Name of the synthetic test backend that serves generated data.
pub const TEST_BACKEND: &str = "testbackend-00";
/// Abstraction over how the per-source raw event byte streams are opened
/// (e.g. via HTTP to cluster nodes in production, or in-process in tests).
pub trait OpenBoxedBytesStreams {
/// Opens one boxed byte stream per data source for the given sub-query.
fn open(
&self,
subq: EventsSubQuery,
ctx: ReqCtx,
) -> Pin<Box<dyn Future<Output = Result<Vec<BoxedBytesStream>, Error>> + Send>>;
}
/// Owned, pinned trait-object form used to pass an opener across call sites.
pub type OpenBoxedBytesStreamsBox = Pin<Box<dyn OpenBoxedBytesStreams + Send>>;
pub fn make_node_command_frame(query: EventsSubQuery) -> Result<EventQueryJsonStringFrame, Error> {
let obj = Frame1Parts::new(query);
let ret = serde_json::to_string(&obj)?;
@@ -53,7 +73,8 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp(
netout.write_all(&buf).await?;
netout.flush().await?;
netout.forget();
let frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), subq.inmem_bufcap());
let inp = Box::pin(TcpReadAsBytes::new(netin)) as BoxedBytesStream;
let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
let frames = Box::pin(frames);
let items = EventsFromFrames::new(frames, addr);
Ok(Box::pin(items))
@@ -106,7 +127,8 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
)));
}
let (_head, body) = res.into_parts();
let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap());
let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream;
let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
let frames = Box::pin(frames);
let stream = EventsFromFrames::new(frames, url.to_string());
debug!("open_event_data_streams_http done {url}");
@@ -129,6 +151,7 @@ pub async fn x_processed_event_blobs_stream_from_node(
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
#[allow(unused)]
async fn open_event_data_streams_tcp<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
// TODO group bounds in new trait
@@ -150,7 +173,8 @@ where
netout.flush().await?;
netout.forget();
// TODO for images, we need larger buffer capacity
let frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), subq.inmem_bufcap());
let inp = Box::pin(TcpReadAsBytes::new(netin)) as BoxedBytesStream;
let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
let frames = Box::pin(frames);
let stream = EventsFromFrames::<T>::new(frames, addr);
streams.push(Box::pin(stream) as _);
@@ -158,79 +182,37 @@ where
Ok(streams)
}
async fn open_event_data_streams_http<T>(
subq: EventsSubQuery,
ctx: &ReqCtx,
cluster: &Cluster,
) -> Result<Vec<BoxedStream<T>>, Error>
pub fn container_stream_from_bytes_stream<T>(
inp: BoxedBytesStream,
bufcap: ByteSize,
dbgdesc: String,
) -> Result<impl Stream<Item = Sitemty<T>>, Error>
where
// TODO group bounds in new trait
T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
{
let frame1 = make_node_command_frame(subq.clone())?;
let mut streams = Vec::new();
for node in &cluster.nodes {
use http::header;
use http::Method;
use http::Request;
use httpclient::hyper::StatusCode;
let item = sitem_data(frame1.clone());
let buf = item.make_frame()?;
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
debug!("open_event_data_streams_http post {url}");
let uri: Uri = url.as_str().parse().unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(&uri)
.header(header::HOST, uri.host().unwrap())
.header(header::ACCEPT, APP_OCTET)
.header(ctx.header_name(), ctx.header_value())
.body(body_bytes(buf))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client
.send_request(req)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
if res.status() != StatusCode::OK {
error!("Server error {:?}", res);
let (head, body) = res.into_parts();
let buf = httpclient::read_body_bytes(body).await?;
let s = String::from_utf8_lossy(&buf);
return Err(Error::with_msg(format!(
concat!(
"Server error {:?}\n",
"---------------------- message from http body:\n",
"{}\n",
"---------------------- end of http body",
),
head, s
)));
}
let (_head, body) = res.into_parts();
let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap());
let frames = Box::pin(frames);
let stream = EventsFromFrames::<T>::new(frames, url.to_string());
debug!("open_event_data_streams_http done {url}");
streams.push(Box::pin(stream) as _);
}
Ok(streams)
let frames = InMemoryFrameStream::new(inp, bufcap);
// TODO let EventsFromFrames accept also non-boxed input?
let frames = Box::pin(frames);
let stream = EventsFromFrames::<T>::new(frames, dbgdesc);
Ok(stream)
}
pub async fn open_event_data_streams<T>(
subq: EventsSubQuery,
pub fn make_sub_query<SUB>(
ch_conf: ChannelTypeConfigGen,
range: SeriesRange,
transform: TransformQuery,
test_do_wasm: Option<&str>,
sub: SUB,
ctx: &ReqCtx,
cluster: &Cluster,
) -> Result<Vec<BoxedStream<T>>, Error>
) -> EventsSubQuery
where
// TODO group bounds in new trait
T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
SUB: Into<EventsSubQuerySettings>,
{
if true {
open_event_data_streams_http(subq, ctx, cluster).await
} else {
open_event_data_streams_tcp(subq, cluster).await
let mut select = EventsSubQuerySelect::new(ch_conf, range, transform);
if let Some(wasm1) = test_do_wasm {
select.set_wasm1(wasm1.into());
}
let settings = sub.into();
let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into());
subq
}

View File

@@ -1,6 +1,8 @@
#[cfg(test)]
mod collect;
#[cfg(test)]
mod events;
#[cfg(test)]
mod timebin;
use err::Error;
@@ -25,7 +27,7 @@ fn inmem_test_events_d0_i32_00() -> BoxedEventStream {
evs.push(SEC * 4, 4, 10004);
let cev = ChannelEvents::Events(Box::new(evs));
let item = sitem_data(cev);
let stream = stream::iter(vec![item]);
let stream = stream::iter([item]);
Box::pin(stream)
}
@@ -34,7 +36,7 @@ fn inmem_test_events_d0_i32_01() -> BoxedEventStream {
evs.push(SEC * 2, 2, 10002);
let cev = ChannelEvents::Events(Box::new(evs));
let item = sitem_data(cev);
let stream = stream::iter(vec![item]);
let stream = stream::iter([item]);
Box::pin(stream)
}

View File

@@ -0,0 +1,68 @@
use crate::frames::inmem::BoxedBytesStream;
use crate::plaineventscbor::plain_events_cbor;
use crate::tcprawclient::OpenBoxedBytesStreams;
use crate::tcprawclient::TEST_BACKEND;
use err::Error;
use futures_util::Future;
use futures_util::StreamExt;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::ChConf;
use netpod::ReqCtx;
use netpod::ScalarType;
use netpod::SfDbChannel;
use netpod::Shape;
use query::api4::events::EventsSubQuery;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
/// Runs the merged-events cbor round trip on the test runtime.
#[test]
fn merged_events_cbor() {
crate::test::runfut(merged_events_inner()).unwrap();
}
/// Drives the cbor event stream end-to-end against the in-process test
/// generator and drains the resulting byte stream.
async fn merged_events_inner() -> Result<(), Error> {
let ctx = ReqCtx::for_test();
// NOTE(review): the channel name says i32 but the declared scalar type is
// F64 — confirm whether this mismatch is intentional in this WIP test.
let ch_conf = ChConf::new(TEST_BACKEND, 1, ScalarType::F64, Shape::Scalar, "test-gen-i32-dim0-v00");
let channel = SfDbChannel::from_name(ch_conf.backend(), ch_conf.name());
// A 10 second query window on the test backend.
let range = SeriesRange::TimeRange(NanoRange::from_date_time(
"2023-12-18T05:10:00Z".parse().unwrap(),
"2023-12-18T05:10:10Z".parse().unwrap(),
));
let evq = PlainEventsQuery::new(channel, range);
// Use the local in-process opener instead of HTTP connections.
let open_bytes = StreamOpener::new();
let open_bytes = Box::pin(open_bytes);
let mut res = plain_events_cbor(&evq, ch_conf.into(), &ctx, open_bytes).await.unwrap();
// TODO parse the cbor stream and assert
while let Some(x) = res.next().await {
let item = x?;
let bytes = item.into_inner();
eprintln!("bytes len {}", bytes.len());
}
Ok(())
}
/// Test-only opener that produces event byte streams in-process,
/// without any network round trip.
struct StreamOpener {}

impl StreamOpener {
    fn new() -> Self {
        StreamOpener {}
    }
}
impl OpenBoxedBytesStreams for StreamOpener {
    /// Delegates to the local generator instead of opening connections.
    fn open(
        &self,
        subq: EventsSubQuery,
        _ctx: ReqCtx,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<BoxedBytesStream>, Error>> + Send>> {
        let fut = stream_opener(subq);
        Box::pin(fut)
    }
}
/// Produces a single in-process test byte stream for the given sub-query
/// (single-node setup: node_count = 1, node_ix = 0).
async fn stream_opener(subq: EventsSubQuery) -> Result<Vec<BoxedBytesStream>, Error> {
    let stream = crate::generators::make_test_channel_events_bytes_stream(subq, 1, 0)?;
    Ok(vec![stream])
}

View File

@@ -1,6 +1,8 @@
use crate::collect::Collect;
use crate::rangefilter2::RangeFilter2;
use crate::tcprawclient::open_event_data_streams;
use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::timebin::TimeBinnedStream;
use crate::transform::build_merged_event_transform;
use crate::transform::EventsToTimeBinnable;
@@ -25,9 +27,6 @@ use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use netpod::ReqCtx;
use query::api4::binned::BinnedQuery;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::time::Instant;
@@ -43,16 +42,26 @@ async fn timebinnable_stream(
one_before_range: bool,
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
cluster: Cluster,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<TimeBinnableStreamBox, Error> {
let mut select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone());
if let Some(wasm1) = query.test_do_wasm() {
select.set_wasm1(wasm1.into());
}
let settings = EventsSubQuerySettings::from(&query);
let subq = EventsSubQuery::from_parts(select.clone(), settings, ctx.reqid().into());
let subq = make_sub_query(
ch_conf,
range.clone().into(),
query.transform().clone(),
query.test_do_wasm(),
&query,
ctx,
);
let inmem_bufcap = subq.inmem_bufcap();
let wasm1 = subq.wasm1().map(ToString::to_string);
let mut tr = build_merged_event_transform(subq.transform())?;
let inps = open_event_data_streams::<ChannelEvents>(subq, ctx, &cluster).await?;
let bytes_streams = open_bytes.open(subq, ctx.clone()).await?;
let mut inps = Vec::new();
for s in bytes_streams {
let s = container_stream_from_bytes_stream::<ChannelEvents>(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?;
let s = Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
inps.push(s);
}
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, query.merger_out_len_max());
@@ -68,7 +77,7 @@ async fn timebinnable_stream(
})
});
let stream = if let Some(wasmname) = select.wasm1() {
let stream = if let Some(wasmname) = wasm1 {
debug!("make wasm transform");
use httpclient::url::Url;
use wasmer::Value;
@@ -185,9 +194,6 @@ async fn timebinnable_stream(
// Box::new(item) as Box<dyn Framable + Send>
item
});
use futures_util::Stream;
use items_0::streamitem::Sitemty;
use std::pin::Pin;
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>
} else {
let stream = stream.map(|x| x);
@@ -212,14 +218,14 @@ async fn timebinned_stream(
binned_range: BinnedRangeEnum,
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
cluster: Cluster,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
let range = binned_range.binned_range_time().to_nano_range();
let do_time_weight = true;
let one_before_range = true;
let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, cluster).await?;
let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, open_bytes).await?;
let stream: Pin<Box<dyn TimeBinnableStreamTrait>> = stream.0;
let stream = Box::pin(stream);
// TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning.
@@ -246,12 +252,12 @@ pub async fn timebinned_json(
query: BinnedQuery,
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
cluster: Cluster,
open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<JsonValue, Error> {
let deadline = Instant::now().checked_add(query.timeout_value()).unwrap();
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let collect_max = 10000;
let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, cluster).await?;
let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, open_bytes).await?;
let stream = timebinned_to_collectable(stream);
let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range));
let collected: BoxFuture<_> = Box::pin(collected);