WIP Start to add alternative tbin pipeline
This commit is contained in:
+27
-30
@@ -1,49 +1,53 @@
|
||||
use crate::agg::streams::StreamItem;
|
||||
use crate::binned::{RangeCompletableItem, StreamKind, XBinnedEvents};
|
||||
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
|
||||
use crate::frame::makeframe::decode_frame;
|
||||
use crate::frame::makeframe::{decode_frame, FrameType};
|
||||
use crate::Sitemty;
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
pub struct EventsFromFrames<T, SK>
|
||||
// TODO remove usage of SK, no longer needed.
|
||||
pub struct EventsFromFrames<T, I>
|
||||
where
|
||||
T: AsyncRead + Unpin,
|
||||
SK: StreamKind,
|
||||
{
|
||||
inp: InMemoryFrameAsyncReadStream<T>,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
_stream_kind: SK,
|
||||
_m2: PhantomData<I>,
|
||||
}
|
||||
|
||||
impl<T, SK> EventsFromFrames<T, SK>
|
||||
impl<T, I> EventsFromFrames<T, I>
|
||||
where
|
||||
T: AsyncRead + Unpin,
|
||||
SK: StreamKind,
|
||||
{
|
||||
pub fn new(inp: InMemoryFrameAsyncReadStream<T>, stream_kind: SK) -> Self {
|
||||
pub fn new(inp: InMemoryFrameAsyncReadStream<T>) -> Self {
|
||||
Self {
|
||||
inp,
|
||||
errored: false,
|
||||
completed: false,
|
||||
_stream_kind: stream_kind,
|
||||
_m2: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, SK> Stream for EventsFromFrames<T, SK>
|
||||
impl<T, I> Stream for EventsFromFrames<T, I>
|
||||
where
|
||||
T: AsyncRead + Unpin,
|
||||
SK: StreamKind,
|
||||
//SK: StreamKind,
|
||||
I: DeserializeOwned + Unpin,
|
||||
// TODO see binned.rs better to express it on trait?
|
||||
//Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>: FrameType,
|
||||
Sitemty<I>: FrameType,
|
||||
{
|
||||
type Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>;
|
||||
type Item = Sitemty<I>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
@@ -58,30 +62,23 @@ where
|
||||
Ready(Some(Ok(item))) => match item {
|
||||
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
|
||||
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
|
||||
StreamItem::DataItem(frame) => {
|
||||
match decode_frame::<
|
||||
Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>,
|
||||
>(
|
||||
&frame,
|
||||
<<SK as StreamKind>::XBinnedEvents as XBinnedEvents<SK>>::frame_type(),
|
||||
) {
|
||||
Ok(item) => match item {
|
||||
Ok(item) => Ready(Some(Ok(item))),
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
StreamItem::DataItem(frame) => match decode_frame::<Sitemty<I>>(&frame, 0) {
|
||||
Ok(item) => match item {
|
||||
Ok(item) => Ready(Some(Ok(item))),
|
||||
Err(e) => {
|
||||
error!(
|
||||
"EventsFromFrames ~~~~~~~~ ERROR on frame payload {}",
|
||||
frame.buf().len(),
|
||||
);
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!(
|
||||
"EventsFromFrames ~~~~~~~~ ERROR on frame payload {}",
|
||||
frame.buf().len(),
|
||||
);
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.errored = true;
|
||||
|
||||
@@ -2,9 +2,7 @@ use crate::agg::binnedx::IntoBinnedXBins1;
|
||||
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
|
||||
use crate::agg::streams::StreamItem;
|
||||
use crate::agg::IntoDim1F32Stream;
|
||||
use crate::binned::{
|
||||
BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, RangeCompletableItem, StreamKind,
|
||||
};
|
||||
use crate::binned::{BinnedStreamKindScalar, EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind};
|
||||
use crate::decode::{
|
||||
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
|
||||
EventsDecodedStream, LittleEndian, NumFromBytes,
|
||||
@@ -111,7 +109,6 @@ where
|
||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
|
||||
<ENP as EventsNodeProcessor>::Output: 'static,
|
||||
{
|
||||
NumBinnedPipeline::<NTY, END, ENP>::new();
|
||||
let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_value_shape, event_blobs);
|
||||
let s2 = StreamExt::map(decs, |item| match item {
|
||||
Ok(item) => match item {
|
||||
|
||||
Reference in New Issue
Block a user