This commit is contained in:
Dominik Werder
2023-04-19 15:47:37 +02:00
parent 868f4b014a
commit f959f04399
8 changed files with 174 additions and 31 deletions

View File

@@ -45,7 +45,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
span1.in_scope(|| {
debug!("begin");
});
let item = streams::timebinnedjson::timebinned_json(&query, &chconf, &node_config.node_config.cluster)
let item = streams::timebinnedjson::timebinned_json(query, chconf, node_config.node_config.cluster.clone())
.instrument(span1)
.await?;
let buf = serde_json::to_vec(&item)?;

View File

@@ -20,7 +20,7 @@ pub trait TimeBins {
fn ts_min_max(&self) -> Option<(u64, u64)>;
}
pub trait TimeBinnerTy: fmt::Debug + Unpin {
pub trait TimeBinnerTy: fmt::Debug + Send + Unpin {
type Input: fmt::Debug;
type Output: fmt::Debug;
@@ -44,7 +44,7 @@ pub trait TimeBinnerTy: fmt::Debug + Unpin {
fn empty(&self) -> Option<Self::Output>;
}
pub trait TimeBinnableTy: fmt::Debug + Sized {
pub trait TimeBinnableTy: fmt::Debug + Send + Sized {
type TimeBinner: TimeBinnerTy<Input = Self>;
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner;

View File

@@ -157,6 +157,22 @@ impl TimeBinnableStreamTrait for TimeBinnableStreamBox {}
pub struct CollectableStreamBox(pub Pin<Box<dyn CollectableStreamTrait>>);
impl Stream for CollectableStreamBox {
type Item = Sitemty<Box<dyn Collectable>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}
impl WithTransformProperties for CollectableStreamBox {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl CollectableStreamTrait for CollectableStreamBox {}
impl<T> WithTransformProperties for stream::Empty<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
@@ -169,3 +185,5 @@ where
stream::Empty<T>: Stream<Item = Sitemty<Box<dyn Collectable>>>,
{
}
impl<T> CollectableStreamTrait for Pin<Box<T>> where T: CollectableStreamTrait {}

View File

@@ -45,7 +45,7 @@ macro_rules! trace4 {
}
pub struct Collect {
inp: CollectableStreamBox,
inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>,
events_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
@@ -57,19 +57,16 @@ pub struct Collect {
}
impl Collect {
pub fn new<INP>(
inp: INP,
pub fn new(
inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>,
deadline: Instant,
events_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Self
where
INP: CollectableStreamTrait + 'static,
{
) -> Self {
let timer = tokio::time::sleep_until(deadline.into());
Self {
inp: CollectableStreamBox(Box::pin(inp)),
inp,
events_max,
range,
binrange,
@@ -181,7 +178,7 @@ impl Future for Collect {
self.done_input = true;
continue;
}
Pending => match self.inp.0.poll_next_unpin(cx) {
Pending => match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match self.handle_item(item) {
Ok(()) => {
continue;

View File

@@ -5,6 +5,7 @@ use crate::transform::EventsToTimeBinnable;
use crate::transform::TimeBinnableToCollectable;
use err::Error;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
use items_0::on_sitemty_data;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
@@ -48,12 +49,14 @@ pub async fn plain_events_json(
let k: Box<dyn Events> = Box::new(k);
info!("-------------------------\ngot len {}", k.len());
let k = tr.0.transform(k);
let k: Box<dyn Collectable> = Box::new(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
});
let stream = PlainEventStream::new(stream);
let stream = EventsToTimeBinnable::new(stream);
let stream = TimeBinnableToCollectable::new(stream);
//let stream = PlainEventStream::new(stream);
//let stream = EventsToTimeBinnable::new(stream);
//let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?;
let jsval = serde_json::to_value(&collected)?;
Ok(jsval)

View File

@@ -66,6 +66,7 @@ fn collect_channel_events_01() -> Result<(), Error> {
let stream = PlainEventStream::new(stream);
let stream = EventsToTimeBinnable::new(stream);
let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
let res = Collect::new(stream, deadline, events_max, None, None).await?;
if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
eprintln!("Great, a match");
@@ -105,6 +106,7 @@ fn collect_channel_events_pulse_id_diff() -> Result<(), Error> {
let stream = Box::pin(stream);
let stream = build_time_binning_transform(&trqu, stream)?;
let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
let res = Collect::new(stream, deadline, events_max, None, None).await?;
if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<i64>>() {
eprintln!("Great, a match");

View File

@@ -6,20 +6,28 @@ use crate::transform::build_merged_event_transform;
use crate::transform::EventsToTimeBinnable;
use crate::transform::TimeBinnableToCollectable;
use err::Error;
use futures_util::future::BoxFuture;
use futures_util::stream;
use futures_util::stream::BoxStream;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinned;
use items_0::transform::CollectableStreamBox;
use items_0::transform::TimeBinnableStreamBox;
use items_0::transform::TimeBinnableStreamTrait;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use items_2::streams::PlainEventStream;
use items_2::streams::PlainTimeBinnableStream;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::ChConf;
@@ -27,28 +35,28 @@ use netpod::Cluster;
use query::api4::binned::BinnedQuery;
use query::api4::events::PlainEventsQuery;
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::time::Instant;
pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result<JsonValue, Error> {
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let bins_max = 10000;
warn!("TODO add with_deadline to PlainEventsQuery");
let deadline = Instant::now() + query.timeout_value();
// TODO construct the events query in a better way.
fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl 'u + Send + Stream<Item = R> {
stream
}
async fn timebinnable_stream(
query: BinnedQuery,
chconf: ChConf,
range: NanoRange,
one_before_range: bool,
cluster: Cluster,
) -> Result<TimeBinnableStreamBox, Error> {
let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
let mut tr = build_merged_event_transform(evq.transform())?;
let inps = open_tcp_streams::<_, ChannelEvents>(&evq, cluster).await?;
let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
// TODO
let do_time_weight = true;
let one_before_range = true;
// TODO RangeFilter2 must accept SeriesRange
let range = query.range().try_into()?;
let stream = RangeFilter2::new(stream, range, one_before_range);
let stream = stream.map(move |k| {
@@ -62,10 +70,119 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu
let stream = PlainEventStream::new(stream);
let stream = EventsToTimeBinnable::new(stream);
let stream = Box::pin(stream);
Ok(TimeBinnableStreamBox(stream))
}
async fn timebinned_stream(
query: BinnedQuery,
chconf: ChConf,
cluster: Cluster,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
let deadline = Instant::now();
let range: NanoRange = query.range().try_into()?;
//let binned_range = BinnedRangeEnum::covering_range(SeriesRange::TimeRange(NanoRange { beg: 123, end: 456 }), 10)?;
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let do_time_weight = true;
let one_before_range = true;
let stream = timebinnable_stream(query.clone(), chconf, range, one_before_range, cluster).await?;
let stream: Pin<Box<dyn TimeBinnableStreamTrait>> = stream.0;
let stream = Box::pin(stream);
let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline);
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
Ok(stream)
}
fn timebinned_to_collectable(
stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>,
) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> {
let stream = stream.map(|k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn Collectable> = Box::new(k);
info!("-------------------------\ngot len {}", k.len());
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
});
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> = Box::pin(stream);
stream
}
pub async fn timebinned_json(query: BinnedQuery, chconf: ChConf, cluster: Cluster) -> Result<JsonValue, Error> {
let stream = timebinned_stream(query.clone(), chconf.clone(), cluster).await?;
let deadline = Instant::now();
let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
let collect_range = evq.range().clone();
let events_max = evq.events_max();
let range: NanoRange = query.range().try_into()?;
let binned_range = BinnedRangeEnum::covering_range(SeriesRange::TimeRange(NanoRange { beg: 123, end: 456 }), 10)?;
let do_time_weight = true;
let one_before_range = true;
let stream = timebinned_to_collectable(stream);
/*
let stream = stream.map(|k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn Collectable> = Box::new(k);
info!("-------------------------\ngot len {}", k.len());
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
});
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> = Box::pin(stream);
*/
let collected = Collect::new(stream, deadline, events_max, Some(collect_range), None);
let collected: BoxFuture<_> = Box::pin(collected);
let collected = collected.await?;
let jsval = serde_json::to_value(&collected)?;
Ok(jsval)
}
pub async fn timebinned_json_2(query: BinnedQuery, chconf: ChConf, cluster: Cluster) -> Result<JsonValue, Error> {
warn!("TODO add with_deadline to PlainEventsQuery");
let deadline = Instant::now() + query.timeout_value();
// TODO RangeFilter2 must accept SeriesRange
let range: NanoRange = query.range().try_into()?;
// TODO
let do_time_weight = true;
let one_before_range = true;
// TODO construct the events query in a better way.
let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
let bins_max = 10000;
let events_max = evq.events_max();
let collect_range = evq.range().clone();
let mut tr = build_merged_event_transform(evq.transform())?;
/*
let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
let stream = RangeFilter2::new(stream, range, one_before_range);
let stream = stream.map(move |k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn Events> = Box::new(k);
info!("-------------------------\ngot len {}", k.len());
let k = tr.0.transform(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
});
let stream = PlainEventStream::new(stream);
let stream = EventsToTimeBinnable::new(stream);
let stream = Box::pin(stream);
*/
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinnable>>> + Send>> = todo!();
// TODO TimeBinnedStream must accept types bin edges.
// Maybe even take a BinnedRangeEnum?
let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline);
/*
let stream = stream.map(|k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn TimeBinnable> = Box::new(k);
@@ -78,11 +195,17 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu
let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
*/
// TODO collect should not have to accept two ranges, instead, generalize over it.
//let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?;
let stream = futures_util::stream::empty();
let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?;
let stream = CollectableStreamBox(Box::pin(stream));
/*let collected = Collect::new(stream, deadline, events_max, Some(collect_range), None);
let collected: BoxFuture<_> = Box::pin(collected);
let collected = collected.await?;
*/
let collected = "";
let jsval = serde_json::to_value(&collected)?;
Ok(jsval)
}

View File

@@ -139,7 +139,7 @@ impl WithTransformProperties for TimeBinnableToCollectable {
impl CollectableStreamTrait for TimeBinnableToCollectable {}
impl CollectableStreamTrait for Pin<Box<TimeBinnableToCollectable>> {}
//impl CollectableStreamTrait for Pin<Box<TimeBinnableToCollectable>> {}
pub fn build_time_binning_transform(
tr: &TransformQuery,