Thread ReqCtx through event-stream and binning APIs

Replace the bare `reqid: String` parameter with `&ReqCtx` in
`plain_events_json`, `timebinnable_stream`, `timebinned_stream`,
`timebinned_json`, and the `open_event_data_streams*` /
`x_processed_event_blobs_stream_from_node*` helpers, and attach the
context's request header (`ctx.header_name()`/`ctx.header_value()`)
to outgoing inter-node HTTP requests.

This commit is contained in:
Dominik Werder
2023-12-07 16:33:52 +01:00
parent f946d1b6d9
commit 90fe23b676
28 changed files with 365 additions and 205 deletions

View File

@@ -14,6 +14,7 @@ use items_2::streams::PlainEventStream;
use netpod::log::*;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use netpod::ReqCtx;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
@@ -24,7 +25,7 @@ use std::time::Instant;
pub async fn plain_events_json(
evq: &PlainEventsQuery,
ch_conf: ChannelTypeConfigGen,
reqid: String,
ctx: &ReqCtx,
cluster: &Cluster,
) -> Result<JsonValue, Error> {
info!("plain_events_json evquery {:?}", evq);
@@ -33,12 +34,12 @@ pub async fn plain_events_json(
select.set_wasm1(x.into());
}
let settings = EventsSubQuerySettings::from(evq);
let subq = EventsSubQuery::from_parts(select, settings, reqid);
let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into());
// TODO remove magic constant
let deadline = Instant::now() + evq.timeout();
let mut tr = build_merged_event_transform(evq.transform())?;
// TODO make sure the empty container arrives over the network.
let inps = open_event_data_streams::<ChannelEvents>(subq, cluster).await?;
let inps = open_event_data_streams::<ChannelEvents>(subq, ctx, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
@@ -72,6 +73,7 @@ pub async fn plain_events_json(
let t = httpclient::http_get(
Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(),
"*/*",
ctx,
)
.await
.unwrap();

View File

@@ -21,6 +21,7 @@ use items_2::frame::make_term_frame;
use netpod::log::*;
use netpod::Cluster;
use netpod::Node;
use netpod::ReqCtx;
use netpod::APP_OCTET;
use query::api4::events::EventsSubQuery;
use query::api4::events::Frame1Parts;
@@ -61,6 +62,7 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp(
pub async fn x_processed_event_blobs_stream_from_node_http(
subq: EventsSubQuery,
node: Node,
ctx: &ReqCtx,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
use http::header;
use http::Method;
@@ -80,6 +82,7 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
.uri(&uri)
.header(header::HOST, uri.host().unwrap())
.header(header::ACCEPT, APP_OCTET)
.header(ctx.header_name(), ctx.header_value())
.body(body_bytes(buf))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut client = httpclient::connect_client(req.uri()).await?;
@@ -115,9 +118,10 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
pub async fn x_processed_event_blobs_stream_from_node(
subq: EventsSubQuery,
node: Node,
ctx: ReqCtx,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
if true {
x_processed_event_blobs_stream_from_node_http(subq, node).await
x_processed_event_blobs_stream_from_node_http(subq, node, &ctx).await
} else {
x_processed_event_blobs_stream_from_node_tcp(subq, node).await
}
@@ -154,7 +158,11 @@ where
Ok(streams)
}
async fn open_event_data_streams_http<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
async fn open_event_data_streams_http<T>(
subq: EventsSubQuery,
ctx: &ReqCtx,
cluster: &Cluster,
) -> Result<Vec<BoxedStream<T>>, Error>
where
// TODO group bounds in new trait
T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
@@ -178,6 +186,7 @@ where
.uri(&uri)
.header(header::HOST, uri.host().unwrap())
.header(header::ACCEPT, APP_OCTET)
.header(ctx.header_name(), ctx.header_value())
.body(body_bytes(buf))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut client = httpclient::connect_client(req.uri()).await?;
@@ -210,13 +219,17 @@ where
Ok(streams)
}
pub async fn open_event_data_streams<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
pub async fn open_event_data_streams<T>(
subq: EventsSubQuery,
ctx: &ReqCtx,
cluster: &Cluster,
) -> Result<Vec<BoxedStream<T>>, Error>
where
// TODO group bounds in new trait
T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
{
if true {
open_event_data_streams_http(subq, cluster).await
open_event_data_streams_http(subq, ctx, cluster).await
} else {
open_event_data_streams_tcp(subq, cluster).await
}

View File

@@ -23,6 +23,7 @@ use netpod::range::evrange::NanoRange;
use netpod::BinnedRangeEnum;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use netpod::ReqCtx;
use query::api4::binned::BinnedQuery;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
@@ -41,7 +42,7 @@ async fn timebinnable_stream(
range: NanoRange,
one_before_range: bool,
ch_conf: ChannelTypeConfigGen,
reqid: String,
ctx: &ReqCtx,
cluster: Cluster,
) -> Result<TimeBinnableStreamBox, Error> {
let mut select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone());
@@ -49,9 +50,9 @@ async fn timebinnable_stream(
select.set_wasm1(wasm1.into());
}
let settings = EventsSubQuerySettings::from(&query);
let subq = EventsSubQuery::from_parts(select.clone(), settings, reqid);
let subq = EventsSubQuery::from_parts(select.clone(), settings, ctx.reqid().into());
let mut tr = build_merged_event_transform(subq.transform())?;
let inps = open_event_data_streams::<ChannelEvents>(subq, &cluster).await?;
let inps = open_event_data_streams::<ChannelEvents>(subq, ctx, &cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, query.merger_out_len_max());
@@ -75,6 +76,7 @@ async fn timebinnable_stream(
let t = httpclient::http_get(
Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(),
"*/*",
ctx,
)
.await
.unwrap();
@@ -209,7 +211,7 @@ async fn timebinned_stream(
query: BinnedQuery,
binned_range: BinnedRangeEnum,
ch_conf: ChannelTypeConfigGen,
reqid: String,
ctx: &ReqCtx,
cluster: Cluster,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
let range = binned_range.binned_range_time().to_nano_range();
@@ -217,7 +219,7 @@ async fn timebinned_stream(
let do_time_weight = true;
let one_before_range = true;
let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, reqid, cluster).await?;
let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, cluster).await?;
let stream: Pin<Box<dyn TimeBinnableStreamTrait>> = stream.0;
let stream = Box::pin(stream);
// TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning.
@@ -243,13 +245,13 @@ fn timebinned_to_collectable(
pub async fn timebinned_json(
query: BinnedQuery,
ch_conf: ChannelTypeConfigGen,
reqid: String,
ctx: &ReqCtx,
cluster: Cluster,
) -> Result<JsonValue, Error> {
let deadline = Instant::now().checked_add(query.timeout_value()).unwrap();
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let collect_max = 10000;
let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, reqid, cluster).await?;
let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, cluster).await?;
let stream = timebinned_to_collectable(stream);
let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range));
let collected: BoxFuture<_> = Box::pin(collected);