From 99a312ec26305133206dbf4f84e814ce94268516 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 4 Jun 2021 23:04:23 +0200 Subject: [PATCH] WIP on emitable items --- disk/src/binned.rs | 18 ----- disk/src/decode.rs | 6 +- disk/src/lib.rs | 4 + disk/src/raw/conn.rs | 181 ++++++++++++++++++++++++++----------------- 4 files changed, 117 insertions(+), 92 deletions(-) diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 4e6e7ab..58b2784 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -619,24 +619,6 @@ where } } -pub struct NumXAggToSingleBin { - _m: PhantomData, -} - -impl NumXAggToSingleBin { - pub fn new() -> Self { - Self { _m: PhantomData } - } -} - -impl EventsNodeProcessor for NumXAggToSingleBin { - type Input = VT; - type Output = NumSingleXBin; - fn process(inp: &EventValues) -> Self::Output { - err::todoval() - } -} - pub trait BinnedPipeline { type EventsDecoder: EventsDecoder; type EventsNodeProcessor: EventsNodeProcessor; diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 9d03a14..5263c7c 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,3 +1,5 @@ +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem}; use crate::eventblobs::EventBlobsComplete; @@ -93,7 +95,7 @@ pub struct ProcAA { impl EventsNodeProcessor for ProcAA { type Input = NTY; - type Output = (); + type Output = MinMaxAvgScalarEventBatch; fn process(inp: &EventValues) -> Self::Output { todo!() } @@ -124,7 +126,7 @@ pub struct ProcBB { impl EventsNodeProcessor for ProcBB { type Input = Vec; - type Output = (); + type Output = MinMaxAvgScalarBinBatch; fn process(inp: &EventValues) -> Self::Output { todo!() } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 4bd3811..edf6f7e 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,3 +1,5 @@ +use crate::agg::streams::StreamItem; +use crate::binned::RangeCompletableItem; use crate::dataopen::open_files; use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; use bytes::{Bytes, BytesMut}; @@ -514,6 +516,8 @@ impl futures_core::Stream for RawConcatChannelReader { } } +type Sitemty = Result>, Error>; + pub mod dtflags { pub const COMPRESSION: u8 = 0x80; pub const ARRAY: u8 = 0x40; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 952d102..feb83bb 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -3,7 +3,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; use crate::binned::{ - BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, NumXAggToSingleBin, RangeCompletableItem, + BinnedStreamKindScalar, EventsNodeProcessor, MakeBytesFrame, NumBinnedPipeline, NumOps, RangeCompletableItem, StreamKind, }; use crate::decode::{ @@ -15,12 +15,14 @@ use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; +use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, Shape}; +use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use serde::Serialize; use std::io; use std::net::SocketAddr; use std::pin::Pin; @@ -97,23 +99,22 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { } } +pub trait Framable: FrameType + Serialize + Send {} + +// returns Pin::Output>> + Send>> + fn make_num_pipeline_stream_evs( - event_value_shape: EVS, + _event_value_shape: EVS, event_blobs: EventBlobsComplete, -) -> Pin< - Box< - dyn Stream::Output>>, Error>> + Send, - >, -> +) -> Pin> + Send>> where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output>, + Sitemty<::Output>: FrameType + Serialize, { - let p1 = NumBinnedPipeline::::new(); - // TODO implement first and statically assume that we have a wave. - // TODO then implement scalar case with a different container type and get the type check working. + NumBinnedPipeline::::new(); let decs = EventsDecodedStream::::new(event_blobs); let s2 = StreamExt::map(decs, |item| match item { Ok(item) => match item { @@ -132,7 +133,7 @@ where Box::pin(s2) } -macro_rules! pipe3 { +macro_rules! pipe4 { ($nty:ident, $end:ident, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => { match $agg_kind { AggKind::DimXBins1 => make_num_pipeline_stream_evs::< @@ -151,11 +152,11 @@ macro_rules! pipe3 { }; } -macro_rules! pipe2 { +macro_rules! pipe3 { ($nty:ident, $end:ident, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { match $shape { Shape::Scalar => { - pipe3!( + pipe4!( $nty, $end, EventValuesDim0Case, @@ -165,7 +166,7 @@ macro_rules! pipe2 { ) } Shape::Wave(n) => { - pipe3!( + pipe4!( $nty, $end, EventValuesDim1Case, @@ -178,11 +179,19 @@ macro_rules! pipe2 { }; } -macro_rules! pipe1 { +macro_rules! pipe2 { ($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { match $end { - ByteOrder::LE => pipe2!($nty, LittleEndian, $shape, $agg_kind, $event_blobs), - ByteOrder::BE => pipe2!($nty, BigEndian, $shape, $agg_kind, $event_blobs), + ByteOrder::LE => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs), + ByteOrder::BE => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs), + } + }; +} + +macro_rules! pipe1 { + ($nty:expr, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => { + match $nty { + ScalarType::I32 => pipe2!(i32, $end, $shape, $agg_kind, $event_blobs), } }; } @@ -262,7 +271,7 @@ async fn events_conn_handler_inner_try( compression: entry.is_compressed, }; - if true { + if false { // TODO use a requested buffer size let buffer_size = 1024 * 4; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); @@ -275,40 +284,81 @@ async fn events_conn_handler_inner_try( event_chunker_conf, ); let shape = entry.to_shape().unwrap(); - let p1 = pipe1!(i32, entry.byte_order, shape, evq.agg_kind, event_blobs); - } - - // TODO use a requested buffer size - let buffer_size = 1024 * 4; - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let s1 = EventBlobsComplete::new( - range.clone(), - channel_config.clone(), - node_config.node.clone(), - node_config.ix, - buffer_size, - event_chunker_conf, - ) - .into_dim_1_f32_stream(); - // TODO need to decide already here on the type I want to use. - let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1); - let mut e = 0; - while let Some(item) = s1.next().await { - match &item { - Ok(StreamItem::DataItem(_)) => { - e += 1; + // TODO + // First, generalize over the number types. + // Then return boxed trait objects from the stream which are MakeFrame. + // The writeout does not need to be generic. + let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); + while let Some(item) = p1.next().await { + match make_frame(&item) { + Ok(buf) => match netout.write_all(&buf).await { + Ok(_) => {} + Err(e) => return Err((e, netout))?, + }, + Err(e) => { + return Err((e, netout))?; + } } - Ok(_) => {} - Err(_) => {} } - match evq.agg_kind { - AggKind::DimXBins1 => { - match make_frame::< + let buf = make_term_frame(); + match netout.write_all(&buf).await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + match netout.flush().await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + Ok(()) + } else { + // TODO use a requested buffer size + let buffer_size = 1024 * 4; + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let s1 = EventBlobsComplete::new( + range.clone(), + channel_config.clone(), + node_config.node.clone(), + node_config.ix, + buffer_size, + event_chunker_conf, + ) + .into_dim_1_f32_stream(); + // TODO need to decide already here on the type I want to use. + let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1); + let mut e = 0; + while let Some(item) = s1.next().await { + match &item { + Ok(StreamItem::DataItem(_)) => { + e += 1; + } + Ok(_) => {} + Err(_) => {} + } + match evq.agg_kind { + AggKind::DimXBins1 => { + match make_frame::< + Result< + StreamItem::XBinnedEvents>>, + Error, + >, + >(&item) + { + Ok(buf) => match netout.write_all(&buf).await { + Ok(_) => {} + Err(e) => return Err((e, netout))?, + }, + Err(e) => { + return Err((e, netout))?; + } + } + } + // TODO define this case: + AggKind::DimXBinsN(_xbincount) => match make_frame::< Result< StreamItem::XBinnedEvents>>, Error, >, - >(&item) + >(err::todoval()) { Ok(buf) => match netout.write_all(&buf).await { Ok(_) => {} @@ -317,32 +367,19 @@ async fn events_conn_handler_inner_try( Err(e) => { return Err((e, netout))?; } - } - } - // TODO define this case: - AggKind::DimXBinsN(_xbincount) => match make_frame::< - Result::XBinnedEvents>>, Error>, - >(err::todoval()) - { - Ok(buf) => match netout.write_all(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, }, - Err(e) => { - return Err((e, netout))?; - } - }, + } } + let buf = make_term_frame(); + match netout.write_all(&buf).await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + match netout.flush().await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + let _total_written_value_items = e; + Ok(()) } - let buf = make_term_frame(); - match netout.write_all(&buf).await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - match netout.flush().await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - let _total_written_value_items = e; - Ok(()) }