higher-ranked lifetime error, could not prove, could not normalize

This commit is contained in:
Dominik Werder
2023-04-19 13:50:33 +02:00
parent a94d68aa49
commit 868f4b014a
5 changed files with 142 additions and 23 deletions

View File

@@ -62,6 +62,30 @@ pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde
fn validate(&self) -> Result<(), String>;
}
impl RangeOverlapInfo for Box<dyn TimeBinned> {
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
}
fn ends_after(&self, range: &SeriesRange) -> bool {
todo!()
}
fn starts_after(&self, range: &SeriesRange) -> bool {
todo!()
}
}
impl TimeBinnable for Box<dyn TimeBinned> {
fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box<dyn TimeBinner> {
todo!()
}
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult> {
todo!()
}
}
pub trait TimeBinner: fmt::Debug + Send {
fn ingest(&mut self, item: &mut dyn TimeBinnable);
fn bins_ready_count(&self) -> usize;

View File

@@ -6,9 +6,11 @@ use items_0::collect_s::Collectable;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinnable;
use items_0::transform::CollectableStreamTrait;
use items_0::transform::EventStreamTrait;
use items_0::transform::EventTransform;
use items_0::transform::TimeBinnableStreamTrait;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_0::Events;
@@ -292,3 +294,66 @@ where
INP: Stream<Item = Sitemty<T>> + Send,
{
}
/// Wrap any event stream and provide transformation properties.
pub struct PlainTimeBinnableStream<INP, T>
where
T: TimeBinnable,
INP: Stream<Item = Sitemty<T>> + Send,
{
inp: Pin<Box<INP>>,
}
impl<INP, T> PlainTimeBinnableStream<INP, T>
where
T: TimeBinnable,
INP: Stream<Item = Sitemty<T>> + Send,
{
pub fn new(inp: INP) -> Self {
Self { inp: Box::pin(inp) }
}
}
impl<INP, T> Stream for PlainTimeBinnableStream<INP, T>
where
T: TimeBinnable,
INP: Stream<Item = Sitemty<T>> + Send,
{
type Item = Sitemty<Box<dyn TimeBinnable>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => Ready(Some(match item {
Ok(item) => Ok(match item {
StreamItem::DataItem(item) => StreamItem::DataItem(match item {
RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)),
}),
StreamItem::Log(item) => StreamItem::Log(item),
StreamItem::Stats(item) => StreamItem::Stats(item),
}),
Err(e) => Err(e),
})),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
impl<INP, T> WithTransformProperties for PlainTimeBinnableStream<INP, T>
where
T: TimeBinnable,
INP: Stream<Item = Sitemty<T>> + Send,
{
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl<INP, T> TimeBinnableStreamTrait for PlainTimeBinnableStream<INP, T>
where
T: TimeBinnable,
INP: Stream<Item = Sitemty<T>> + Send,
{
}

View File

@@ -10,6 +10,8 @@ use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinnableTy;
use items_0::timebin::TimeBinner;
use items_0::timebin::TimeBinnerTy;
use items_0::transform::TimeBinnableStreamTrait;
use items_0::transform::WithTransformProperties;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
@@ -237,3 +239,7 @@ where
}
}
}
//impl<T> WithTransformProperties for TimeBinnedStream<T> where T: TimeBinnableTy {}
//impl<T> TimeBinnableStreamTrait for TimeBinnedStream<T> where T: TimeBinnableTy {}

View File

@@ -1,15 +1,26 @@
use crate::collect::Collect;
use crate::rangefilter2::RangeFilter2;
use crate::tcprawclient::open_tcp_streams;
use crate::timebin::TimeBinnedStream;
use crate::transform::build_merged_event_transform;
use crate::transform::EventsToTimeBinnable;
use crate::transform::TimeBinnableToCollectable;
use err::Error;
use futures_util::stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinned;
use items_0::transform::TimeBinnableStreamBox;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use items_2::streams::PlainEventStream;
use items_2::streams::PlainTimeBinnableStream;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::ChConf;
use netpod::Cluster;
@@ -23,22 +34,13 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu
let bins_max = 10000;
warn!("TODO add with_deadline to PlainEventsQuery");
let deadline = Instant::now() + query.timeout_value();
let empty = items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?;
warn!("TODO feed through transform chain");
let empty = ChannelEvents::Events(empty);
let empty = sitem_data(empty);
// TODO this part is supposed to be done one the event nodes, if sharded:
//crate::transform::build_event_transform(tr, inp);
query.transform();
// TODO
let evquery = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
info!("timebinned_json with empty item {empty:?}");
let stream = Merger::new(inps, 1024);
let stream = stream::iter([empty]).chain(stream);
// TODO construct the events query in a better way.
let evq = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
let mut tr = build_merged_event_transform(evq.transform())?;
let inps = open_tcp_streams::<_, ChannelEvents>(&evq, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
// TODO
let do_time_weight = true;
@@ -48,19 +50,39 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu
let range = query.range().try_into()?;
let stream = RangeFilter2::new(stream, range, one_before_range);
let stream = stream.map(move |k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn Events> = Box::new(k);
info!("-------------------------\ngot len {}", k.len());
let k = tr.0.transform(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
});
let stream = PlainEventStream::new(stream);
let stream = EventsToTimeBinnable::new(stream);
let stream = Box::pin(stream);
// TODO TimeBinnedStream must accept types bin edges.
// Maybe even take a BinnedRangeEnum?
let stream = TimeBinnedStream::new(stream, todo!(), do_time_weight, deadline);
if false {
let mut stream = stream;
let _: Option<Sitemty<Box<dyn TimeBinned>>> = stream.next().await;
panic!()
}
let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight, deadline);
let stream = stream.map(|k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn TimeBinnable> = Box::new(k);
sitem_data(k)
})
});
let stream = PlainTimeBinnableStream::new(stream);
//let stream = Box::pin(stream);
//let stream = TimeBinnableStreamBox(stream);
let stream = TimeBinnableToCollectable::new(stream);
let stream = Box::pin(stream);
// TODO collect should not have to accept two ranges, instead, generalize over it.
let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?;
//let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?;
let stream = futures_util::stream::empty();
let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?;
let jsval = serde_json::to_value(&collected)?;
Ok(jsval)
}

View File

@@ -139,6 +139,8 @@ impl WithTransformProperties for TimeBinnableToCollectable {
impl CollectableStreamTrait for TimeBinnableToCollectable {}
impl CollectableStreamTrait for Pin<Box<TimeBinnableToCollectable>> {}
pub fn build_time_binning_transform(
tr: &TransformQuery,
inp: Pin<Box<dyn TimeBinnableStreamTrait>>,