This commit is contained in:
Dominik Werder
2023-04-17 16:24:33 +02:00
parent b1c0aa9054
commit 7f641f5971
3 changed files with 119 additions and 4 deletions

View File

@@ -1,14 +1,25 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinnable;
use items_0::transform::CollectableStream;
use items_0::transform::EventStream;
use items_0::transform::EventStreamTrait;
use items_0::transform::TransformEvent;
use items_0::transform::WithTransformProperties;
use items_0::Events;
use items_2::transform::make_transform_identity;
use items_2::transform::make_transform_min_max_avg;
use items_2::transform::make_transform_pulse_id_diff;
use query::transform::EventTransformQuery;
use query::transform::TimeBinningTransformQuery;
use query::transform::TransformQuery;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub fn build_event_transform(tr: &TransformQuery, inp: EventStream) -> Result<TransformEvent, Error> {
let trev = tr.get_tr_event();
@@ -28,7 +39,40 @@ pub fn build_event_transform(tr: &TransformQuery, inp: EventStream) -> Result<Tr
}
}
pub fn build_full_transform_collected(tr: &TransformQuery, inp: EventStream) -> Result<CollectableStream, Error> {
pub struct EventsToTimeBinnable {
inp: Pin<Box<dyn EventStreamTrait>>,
}
impl Stream for EventsToTimeBinnable {
type Item = Sitemty<Box<dyn TimeBinnable>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => Ready(Some(match item {
Ok(item) => Ok(match item {
StreamItem::DataItem(item) => StreamItem::DataItem(match item {
RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)),
}),
StreamItem::Log(item) => StreamItem::Log(item),
StreamItem::Stats(item) => StreamItem::Stats(item),
}),
Err(e) => Err(e),
})),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
impl WithTransformProperties for EventsToTimeBinnable {
fn query_transform_properties(&self) -> items_0::transform::TransformProperties {
self.inp.query_transform_properties()
}
}
pub fn build_full_transform_collectable(tr: &TransformQuery, inp: EventStream) -> Result<CollectableStream, Error> {
// TODO this must return a Stream!
//let evs = build_event_transform(tr, inp)?;
let trtb = tr.get_tr_time_binning();