WIP on emitable items

This commit is contained in:
Dominik Werder
2021-06-04 23:04:23 +02:00
parent e4c5e05310
commit 99a312ec26
4 changed files with 117 additions and 92 deletions

View File

@@ -619,24 +619,6 @@ where
}
}
pub struct NumXAggToSingleBin<VT> {
_m: PhantomData<VT>,
}
impl<VT> NumXAggToSingleBin<VT> {
pub fn new() -> Self {
Self { _m: PhantomData }
}
}
impl<VT> EventsNodeProcessor for NumXAggToSingleBin<VT> {
type Input = VT;
type Output = NumSingleXBin<VT>;
fn process(inp: &EventValues<Self::Input>) -> Self::Output {
err::todoval()
}
}
pub trait BinnedPipeline {
type EventsDecoder: EventsDecoder;
type EventsNodeProcessor: EventsNodeProcessor;

View File

@@ -1,3 +1,5 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem};
use crate::eventblobs::EventBlobsComplete;
@@ -93,7 +95,7 @@ pub struct ProcAA<NTY> {
impl<NTY> EventsNodeProcessor for ProcAA<NTY> {
type Input = NTY;
type Output = ();
type Output = MinMaxAvgScalarEventBatch;
fn process(inp: &EventValues<Self::Input>) -> Self::Output {
todo!()
}
@@ -124,7 +126,7 @@ pub struct ProcBB<NTY> {
impl<NTY> EventsNodeProcessor for ProcBB<NTY> {
type Input = Vec<NTY>;
type Output = ();
type Output = MinMaxAvgScalarBinBatch;
fn process(inp: &EventValues<Self::Input>) -> Self::Output {
todo!()
}

View File

@@ -1,3 +1,5 @@
use crate::agg::streams::StreamItem;
use crate::binned::RangeCompletableItem;
use crate::dataopen::open_files;
use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use bytes::{Bytes, BytesMut};
@@ -514,6 +516,8 @@ impl futures_core::Stream for RawConcatChannelReader {
}
}
type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, Error>;
pub mod dtflags {
pub const COMPRESSION: u8 = 0x80;
pub const ARRAY: u8 = 0x40;

View File

@@ -3,7 +3,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::StreamItem;
use crate::agg::IntoDim1F32Stream;
use crate::binned::{
BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, NumXAggToSingleBin, RangeCompletableItem,
BinnedStreamKindScalar, EventsNodeProcessor, MakeBytesFrame, NumBinnedPipeline, NumOps, RangeCompletableItem,
StreamKind,
};
use crate::decode::{
@@ -15,12 +15,14 @@ use crate::eventchunker::EventChunkerConf;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType};
use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
use crate::Sitemty;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, Shape};
use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use serde::Serialize;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
@@ -97,23 +99,22 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
}
}
pub trait Framable: FrameType + Serialize + Send {}
// returns Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>
fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
event_value_shape: EVS,
_event_value_shape: EVS,
event_blobs: EventBlobsComplete,
) -> Pin<
Box<
dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<ENP as EventsNodeProcessor>::Output>>, Error>> + Send,
>,
>
) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Serialize,
{
let p1 = NumBinnedPipeline::<NTY, END, ENP>::new();
// TODO implement first and statically assume that we have a wave.
// TODO then implement scalar case with a different container type and get the type check working.
NumBinnedPipeline::<NTY, END, ENP>::new();
let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_blobs);
let s2 = StreamExt::map(decs, |item| match item {
Ok(item) => match item {
@@ -132,7 +133,7 @@ where
Box::pin(s2)
}
macro_rules! pipe3 {
macro_rules! pipe4 {
($nty:ident, $end:ident, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
match $agg_kind {
AggKind::DimXBins1 => make_num_pipeline_stream_evs::<
@@ -151,11 +152,11 @@ macro_rules! pipe3 {
};
}
macro_rules! pipe2 {
macro_rules! pipe3 {
($nty:ident, $end:ident, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $shape {
Shape::Scalar => {
pipe3!(
pipe4!(
$nty,
$end,
EventValuesDim0Case,
@@ -165,7 +166,7 @@ macro_rules! pipe2 {
)
}
Shape::Wave(n) => {
pipe3!(
pipe4!(
$nty,
$end,
EventValuesDim1Case,
@@ -178,11 +179,19 @@ macro_rules! pipe2 {
};
}
macro_rules! pipe1 {
macro_rules! pipe2 {
($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $end {
ByteOrder::LE => pipe2!($nty, LittleEndian, $shape, $agg_kind, $event_blobs),
ByteOrder::BE => pipe2!($nty, BigEndian, $shape, $agg_kind, $event_blobs),
ByteOrder::LE => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs),
ByteOrder::BE => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs),
}
};
}
macro_rules! pipe1 {
($nty:expr, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $nty {
ScalarType::I32 => pipe2!(i32, $end, $shape, $agg_kind, $event_blobs),
}
};
}
@@ -262,7 +271,7 @@ async fn events_conn_handler_inner_try(
compression: entry.is_compressed,
};
if true {
if false {
// TODO use a requested buffer size
let buffer_size = 1024 * 4;
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
@@ -275,40 +284,81 @@ async fn events_conn_handler_inner_try(
event_chunker_conf,
);
let shape = entry.to_shape().unwrap();
let p1 = pipe1!(i32, entry.byte_order, shape, evq.agg_kind, event_blobs);
}
// TODO use a requested buffer size
let buffer_size = 1024 * 4;
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let s1 = EventBlobsComplete::new(
range.clone(),
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
buffer_size,
event_chunker_conf,
)
.into_dim_1_f32_stream();
// TODO need to decide already here on the type I want to use.
let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1);
let mut e = 0;
while let Some(item) = s1.next().await {
match &item {
Ok(StreamItem::DataItem(_)) => {
e += 1;
// TODO
// First, generalize over the number types.
// Then return boxed trait objects from the stream which are MakeFrame.
// The writeout does not need to be generic.
let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs);
while let Some(item) = p1.next().await {
match make_frame(&item) {
Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
},
Err(e) => {
return Err((e, netout))?;
}
}
Ok(_) => {}
Err(_) => {}
}
match evq.agg_kind {
AggKind::DimXBins1 => {
match make_frame::<
let buf = make_term_frame();
match netout.write_all(&buf).await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
match netout.flush().await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
Ok(())
} else {
// TODO use a requested buffer size
let buffer_size = 1024 * 4;
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let s1 = EventBlobsComplete::new(
range.clone(),
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
buffer_size,
event_chunker_conf,
)
.into_dim_1_f32_stream();
// TODO need to decide already here on the type I want to use.
let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1);
let mut e = 0;
while let Some(item) = s1.next().await {
match &item {
Ok(StreamItem::DataItem(_)) => {
e += 1;
}
Ok(_) => {}
Err(_) => {}
}
match evq.agg_kind {
AggKind::DimXBins1 => {
match make_frame::<
Result<
StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as StreamKind>::XBinnedEvents>>,
Error,
>,
>(&item)
{
Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
},
Err(e) => {
return Err((e, netout))?;
}
}
}
// TODO define this case:
AggKind::DimXBinsN(_xbincount) => match make_frame::<
Result<
StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as StreamKind>::XBinnedEvents>>,
Error,
>,
>(&item)
>(err::todoval())
{
Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {}
@@ -317,32 +367,19 @@ async fn events_conn_handler_inner_try(
Err(e) => {
return Err((e, netout))?;
}
}
}
// TODO define this case:
AggKind::DimXBinsN(_xbincount) => match make_frame::<
Result<StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as StreamKind>::XBinnedEvents>>, Error>,
>(err::todoval())
{
Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
},
Err(e) => {
return Err((e, netout))?;
}
},
}
}
let buf = make_term_frame();
match netout.write_all(&buf).await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
match netout.flush().await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
let _total_written_value_items = e;
Ok(())
}
let buf = make_term_frame();
match netout.write_all(&buf).await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
match netout.flush().await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
let _total_written_value_items = e;
Ok(())
}