Numeric stream item and default processing on node

This commit is contained in:
Dominik Werder
2021-06-06 08:55:26 +02:00
parent 99a312ec26
commit abcaaf96be
5 changed files with 149 additions and 32 deletions
+1
View File
@@ -52,6 +52,7 @@ impl std::fmt::Debug for ValuesDim0 {
} }
} }
// TODO get rid of AggregatableXdim1Bin and ValuesDim1
impl<SK> AggregatableXdim1Bin<SK> for ValuesDim1 impl<SK> AggregatableXdim1Bin<SK> for ValuesDim1
where where
SK: StreamKind, SK: StreamKind,
+4 -4
View File
@@ -20,7 +20,7 @@ use netpod::log::*;
use netpod::{ use netpod::{
AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange,
}; };
use num_traits::Zero; use num_traits::{AsPrimitive, Bounded, Zero};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
@@ -555,8 +555,8 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch {
// I would like to decide on the disk-dtype first and get some generic intermediate type, and the // I would like to decide on the disk-dtype first and get some generic intermediate type, and the
// decide the AggKind, and maybe even other generic types. // decide the AggKind, and maybe even other generic types.
pub trait NumOps: Sized + Send + Unpin + Zero + BitXor {} pub trait NumOps: Sized + Copy + Send + Unpin + Zero + BitXor + AsPrimitive<f32> + Bounded + PartialOrd {}
impl<T> NumOps for T where T: Sized + Send + Unpin + Zero + BitXor {} impl<T> NumOps for T where T: Sized + Copy + Send + Unpin + Zero + BitXor + AsPrimitive<f32> + Bounded + PartialOrd {}
pub trait EventsDecoder { pub trait EventsDecoder {
type Output; type Output;
@@ -567,7 +567,7 @@ pub trait EventsDecoder {
pub trait EventsNodeProcessor { pub trait EventsNodeProcessor {
type Input; type Input;
type Output; type Output;
fn process(inp: &EventValues<Self::Input>) -> Self::Output; fn process(inp: EventValues<Self::Input>) -> Self::Output;
} }
pub struct NumEvents<N> { pub struct NumEvents<N> {
+106 -15
View File
@@ -1,13 +1,14 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem}; use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem};
use crate::eventblobs::EventBlobsComplete; use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventFull; use crate::eventchunker::EventFull;
use crate::frame::makeframe::{make_frame, Framable};
use bytes::BytesMut;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::ScalarType; use serde::{Deserialize, Serialize};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem::size_of; use std::mem::size_of;
use std::pin::Pin; use std::pin::Pin;
@@ -40,7 +41,7 @@ where
NTY: NumFromBytes<NTY, END>, NTY: NumFromBytes<NTY, END>,
{ {
type Output; type Output;
fn convert(buf: &[u8]) -> Self::Output; fn convert(&self, buf: &[u8]) -> Result<Self::Output, Error>;
} }
impl<NTY, END> EventValueFromBytes<NTY, END> for EventValuesDim0Case<NTY> impl<NTY, END> EventValueFromBytes<NTY, END> for EventValuesDim0Case<NTY>
@@ -48,8 +49,9 @@ where
NTY: NumFromBytes<NTY, END>, NTY: NumFromBytes<NTY, END>,
{ {
type Output = NTY; type Output = NTY;
fn convert(buf: &[u8]) -> Self::Output {
NTY::convert(buf) fn convert(&self, buf: &[u8]) -> Result<Self::Output, Error> {
Ok(NTY::convert(buf))
} }
} }
@@ -58,16 +60,20 @@ where
NTY: NumFromBytes<NTY, END>, NTY: NumFromBytes<NTY, END>,
{ {
type Output = Vec<NTY>; type Output = Vec<NTY>;
fn convert(buf: &[u8]) -> Self::Output {
fn convert(&self, buf: &[u8]) -> Result<Self::Output, Error> {
let es = size_of::<NTY>(); let es = size_of::<NTY>();
let n1 = buf.len() / es; let n1 = buf.len() / es;
if n1 != self.n as usize {
return Err(Error::with_msg(format!("ele count got {} exp {}", n1, self.n)));
}
let mut vals = vec![]; let mut vals = vec![];
// TODO could optimize using unsafe code.. // TODO could optimize using unsafe code..
for n2 in 0..n1 { for n2 in 0..n1 {
let i1 = es * n2; let i1 = es * n2;
vals.push(<NTY as NumFromBytes<NTY, END>>::convert(&buf[i1..(i1 + es)])); vals.push(<NTY as NumFromBytes<NTY, END>>::convert(&buf[i1..(i1 + es)]));
} }
vals Ok(vals)
} }
} }
@@ -95,8 +101,9 @@ pub struct ProcAA<NTY> {
impl<NTY> EventsNodeProcessor for ProcAA<NTY> { impl<NTY> EventsNodeProcessor for ProcAA<NTY> {
type Input = NTY; type Input = NTY;
type Output = MinMaxAvgScalarEventBatch; type Output = MinMaxAvgScalarBinBatch;
fn process(inp: &EventValues<Self::Input>) -> Self::Output {
fn process(_inp: EventValues<Self::Input>) -> Self::Output {
todo!() todo!()
} }
} }
@@ -124,11 +131,93 @@ pub struct ProcBB<NTY> {
_m1: PhantomData<NTY>, _m1: PhantomData<NTY>,
} }
impl<NTY> EventsNodeProcessor for ProcBB<NTY> { #[derive(Serialize, Deserialize)]
pub struct MinMaxAvgScalarEventBatchGen<NTY> {
pub tss: Vec<u64>,
pub mins: Vec<Option<NTY>>,
pub maxs: Vec<Option<NTY>>,
pub avgs: Vec<Option<f32>>,
}
impl<NTY> MinMaxAvgScalarEventBatchGen<NTY> {
pub fn empty() -> Self {
Self {
tss: vec![],
mins: vec![],
maxs: vec![],
avgs: vec![],
}
}
}
impl<NTY> Framable for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatchGen<NTY>>>, Error>
where
NTY: NumOps + Serialize,
{
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
}
impl<NTY> EventsNodeProcessor for ProcBB<NTY>
where
NTY: NumOps,
{
type Input = Vec<NTY>; type Input = Vec<NTY>;
type Output = MinMaxAvgScalarBinBatch; type Output = MinMaxAvgScalarEventBatchGen<NTY>;
fn process(inp: &EventValues<Self::Input>) -> Self::Output {
todo!() fn process(inp: EventValues<Self::Input>) -> Self::Output {
let nev = inp.tss.len();
let mut ret = MinMaxAvgScalarEventBatchGen {
tss: inp.tss,
mins: Vec::with_capacity(nev),
maxs: Vec::with_capacity(nev),
avgs: Vec::with_capacity(nev),
};
for i1 in 0..nev {
let mut min = None;
let mut max = None;
let mut sum = 0f32;
let mut count = 0;
let vals = &inp.values[i1];
for i2 in 0..vals.len() {
let v = vals[i2];
min = match min {
None => Some(v),
Some(min) => {
if v < min {
Some(v)
} else {
Some(min)
}
}
};
max = match max {
None => Some(v),
Some(max) => {
if v > max {
Some(v)
} else {
Some(max)
}
}
};
let vf = v.as_();
if vf.is_nan() {
} else {
sum += vf;
count += 1;
}
}
ret.mins.push(min);
ret.maxs.push(max);
if count == 0 {
ret.avgs.push(None);
} else {
ret.avgs.push(Some(sum / count as f32));
}
}
ret
} }
} }
@@ -177,6 +266,7 @@ where
END: Endianness, END: Endianness,
EVS: EventValueShape<NTY, END>, EVS: EventValueShape<NTY, END>,
{ {
evs: EVS,
event_blobs: EventBlobsComplete, event_blobs: EventBlobsComplete,
completed: bool, completed: bool,
errored: bool, errored: bool,
@@ -191,8 +281,9 @@ where
END: Endianness, END: Endianness,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END>, EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END>,
{ {
pub fn new(event_blobs: EventBlobsComplete) -> Self { pub fn new(evs: EVS, event_blobs: EventBlobsComplete) -> Self {
Self { Self {
evs,
event_blobs, event_blobs,
completed: false, completed: false,
errored: false, errored: false,
@@ -212,7 +303,7 @@ where
let decomp = ev.decomps[i1].as_ref().unwrap().as_ref(); let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
let val = <EVS as EventValueFromBytes<NTY, END>>::convert(decomp); let val = self.evs.convert(decomp)?;
ret.tss.push(ev.tss[i1]); ret.tss.push(ev.tss[i1]);
ret.values.push(val); ret.values.push(val);
} }
+25
View File
@@ -2,6 +2,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::RangeCompletableItem; use crate::binned::RangeCompletableItem;
use crate::decode::MinMaxAvgScalarEventBatchGen;
use crate::frame::inmem::InMemoryFrame; use crate::frame::inmem::InMemoryFrame;
use crate::raw::EventQueryJsonStringFrame; use crate::raw::EventQueryJsonStringFrame;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
@@ -28,6 +29,30 @@ impl FrameType for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBa
const FRAME_TYPE_ID: u32 = 0x11; const FRAME_TYPE_ID: u32 = 0x11;
} }
impl<NTY> FrameType for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatchGen<NTY>>>, Error> {
const FRAME_TYPE_ID: u32 = 888888;
}
pub trait ProvidesFrameType {
fn frame_type_id(&self) -> u32;
}
pub trait Framable: Send {
fn make_frame(&self) -> Result<BytesMut, Error>;
}
impl Framable for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error> {
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
}
impl Framable for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
fn make_frame(&self) -> Result<BytesMut, Error> {
make_frame(self)
}
}
pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error> pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
where where
FT: FrameType + Serialize, FT: FrameType + Serialize,
+13 -13
View File
@@ -3,17 +3,16 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::IntoDim1F32Stream; use crate::agg::IntoDim1F32Stream;
use crate::binned::{ use crate::binned::{
BinnedStreamKindScalar, EventsNodeProcessor, MakeBytesFrame, NumBinnedPipeline, NumOps, RangeCompletableItem, BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, RangeCompletableItem, StreamKind,
StreamKind,
}; };
use crate::decode::{ use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,
EventsDecodedStream, LittleEndian, NumFromBytes, EventsDecodedStream, LittleEndian, NumFromBytes,
}; };
use crate::eventblobs::EventBlobsComplete; use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventChunkerConf; use crate::eventchunker::EventChunkerConf;
use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType}; use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, Framable, FrameType};
use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
use crate::Sitemty; use crate::Sitemty;
use err::Error; use err::Error;
@@ -22,7 +21,6 @@ use futures_util::StreamExt;
use netpod::log::*; use netpod::log::*;
use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape}; use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use serde::Serialize;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
@@ -99,12 +97,10 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
} }
} }
pub trait Framable: FrameType + Serialize + Send {}
// returns Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>> // returns Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>
fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>( fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
_event_value_shape: EVS, event_value_shape: EVS,
event_blobs: EventBlobsComplete, event_blobs: EventBlobsComplete,
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> ) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
where where
@@ -112,15 +108,16 @@ where
END: Endianness + 'static, END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static, EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>, ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Serialize, Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
<ENP as EventsNodeProcessor>::Output: 'static,
{ {
NumBinnedPipeline::<NTY, END, ENP>::new(); NumBinnedPipeline::<NTY, END, ENP>::new();
let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_blobs); let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_value_shape, event_blobs);
let s2 = StreamExt::map(decs, |item| match item { let s2 = StreamExt::map(decs, |item| match item {
Ok(item) => match item { Ok(item) => match item {
StreamItem::DataItem(item) => match item { StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => { RangeCompletableItem::Data(item) => {
let item = <ENP as EventsNodeProcessor>::process(&item); let item = <ENP as EventsNodeProcessor>::process(item);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} }
RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
@@ -129,7 +126,8 @@ where
StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
}, },
Err(e) => Err(e), Err(e) => Err(e),
}); })
.map(|item| Box::new(item) as Box<dyn Framable>);
Box::pin(s2) Box::pin(s2)
} }
@@ -192,6 +190,7 @@ macro_rules! pipe1 {
($nty:expr, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { ($nty:expr, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $nty { match $nty {
ScalarType::I32 => pipe2!(i32, $end, $shape, $agg_kind, $event_blobs), ScalarType::I32 => pipe2!(i32, $end, $shape, $agg_kind, $event_blobs),
_ => err::todoval(),
} }
}; };
} }
@@ -290,7 +289,8 @@ async fn events_conn_handler_inner_try(
// The writeout does not need to be generic. // The writeout does not need to be generic.
let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs);
while let Some(item) = p1.next().await { while let Some(item) = p1.next().await {
match make_frame(&item) { let item = item.make_frame();
match item {
Ok(buf) => match netout.write_all(&buf).await { Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {} Ok(_) => {}
Err(e) => return Err((e, netout))?, Err(e) => return Err((e, netout))?,