WIP try performance

This commit is contained in:
Dominik Werder
2021-07-12 16:34:58 +02:00
parent 9509b43848
commit 48e0226a4f

View File

@@ -10,7 +10,7 @@ use netpod::log::*;
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::timeunits::{DAY, SEC};
use netpod::{ArchiverAppliance, Channel, ChannelInfo, ScalarType, Shape};
use netpod::{ArchiverAppliance, Channel, ChannelInfo, NanoRange, ScalarType, Shape};
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
use std::future::Future;
@@ -129,30 +129,51 @@ trait FrameMakerTrait: Send {
fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable>;
}
struct FrameMaker<NTY> {
_m1: PhantomData<NTY>,
struct FrameMaker {
scalar_type: ScalarType,
shape: Shape,
}
impl<NTY> FrameMakerTrait for FrameMaker<NTY>
where
NTY: Send,
{
fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable> {
match ei {
macro_rules! events_item_to_sitemty {
($ei:expr, $var:ident) => {{
let d = match $ei {
Ok(j) => match j {
StreamItem::DataItem(j) => match j {
RangeCompletableItem::Data(j) => match j {
EventsItem::ScalarDouble(j) => {
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(j)));
Box::new(b)
RangeCompletableItem::Data(j) => {
if let EventsItem::$var(j) = j {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(j)))
} else {
panic!()
}
_ => panic!(),
},
_ => panic!(),
}
RangeCompletableItem::RangeComplete => {
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
},
StreamItem::Log(j) => Ok(StreamItem::Log(j)),
StreamItem::Stats(j) => Ok(StreamItem::Stats(j)),
},
Err(e) => Err(e),
};
Box::new(d)
}};
}
impl FrameMakerTrait for FrameMaker {
fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable> {
match self.shape {
Shape::Scalar => match self.scalar_type {
ScalarType::I32 => events_item_to_sitemty!(ei, ScalarInt),
ScalarType::F32 => events_item_to_sitemty!(ei, ScalarFloat),
ScalarType::F64 => events_item_to_sitemty!(ei, ScalarDouble),
_ => panic!(),
},
Shape::Wave(_) => match self.scalar_type {
ScalarType::I32 => events_item_to_sitemty!(ei, WaveInt),
ScalarType::F32 => events_item_to_sitemty!(ei, WaveFloat),
ScalarType::F64 => events_item_to_sitemty!(ei, WaveDouble),
_ => panic!(),
},
_ => panic!(),
}
}
}
@@ -161,7 +182,7 @@ pub async fn make_event_pipe(
evq: &RawEventsQuery,
aa: &ArchiverAppliance,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
let _channel_info = channel_info(&evq.channel, aa).await?;
let ci = channel_info(&evq.channel, aa).await?;
let mut inps = vec![];
for p1 in &aa.data_base_paths {
let p2 = p1.clone();
@@ -173,14 +194,12 @@ pub async fn make_event_pipe(
completed_inps: vec![false; inps.len()],
inps,
};
let frame_maker = if true {
Box::new(FrameMaker::<f64> { _m1: PhantomData }) as Box<dyn FrameMakerTrait>
} else {
Box::new(FrameMaker::<f32> { _m1: PhantomData })
};
let frame_maker = Box::new(FrameMaker {
scalar_type: ci.scalar_type.clone(),
shape: ci.shape.clone(),
}) as Box<dyn FrameMakerTrait>;
let ret = sm.map(move |j| frame_maker.make_frame(j));
Ok(Box::pin(ret))
//err::todoval()
}
pub async fn make_single_event_pipe(
@@ -255,12 +274,13 @@ pub async fn make_single_event_pipe(
fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, Error> {
match ei {
EventsItem::ScalarDouble(h) => {
/*let (x, y) = h
let range: NanoRange = err::todoval();
let (x, y) = h
.tss
.into_iter()
.zip(h.values.into_iter())
.filter_map(|(j, k)| {
if j < evq.range.beg || j >= evq.range.end {
if j < range.beg || j >= range.end {
None
} else {
Some((j, k))
@@ -271,8 +291,8 @@ fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, E
b.push(k);
(a, b)
});
let b = EventValues { tss: x, values: y };*/
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(h)));
let b = EventValues { tss: x, values: y };
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b)));
let ret = Box::new(b);
Ok(ret)
}