This commit is contained in:
Dominik Werder
2023-04-17 15:16:44 +02:00
parent 565999e7c7
commit b1c0aa9054
8 changed files with 81 additions and 49 deletions

View File

@@ -135,12 +135,12 @@ where
}
// TODO rename to `Typed`
pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut + TypeName {
pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut + TypeName + Send {
type Collector: CollectorType<Input = Self>;
fn new_collector() -> Self::Collector;
}
pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut + TypeName {
pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut + TypeName + Send {
fn new_collector(&self) -> Box<dyn Collector>;
}

View File

@@ -1,3 +1,4 @@
use crate::collect_s::Collectable;
use crate::collect_s::Collected;
use crate::streamitem::RangeCompletableItem;
use crate::streamitem::Sitemty;
@@ -105,18 +106,4 @@ where
}
}
// TODO these must return type which can take events as input.
pub struct TransformedCollectedStream(pub Pin<Box<dyn Future<Output = Result<Box<dyn Collected>, Error>>>>);
impl WithTransformProperties for TransformedCollectedStream {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl EventTransform for TransformedCollectedStream {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
todo!()
}
}
pub struct CollectableStream(pub Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>);

View File

@@ -1,10 +1,4 @@
use err::Error;
use items_0::transform::EventStream;
use items_0::transform::TransformEvent;
use items_0::transform::TransformedCollectedStream;
use items_2::transform::make_transform_identity;
use items_2::transform::make_transform_min_max_avg;
use items_2::transform::make_transform_pulse_id_diff;
use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::AppendToUrl;
@@ -15,7 +9,7 @@ use std::collections::BTreeMap;
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum EventTransformQuery {
pub enum EventTransformQuery {
EventBlobsVerbatim,
EventBlobsUncompressed,
ValueFull,
@@ -39,7 +33,7 @@ impl EventTransformQuery {
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum TimeBinningTransformQuery {
pub enum TimeBinningTransformQuery {
None,
TimeWeighted,
Unweighted,
@@ -114,30 +108,12 @@ impl TransformQuery {
}
}
pub fn build_event_transform(&self, inp: EventStream) -> Result<TransformEvent, Error> {
match &self.event {
EventTransformQuery::ValueFull => Ok(make_transform_identity()),
EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()),
EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {self:?}"
))),
EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()),
EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {self:?}"
))),
EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {self:?}"
))),
}
pub fn get_tr_event(&self) -> &EventTransformQuery {
&self.event
}
pub fn build_full_transform_collected(&self, inp: EventStream) -> Result<TransformedCollectedStream, Error> {
let evs = self.build_event_transform(inp)?;
match &self.time_binning {
TimeBinningTransformQuery::None => todo!(),
TimeBinningTransformQuery::TimeWeighted => todo!(),
TimeBinningTransformQuery::Unweighted => todo!(),
}
pub fn get_tr_time_binning(&self) -> &TimeBinningTransformQuery {
&self.time_binning
}
}

View File

@@ -8,6 +8,7 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use items_0::transform::EventStream;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
@@ -36,6 +37,10 @@ macro_rules! trace4 {
($($arg:tt)*) => (eprintln!($($arg)*));
}
pub struct Collect {
inp: EventStream,
}
async fn collect_in_span<T, S>(
stream: S,
deadline: Instant,

View File

@@ -13,3 +13,4 @@ pub mod tcprawclient;
pub mod test;
pub mod timebin;
pub mod timebinnedjson;
pub mod transform;

View File

@@ -91,7 +91,7 @@ pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster:
info!("item after merge: {item:?}");
item
});
let stream = RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range());
//let stream = RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range());
#[cfg(DISABLED)]
let stream = stream.map(|item| {
info!("item after rangefilter: {item:?}");

View File

@@ -1,6 +1,7 @@
use crate::collect::collect;
use crate::generators::GenerateI32;
use crate::test::runfut;
use crate::transform::build_event_transform;
use chrono::DateTime;
use chrono::Utc;
use err::Error;
@@ -255,6 +256,6 @@ fn transform_chain_correctness_01() -> Result<(), Error> {
type STY = f32;
let tq = TransformQuery::default_time_binned();
let empty = EventsDim0::<STY>::empty();
tq.build_event_transform(empty.into())?;
build_event_transform(&tq, empty.into())?;
Ok(())
}

62
streams/src/transform.rs Normal file
View File

@@ -0,0 +1,62 @@
use err::Error;
use futures_util::StreamExt;
use items_0::transform::CollectableStream;
use items_0::transform::EventStream;
use items_0::transform::TransformEvent;
use items_2::transform::make_transform_identity;
use items_2::transform::make_transform_min_max_avg;
use items_2::transform::make_transform_pulse_id_diff;
use query::transform::EventTransformQuery;
use query::transform::TimeBinningTransformQuery;
use query::transform::TransformQuery;
pub fn build_event_transform(tr: &TransformQuery, inp: EventStream) -> Result<TransformEvent, Error> {
let trev = tr.get_tr_event();
match trev {
EventTransformQuery::ValueFull => Ok(make_transform_identity()),
EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()),
EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {trev:?}"
))),
EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()),
EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {trev:?}"
))),
EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {trev:?}"
))),
}
}
pub fn build_full_transform_collected(tr: &TransformQuery, inp: EventStream) -> Result<CollectableStream, Error> {
// TODO this must return a Stream!
//let evs = build_event_transform(tr, inp)?;
let trtb = tr.get_tr_time_binning();
use futures_util::Stream;
use items_0::collect_s::Collectable;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use std::pin::Pin;
let a: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> = Box::pin(inp.0.map(|item| match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
let item: Box<dyn Collectable> = Box::new(item);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
}
RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
},
StreamItem::Log(item) => Ok(StreamItem::Log(item)),
StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
},
Err(e) => Err(e),
}));
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> =
Box::pin(futures_util::stream::empty());
match trtb {
TimeBinningTransformQuery::None => Ok(CollectableStream(stream)),
TimeBinningTransformQuery::TimeWeighted => todo!(),
TimeBinningTransformQuery::Unweighted => todo!(),
}
}