diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 1bb1bfc..a687c2a 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -52,6 +52,7 @@ impl std::fmt::Debug for ValuesDim0 { } } +// TODO get rid of AggregatableXdim1Bin and ValuesDim1 impl AggregatableXdim1Bin for ValuesDim1 where SK: StreamKind, diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 58b2784..aec6e15 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -20,7 +20,7 @@ use netpod::log::*; use netpod::{ AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, }; -use num_traits::Zero; +use num_traits::{AsPrimitive, Bounded, Zero}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize, Serializer}; @@ -555,8 +555,8 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch { // I would like to decide on the disk-dtype first and get some generic intermediate type, and the // decide the AggKind, and maybe even other generic types. -pub trait NumOps: Sized + Send + Unpin + Zero + BitXor {} -impl NumOps for T where T: Sized + Send + Unpin + Zero + BitXor {} +pub trait NumOps: Sized + Copy + Send + Unpin + Zero + BitXor + AsPrimitive + Bounded + PartialOrd {} +impl NumOps for T where T: Sized + Copy + Send + Unpin + Zero + BitXor + AsPrimitive + Bounded + PartialOrd {} pub trait EventsDecoder { type Output; @@ -567,7 +567,7 @@ pub trait EventsDecoder { pub trait EventsNodeProcessor { type Input; type Output; - fn process(inp: &EventValues) -> Self::Output; + fn process(inp: EventValues) -> Self::Output; } pub struct NumEvents { diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 5263c7c..337011b 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,13 +1,14 @@ -use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem}; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventFull; +use crate::frame::makeframe::{make_frame, Framable}; +use bytes::BytesMut; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::ScalarType; +use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use std::mem::size_of; use std::pin::Pin; @@ -40,7 +41,7 @@ where NTY: NumFromBytes, { type Output; - fn convert(buf: &[u8]) -> Self::Output; + fn convert(&self, buf: &[u8]) -> Result; } impl EventValueFromBytes for EventValuesDim0Case @@ -48,8 +49,9 @@ where NTY: NumFromBytes, { type Output = NTY; - fn convert(buf: &[u8]) -> Self::Output { - NTY::convert(buf) + + fn convert(&self, buf: &[u8]) -> Result { + Ok(NTY::convert(buf)) } } @@ -58,16 +60,20 @@ where NTY: NumFromBytes, { type Output = Vec; - fn convert(buf: &[u8]) -> Self::Output { + + fn convert(&self, buf: &[u8]) -> Result { let es = size_of::(); let n1 = buf.len() / es; + if n1 != self.n as usize { + return Err(Error::with_msg(format!("ele count got {} exp {}", n1, self.n))); + } let mut vals = vec![]; // TODO could optimize using unsafe code.. for n2 in 0..n1 { let i1 = es * n2; vals.push(>::convert(&buf[i1..(i1 + es)])); } - vals + Ok(vals) } } @@ -95,8 +101,9 @@ pub struct ProcAA { impl EventsNodeProcessor for ProcAA { type Input = NTY; - type Output = MinMaxAvgScalarEventBatch; - fn process(inp: &EventValues) -> Self::Output { + type Output = MinMaxAvgScalarBinBatch; + + fn process(_inp: EventValues) -> Self::Output { todo!() } } @@ -124,11 +131,93 @@ pub struct ProcBB { _m1: PhantomData, } -impl EventsNodeProcessor for ProcBB { +#[derive(Serialize, Deserialize)] +pub struct MinMaxAvgScalarEventBatchGen { + pub tss: Vec, + pub mins: Vec>, + pub maxs: Vec>, + pub avgs: Vec>, +} + +impl MinMaxAvgScalarEventBatchGen { + pub fn empty() -> Self { + Self { + tss: vec![], + mins: vec![], + maxs: vec![], + avgs: vec![], + } + } +} + +impl Framable for Result>>, Error> +where + NTY: NumOps + Serialize, +{ + fn make_frame(&self) -> Result { + make_frame(self) + } +} + +impl EventsNodeProcessor for ProcBB +where + NTY: NumOps, +{ type Input = Vec; - type Output = MinMaxAvgScalarBinBatch; - fn process(inp: &EventValues) -> Self::Output { - todo!() + type Output = MinMaxAvgScalarEventBatchGen; + + fn process(inp: EventValues) -> Self::Output { + let nev = inp.tss.len(); + let mut ret = MinMaxAvgScalarEventBatchGen { + tss: inp.tss, + mins: Vec::with_capacity(nev), + maxs: Vec::with_capacity(nev), + avgs: Vec::with_capacity(nev), + }; + for i1 in 0..nev { + let mut min = None; + let mut max = None; + let mut sum = 0f32; + let mut count = 0; + let vals = &inp.values[i1]; + for i2 in 0..vals.len() { + let v = vals[i2]; + min = match min { + None => Some(v), + Some(min) => { + if v < min { + Some(v) + } else { + Some(min) + } + } + }; + max = match max { + None => Some(v), + Some(max) => { + if v > max { + Some(v) + } else { + Some(max) + } + } + }; + let vf = v.as_(); + if vf.is_nan() { + } else { + sum += vf; + count += 1; + } + } + ret.mins.push(min); + ret.maxs.push(max); + if count == 0 { + ret.avgs.push(None); + } else { + ret.avgs.push(Some(sum / count as f32)); + } + } + ret } } @@ -177,6 +266,7 @@ where END: Endianness, EVS: EventValueShape, { + evs: EVS, event_blobs: EventBlobsComplete, completed: bool, errored: bool, @@ -191,8 +281,9 @@ where END: Endianness, EVS: EventValueShape + EventValueFromBytes, { - pub fn new(event_blobs: EventBlobsComplete) -> Self { + pub fn new(evs: EVS, event_blobs: EventBlobsComplete) -> Self { Self { + evs, event_blobs, completed: false, errored: false, @@ -212,7 +303,7 @@ where let decomp = ev.decomps[i1].as_ref().unwrap().as_ref(); - let val = >::convert(decomp); + let val = self.evs.convert(decomp)?; ret.tss.push(ev.tss[i1]); ret.values.push(val); } diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index f365f29..98d3d2f 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -2,6 +2,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::binned::RangeCompletableItem; +use crate::decode::MinMaxAvgScalarEventBatchGen; use crate::frame::inmem::InMemoryFrame; use crate::raw::EventQueryJsonStringFrame; use bytes::{BufMut, BytesMut}; @@ -28,6 +29,30 @@ impl FrameType for Result FrameType for Result>>, Error> { + const FRAME_TYPE_ID: u32 = 888888; +} + +pub trait ProvidesFrameType { + fn frame_type_id(&self) -> u32; +} + +pub trait Framable: Send { + fn make_frame(&self) -> Result; +} + +impl Framable for Result>, Error> { + fn make_frame(&self) -> Result { + make_frame(self) + } +} + +impl Framable for Result>, Error> { + fn make_frame(&self) -> Result { + make_frame(self) + } +} + pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index feb83bb..d321e78 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -3,17 +3,16 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; use crate::binned::{ - BinnedStreamKindScalar, EventsNodeProcessor, MakeBytesFrame, NumBinnedPipeline, NumOps, RangeCompletableItem, - StreamKind, + BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, RangeCompletableItem, StreamKind, }; use crate::decode::{ - BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, + BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, EventsDecodedStream, LittleEndian, NumFromBytes, }; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType}; +use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, Framable, FrameType}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; use crate::Sitemty; use err::Error; @@ -22,7 +21,6 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; -use serde::Serialize; use std::io; use std::net::SocketAddr; use std::pin::Pin; @@ -99,12 +97,10 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { } } -pub trait Framable: FrameType + Serialize + Send {} - // returns Pin::Output>> + Send>> fn make_num_pipeline_stream_evs( - _event_value_shape: EVS, + event_value_shape: EVS, event_blobs: EventBlobsComplete, ) -> Pin> + Send>> where @@ -112,15 +108,16 @@ where END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output>, - Sitemty<::Output>: FrameType + Serialize, + Sitemty<::Output>: Framable + 'static, + ::Output: 'static, { NumBinnedPipeline::::new(); - let decs = EventsDecodedStream::::new(event_blobs); + let decs = EventsDecodedStream::::new(event_value_shape, event_blobs); let s2 = StreamExt::map(decs, |item| match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { RangeCompletableItem::Data(item) => { - let item = ::process(&item); + let item = ::process(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), @@ -129,7 +126,8 @@ where StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), }, Err(e) => Err(e), - }); + }) + .map(|item| Box::new(item) as Box); Box::pin(s2) } @@ -192,6 +190,7 @@ macro_rules! pipe1 { ($nty:expr, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { match $nty { ScalarType::I32 => pipe2!(i32, $end, $shape, $agg_kind, $event_blobs), + _ => err::todoval(), } }; } @@ -290,7 +289,8 @@ async fn events_conn_handler_inner_try( // The writeout does not need to be generic. let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); while let Some(item) = p1.next().await { - match make_frame(&item) { + let item = item.make_frame(); + match item { Ok(buf) => match netout.write_all(&buf).await { Ok(_) => {} Err(e) => return Err((e, netout))?,