From e4c5e05310e4263dcbd463cc313bb502995f6b5b Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 4 Jun 2021 17:57:40 +0200
Subject: [PATCH] WIP Refactor stream build up, it type checks

---
 daqbuffer/src/bin/daqbuffer.rs |   8 +-
 daqbuffer/src/test.rs          |   9 +-
 disk/Cargo.toml                |   2 +-
 disk/src/aggtest.rs            |   6 +-
 disk/src/binned.rs             | 139 ++++++++++++++---
 disk/src/decode.rs             | 275 +++++++++++++++++++++++++++++++++
 disk/src/gen.rs                |  56 ++++++-
 disk/src/lib.rs                |   3 +-
 disk/src/raw/conn.rs           | 122 ++++++++++++++-
 netpod/src/lib.rs              |  42 ++++-
 parse/src/channelconfig.rs     |   8 +-
 11 files changed, 622 insertions(+), 48 deletions(-)
 create mode 100644 disk/src/decode.rs

diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs
index aba5310..20944b3 100644
--- a/daqbuffer/src/bin/daqbuffer.rs
+++ b/daqbuffer/src/bin/daqbuffer.rs
@@ -3,8 +3,8 @@ use disk::cache::CacheUsage;
 use err::Error;
 use netpod::log::*;
 use netpod::{NodeConfig, NodeConfigCached};
-use tokio::io::AsyncReadExt;
 use tokio::fs::File;
+use tokio::io::AsyncReadExt;
 
 pub fn main() {
     match taskrun::run(go()) {
@@ -93,7 +93,9 @@ async fn go() -> Result<(), Error> {
 #[test]
 fn simple_fetch() {
     use netpod::Nanos;
-    use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape};
+    use netpod::{
+        timeunits::*, ByteOrder, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape,
+    };
     taskrun::run(async {
         let t1 = chrono::Utc::now();
         let node = Node {
@@ -117,7 +119,7 @@ fn simple_fetch() {
             array: true,
             scalar_type: ScalarType::F64,
             shape: Shape::Wave(err::todoval()),
-            big_endian: true,
+            byte_order: ByteOrder::big_endian(),
             compression: true,
         },
         timebin: 18720,
diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs
index 6dc4afb..a4bebc1 100644
--- a/daqbuffer/src/test.rs
+++ b/daqbuffer/src/test.rs
@@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
 use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use disk::agg::streams::{Bins, StatsItem, StreamItem};
 use disk::binned::RangeCompletableItem;
-use disk::cache::BinnedQuery;
+use disk::cache::{BinnedQuery, CacheUsage};
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use disk::streamlog::Streamlog;
 use err::Error;
@@ -93,7 +93,7 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
         )
         .await?;
     }
-    if true {
+    if false {
         get_binned_channel(
             "wave-u16-le-n77",
             "1970-01-01T01:11:00.000Z",
@@ -105,7 +105,7 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
         )
         .await?;
     }
-    if true {
+    if false {
         get_binned_channel(
             "wave-u16-le-n77",
             "1970-01-01T01:42:00.000Z",
@@ -144,7 +144,8 @@ where
         name: channel_name.into(),
     };
     let range = NanoRange::from_date_time(beg_date, end_date);
-    let query = BinnedQuery::new(channel, range, bin_count, agg_kind);
+    let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
+    query.set_cache_usage(CacheUsage::Ignore);
     let hp = HostPort::from_node(node0);
     let url = query.url(&hp);
     info!("get_binned_channel get {}", url);
diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 1dbfc00..21da2de 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "disk"
-version = "0.0.1-a.0"
+version = "0.0.1-a.1"
 authors = ["Dominik Werder "]
 edition = "2018"
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index 46ea031..bb58adb 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -6,7 +6,7 @@ use crate::eventblobs::EventBlobsComplete;
 use crate::eventchunker::EventChunkerConf;
 use futures_util::StreamExt;
 use netpod::timeunits::*;
-use netpod::{BinnedRange, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
+use netpod::{BinnedRange, ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
 use std::future::ready;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -46,7 +46,7 @@ async fn agg_x_dim_0_inner() {
             array: false,
             shape: Shape::Scalar,
             scalar_type: ScalarType::F64,
-            big_endian: true,
+            byte_order: ByteOrder::big_endian(),
             compression: true,
         },
         timebin: 18723,
@@ -103,7 +103,7 @@ async fn agg_x_dim_1_inner() {
             array: true,
             shape: Shape::Wave(1024),
             scalar_type: ScalarType::F64,
-            big_endian: true,
+            byte_order: ByteOrder::big_endian(),
             compression: true,
         },
         timebin: 0,
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index ef291c0..4e6e7ab 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -8,6 +8,7 @@ use crate::agg::{Fits, FitsInside};
 use crate::binned::scalar::binned_stream;
 use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
 use crate::cache::{BinnedQuery, MergedFromRemotes};
+use crate::decode::{Endianness, EventValues};
 use crate::frame::makeframe::FrameType;
 use crate::raw::EventsQuery;
 use bytes::Bytes;
@@ -25,6 +26,8 @@ use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize, Serializer};
 use serde_json::Map;
 use std::future::Future;
+use std::marker::PhantomData;
+use std::ops::BitXor;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::time::Duration;
@@ -552,32 +555,130 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch {
 
 // I would like to decide on the disk-dtype first and get some generic intermediate type, and then
 // decide the AggKind, and maybe even other generic types.
-pub trait EventDecoder {
+pub trait NumOps: Sized + Send + Unpin + Zero + BitXor {}
+impl<T> NumOps for T where T: Sized + Send + Unpin + Zero + BitXor {}
+
+pub trait EventsDecoder {
     type Output;
-}
-
-pub struct U32EventsDecoder {}
-
-pub struct U32Events {}
-pub struct U32SingleBins {}
-
-impl EventDecoder for U32EventsDecoder {
-    // TODO U32Event is just for demo.
-    type Output = U32Events;
+    fn ingest(&mut self, event: &[u8]);
+    fn result(&mut self) -> Self::Output;
 }
 
 pub trait EventsNodeProcessor {
     type Input;
     type Output;
+    fn process(inp: &EventValues<Self::Input>) -> Self::Output;
 }
 
 // TODO the avg needs to be f32, but the min/max have to be the regular type of the disk data.
 // Try to make that generic...
-pub struct U32XAggToSingleBin {}
+pub struct NumEvents<NTY> {
+    _m: PhantomData<NTY>,
+}
+impl<NTY> NumEvents<NTY> {
+    pub fn new() -> Self {
+        Self { _m: PhantomData }
+    }
+}
 
-impl EventsNodeProcessor for U32XAggToSingleBin {
-    type Input = U32Events;
-    type Output = U32SingleBins;
+pub struct NumSingleXBin<NTY> {
+    _m: PhantomData<NTY>,
+}
+impl<NTY> NumSingleXBin<NTY> {
+    pub fn new() -> Self {
+        Self { _m: PhantomData }
+    }
+}
+
+pub struct NumEventsDecoder<NTY, END>
+where
+    END: Endianness,
+{
+    _m1: PhantomData<NTY>,
+    _m2: PhantomData<END>,
+}
+
+impl<NTY, END> NumEventsDecoder<NTY, END>
+where
+    END: Endianness,
+{
+    pub fn new() -> Self {
+        Self {
+            _m1: PhantomData,
+            _m2: PhantomData,
+        }
+    }
+}
+
+impl<NTY, END> EventsDecoder for NumEventsDecoder<NTY, END>
+where
+    END: Endianness,
+{
+    type Output = NumEvents<NTY>;
+    fn ingest(&mut self, _event: &[u8]) {}
+    fn result(&mut self) -> Self::Output {
+        err::todoval()
+    }
+}
+
+pub struct NumXAggToSingleBin<VT> {
+    _m: PhantomData<VT>,
+}
+
+impl<VT> NumXAggToSingleBin<VT> {
+    pub fn new() -> Self {
+        Self { _m: PhantomData }
+    }
+}
+
+impl<VT> EventsNodeProcessor for NumXAggToSingleBin<VT> {
+    type Input = VT;
+    type Output = NumSingleXBin<VT>;
+    fn process(inp: &EventValues<Self::Input>) -> Self::Output {
+        err::todoval()
+    }
 }
 
+pub trait BinnedPipeline {
+    type EventsDecoder: EventsDecoder;
+    type EventsNodeProcessor: EventsNodeProcessor;
+    fn events_decoder(&self) -> Self::EventsDecoder;
+    fn events_node_processor(&self) -> Self::EventsNodeProcessor;
+}
+
+pub struct NumBinnedPipeline<NTY, END, ENP>
+where
+    END: Endianness,
+{
+    _m1: PhantomData<NTY>,
+    _m2: PhantomData<END>,
+    _m3: PhantomData<ENP>,
+}
+
+impl<NTY, END, ENP> NumBinnedPipeline<NTY, END, ENP>
+where
+    END: Endianness,
+{
+    pub fn new() -> Self {
+        Self {
+            _m1: PhantomData,
+            _m2: PhantomData,
+            _m3: PhantomData,
+        }
+    }
+}
+
+impl<NTY, END, ENP> BinnedPipeline for NumBinnedPipeline<NTY, END, ENP>
+where
+    ENP: EventsNodeProcessor,
+    END: Endianness,
+{
+    type EventsDecoder = NumEventsDecoder<NTY, END>;
+    type EventsNodeProcessor = ENP;
+    fn events_decoder(&self) -> Self::EventsDecoder {
+        todo!()
+    }
+    fn events_node_processor(&self) -> Self::EventsNodeProcessor {
+        todo!()
+    }
+}
 
 pub trait StreamKind: Clone + Unpin + Send + Sync + 'static {
     type XBinnedToTBinnedAggregator;
     type XBinnedToTBinnedStream: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>> + Send;
-    type EventsDecoder: EventDecoder;
-    type EventsNodeProcessor: EventsNodeProcessor;
 
     fn new_binned_from_prebinned(
         &self,
@@ -642,8 +741,6 @@ impl StreamKind for BinnedStreamKindScalar {
     type TBinnedBins = MinMaxAvgScalarBinBatch;
     type XBinnedToTBinnedAggregator = Agg3;
     type XBinnedToTBinnedStream = BinnedT3Stream;
-    type EventsDecoder = U32EventsDecoder;
-    type EventsNodeProcessor = U32XAggToSingleBin;
 
     fn new_binned_from_prebinned(
         &self,
diff --git a/disk/src/decode.rs b/disk/src/decode.rs
new file mode 100644
index 0000000..9d03a14
--- /dev/null
+++ b/disk/src/decode.rs
@@ -0,0 +1,275 @@
+use crate::agg::streams::StreamItem;
+use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem};
+use crate::eventblobs::EventBlobsComplete;
+use crate::eventchunker::EventFull;
+use err::Error;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use netpod::ScalarType;
+use std::marker::PhantomData;
+use std::mem::size_of;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub trait Endianness: Send + Unpin {}
+pub struct LittleEndian {}
+pub struct BigEndian {}
+impl Endianness for LittleEndian {}
+impl Endianness for BigEndian {}
+
+pub trait NumFromBytes<NTY, END> {
+    fn convert(buf: &[u8]) -> NTY;
+}
+
+impl NumFromBytes<i32, LittleEndian> for i32 {
+    fn convert(buf: &[u8]) -> i32 {
+        i32::from_le_bytes(*arrayref::array_ref![buf, 0, 4])
+    }
+}
+
+impl NumFromBytes<i32, BigEndian> for i32 {
+    fn convert(buf: &[u8]) -> i32 {
+        i32::from_be_bytes(*arrayref::array_ref![buf, 0, 4])
+    }
+}
+
+pub trait EventValueFromBytes<NTY, END>
+where
+    NTY: NumFromBytes<NTY, END>,
+{
+    type Output;
+    fn convert(buf: &[u8]) -> Self::Output;
+}
+
+impl<NTY, END> EventValueFromBytes<NTY, END> for EventValuesDim0Case<NTY>
+where
+    NTY: NumFromBytes<NTY, END>,
+{
+    type Output = NTY;
+    fn convert(buf: &[u8]) -> Self::Output {
+        NTY::convert(buf)
+    }
+}
+
+impl<NTY, END> EventValueFromBytes<NTY, END> for EventValuesDim1Case<NTY>
+where
+    NTY: NumFromBytes<NTY, END>,
+{
+    type Output = Vec<NTY>;
+    fn convert(buf: &[u8]) -> Self::Output {
+        let es = size_of::<NTY>();
+        let n1 = buf.len() / es;
+        let mut vals = vec![];
+        // TODO could optimize using unsafe code..
+        for n2 in 0..n1 {
+            let i1 = es * n2;
+            vals.push(<NTY as NumFromBytes<NTY, END>>::convert(&buf[i1..(i1 + es)]));
+        }
+        vals
+    }
+}
+
+pub trait EventValueShape<NTY, END>: EventValueFromBytes<NTY, END> + Send + Unpin
+where
+    NTY: NumFromBytes<NTY, END>,
+{
+    type NumXAggToSingleBin: EventsNodeProcessor<Input = <Self as EventValueFromBytes<NTY, END>>::Output>;
+    type NumXAggToNBins: EventsNodeProcessor<Input = <Self as EventValueFromBytes<NTY, END>>::Output>;
+}
+
+pub struct EventValuesDim0Case<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> EventValuesDim0Case<NTY> {
+    pub fn new() -> Self {
+        Self { _m1: PhantomData }
+    }
+}
+
+pub struct ProcAA<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> EventsNodeProcessor for ProcAA<NTY> {
+    type Input = NTY;
+    type Output = ();
+    fn process(inp: &EventValues<Self::Input>) -> Self::Output {
+        todo!()
+    }
+}
+
+impl<NTY, END> EventValueShape<NTY, END> for EventValuesDim0Case<NTY>
+where
+    NTY: NumOps + NumFromBytes<NTY, END>,
+{
+    type NumXAggToSingleBin = ProcAA<NTY>;
+    type NumXAggToNBins = ProcAA<NTY>;
+}
+
+pub struct EventValuesDim1Case<NTY> {
+    n: u32,
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> EventValuesDim1Case<NTY> {
+    pub fn new(n: u32) -> Self {
+        Self { n, _m1: PhantomData }
+    }
+}
+
+pub struct ProcBB<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> EventsNodeProcessor for ProcBB<NTY> {
+    type Input = Vec<NTY>;
+    type Output = ();
+    fn process(inp: &EventValues<Self::Input>) -> Self::Output {
+        todo!()
+    }
+}
+
+impl<NTY, END> EventValueShape<NTY, END> for EventValuesDim1Case<NTY>
+where
+    NTY: NumOps + NumFromBytes<NTY, END>,
+{
+    type NumXAggToSingleBin = ProcBB<NTY>;
+    type NumXAggToNBins = ProcBB<NTY>;
+}
+
+pub struct EventValues<VT> {
+    pub tss: Vec<u64>,
+    pub values: Vec<VT>,
+}
+
+impl<VT> EventValues<VT> {
+    pub fn empty() -> Self {
+        Self {
+            tss: vec![],
+            values: vec![],
+        }
+    }
+}
+
+impl<VT> std::fmt::Debug for EventValues<VT>
+where
+    VT: std::fmt::Debug,
+{
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            fmt,
+            "count {} ts {:?} .. {:?} vals {:?} .. {:?}",
+            self.tss.len(),
+            self.tss.first(),
+            self.tss.last(),
+            self.values.first(),
+            self.values.last(),
+        )
+    }
+}
+
+pub struct EventsDecodedStream<NTY, END, EVS>
+where
+    NTY: NumOps + NumFromBytes<NTY, END>,
+    END: Endianness,
+    EVS: EventValueShape<NTY, END>,
+{
+    event_blobs: EventBlobsComplete,
+    completed: bool,
+    errored: bool,
+    _m1: PhantomData<NTY>,
+    _m2: PhantomData<END>,
+    _m3: PhantomData<EVS>,
+}
+
+impl<NTY, END, EVS> EventsDecodedStream<NTY, END, EVS>
+where
+    NTY: NumOps + NumFromBytes<NTY, END>,
+    END: Endianness,
+    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END>,
+{
+    pub fn new(event_blobs: EventBlobsComplete) -> Self {
+        Self {
+            event_blobs,
+            completed: false,
+            errored: false,
+            _m1: PhantomData,
+            _m2: PhantomData,
+            _m3: PhantomData,
+        }
+    }
+
+    fn decode(&mut self, ev: &EventFull) -> Result<EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>, Error> {
+        let mut ret = EventValues::empty();
+        for i1 in 0..ev.tss.len() {
+            // TODO check that dtype, event endianness and event shape match our static
+            // expectation about the data in this channel.
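+            // [Editor's note, not part of the original patch] Such a guard could,
+            // for example, compare ev.scalar_types[i1] and ev.be[i1] against the
+            // ScalarType and Endianness this stream was monomorphized for, and
+            // return an Err instead of silently reinterpreting foreign bytes.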
+            let _ty = &ev.scalar_types[i1];
+            let _be = ev.be[i1];
+
+            let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
+
+            let val = <EVS as EventValueFromBytes<NTY, END>>::convert(decomp);
+            ret.tss.push(ev.tss[i1]);
+            ret.values.push(val);
+        }
+        Ok(ret)
+    }
+}
+
+impl<NTY, END, EVS> Stream for EventsDecodedStream<NTY, END, EVS>
+where
+    NTY: NumOps + NumFromBytes<NTY, END>,
+    END: Endianness,
+    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END>,
+{
+    type Item =
+        Result<StreamItem<RangeCompletableItem<EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>>>, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        loop {
+            break if self.completed {
+                panic!("poll_next on completed")
+            } else if self.errored {
+                self.completed = true;
+                Ready(None)
+            } else {
+                match self.event_blobs.poll_next_unpin(cx) {
+                    Ready(item) => match item {
+                        Some(item) => match item {
+                            Ok(item) => match item {
+                                StreamItem::DataItem(item) => match item {
+                                    RangeCompletableItem::RangeComplete => {
+                                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                                    }
+                                    RangeCompletableItem::Data(item) => match self.decode(&item) {
+                                        Ok(res) => {
+                                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(res)))))
+                                        }
+                                        Err(e) => {
+                                            self.errored = true;
+                                            Ready(Some(Err(e)))
+                                        }
+                                    },
+                                },
+                                StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
+                                StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
+                            },
+                            Err(e) => {
+                                self.errored = true;
+                                Ready(Some(Err(e)))
+                            }
+                        },
+                        None => {
+                            self.completed = true;
+                            Ready(None)
+                        }
+                    },
+                    Pending => Pending,
+                }
+            };
+        }
+    }
+}
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 53e5152..144cd88 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -2,7 +2,7 @@ use crate::ChannelConfigExt;
 use bitshuffle::bitshuffle_compress;
 use bytes::{BufMut, BytesMut};
 use err::Error;
-use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape};
+use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, Node, Shape};
 use netpod::{Nanos, ScalarType};
 use std::path::{Path, PathBuf};
 use tokio::fs::{File, OpenOptions};
@@ -18,6 +18,23 @@ pub async fn gen_test_data() -> Result<(), Error> {
         channels: vec![],
     };
     {
+        let chn = ChannelGenProps {
+            config: ChannelConfig {
+                channel: Channel {
+                    backend: "testbackend".into(),
+                    name: "scalar-i32-be".into(),
+                },
+                keyspace: 2,
+                time_bin_size: Nanos { ns: DAY },
+                array: false,
+                scalar_type: ScalarType::I32,
+                shape: Shape::Scalar,
+                byte_order: ByteOrder::big_endian(),
+                compression: false,
+            },
+            time_spacing: MS * 100,
+        };
+        ensemble.channels.push(chn);
         let chn = ChannelGenProps {
             config: ChannelConfig {
                 channel: Channel {
@@ -29,7 +46,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
                 array: true,
                 scalar_type: ScalarType::F64,
                 shape: Shape::Wave(21),
-                big_endian: true,
+                byte_order: ByteOrder::big_endian(),
                 compression: true,
             },
             time_spacing: MS * 1000,
@@ -46,7 +63,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
                 array: true,
                 scalar_type: ScalarType::U16,
                 shape: Shape::Wave(77),
-                big_endian: false,
+                byte_order: ByteOrder::little_endian(),
                 compression: true,
             },
             time_spacing: MS * 100,
@@ -338,7 +355,11 @@ async fn gen_event(
     if config.compression {
         match config.shape {
             Shape::Wave(ele_count) => {
-                buf.put_u8(COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN);
+                let mut flags = COMPRESSION | ARRAY | SHAPE;
+                if config.byte_order.is_be() {
+                    flags |= BIG_ENDIAN;
+                }
+                buf.put_u8(flags);
                 buf.put_u8(config.scalar_type.index());
                 let comp_method = 0 as u8;
                 buf.put_u8(comp_method);
@@ -350,7 +371,7 @@ async fn gen_event(
                 let mut vals = vec![0; (ele_size * ele_count) as usize];
                 for i1 in 0..ele_count {
                     let v = evix as f64;
-                    let a = if config.big_endian {
+                    let a = if config.byte_order.is_be() {
                         v.to_be_bytes()
                     } else {
                         v.to_le_bytes()
@@ -373,7 +394,7 @@ async fn gen_event(
                 let mut vals = vec![0; (ele_size * ele_count) as usize];
                 for i1 in 0..ele_count {
                     let v = evix as u16;
-                    let a = if config.big_endian {
+                    let a = if config.byte_order.is_be() {
                         v.to_be_bytes()
                     } else {
                         v.to_le_bytes()
@@ -397,7 +418,28 @@ async fn gen_event(
             _ => todo!("Shape not yet supported: {:?}", config.shape),
         }
     } else {
-        todo!("Uncompressed not yet supported");
+        match config.shape {
+            Shape::Scalar => {
+                let mut flags = 0;
+                if config.byte_order.is_be() {
+                    flags |= BIG_ENDIAN;
+                }
+                buf.put_u8(flags);
+                buf.put_u8(config.scalar_type.index());
+                match &config.scalar_type {
+                    ScalarType::I32 => {
+                        let v = evix as i32;
+                        if config.byte_order.is_be() {
+                            buf.put_i32(v);
+                        } else {
+                            buf.put_i32_le(v);
+                        };
+                    }
+                    _ => todo!("Datatype not yet supported: {:?}", config.scalar_type),
+                }
+            }
+            _ => todo!("Shape not yet supported: {:?}", config.shape),
+        }
     }
     {
         let len = buf.len() as u32 + 4;
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 3053257..4bd3811 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -23,6 +23,7 @@ pub mod binnedstream;
 pub mod cache;
 pub mod channelconfig;
 pub mod dataopen;
+pub mod decode;
 pub mod eventblobs;
 pub mod eventchunker;
 pub mod frame;
@@ -536,7 +537,7 @@ impl ChannelConfigExt for ChannelConfig {
             ret |= SHAPE;
         }
     }
-    if self.big_endian {
+    if self.byte_order.is_be() {
         ret |= BIG_ENDIAN;
     }
     if self.array {
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 9bd59d3..952d102 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -2,19 +2,28 @@ use crate::agg::binnedx::IntoBinnedXBins1;
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::streams::StreamItem;
 use crate::agg::IntoDim1F32Stream;
-use crate::binned::{BinnedStreamKindScalar, RangeCompletableItem, StreamKind};
+use crate::binned::{
+    BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, NumXAggToSingleBin, RangeCompletableItem,
+    StreamKind,
+};
+use crate::decode::{
+    BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
+    EventsDecodedStream, LittleEndian, NumFromBytes,
+};
 use crate::eventblobs::EventBlobsComplete;
 use crate::eventchunker::EventChunkerConf;
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType};
 use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
 use err::Error;
+use futures_core::Stream;
 use futures_util::StreamExt;
 use netpod::log::*;
-use netpod::{AggKind, ByteSize, NodeConfigCached, PerfOpts};
+use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, Shape};
 use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
 use std::io;
 use std::net::SocketAddr;
+use std::pin::Pin;
 use tokio::io::AsyncWriteExt;
 use tokio::net::tcp::OwnedWriteHalf;
 use tokio::net::TcpStream;
@@ -88,6 +97,96 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
     }
 }
 
+fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
+    event_value_shape: EVS,
+    event_blobs: EventBlobsComplete,
+) -> Pin<
+    Box<
+        dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<ENP as EventsNodeProcessor>::Output>>, Error>> + Send,
+    >,
+>
+where
+    NTY: NumOps + NumFromBytes<NTY, END> + 'static,
+    END: Endianness + 'static,
+    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
+    ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
+{
+    let p1 = NumBinnedPipeline::<NTY, END, ENP>::new();
+    // TODO implement first and statically assume that we have a wave.
+    // TODO then implement scalar case with a different container type and get the type check working.
+    let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_blobs);
+    let s2 = StreamExt::map(decs, |item| match item {
+        Ok(item) => match item {
+            StreamItem::DataItem(item) => match item {
+                RangeCompletableItem::Data(item) => {
+                    let item = <ENP as EventsNodeProcessor>::process(&item);
+                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
+                }
+                RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
+            },
+            StreamItem::Log(item) => Ok(StreamItem::Log(item)),
+            StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
+        },
+        Err(e) => Err(e),
+    });
+    Box::pin(s2)
+}
+
+macro_rules! pipe3 {
+    ($nty:ident, $end:ident, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
+        match $agg_kind {
+            AggKind::DimXBins1 => make_num_pipeline_stream_evs::<
+                $nty,
+                $end,
+                $evs<$nty>,
+                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
+            >($evsv, $event_blobs),
+            AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<
+                $nty,
+                $end,
+                $evs<$nty>,
+                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
+            >($evsv, $event_blobs),
+        }
+    };
+}
+
+macro_rules! pipe2 {
+    ($nty:ident, $end:ident, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
+        match $shape {
+            Shape::Scalar => {
+                pipe3!(
+                    $nty,
+                    $end,
+                    EventValuesDim0Case,
+                    EventValuesDim0Case::<$nty>::new(),
+                    $agg_kind,
+                    $event_blobs
+                )
+            }
+            Shape::Wave(n) => {
+                pipe3!(
+                    $nty,
+                    $end,
+                    EventValuesDim1Case,
+                    EventValuesDim1Case::<$nty>::new(n),
+                    $agg_kind,
+                    $event_blobs
+                )
+            }
+        }
+    };
+}
+
+macro_rules! pipe1 {
+    ($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
+        match $end {
+            ByteOrder::LE => pipe2!($nty, LittleEndian, $shape, $agg_kind, $event_blobs),
+            ByteOrder::BE => pipe2!($nty, BigEndian, $shape, $agg_kind, $event_blobs),
+        }
+    };
+}
+
 async fn events_conn_handler_inner_try(
     stream: TcpStream,
     addr: SocketAddr,
@@ -158,10 +257,27 @@ async fn events_conn_handler_inner_try(
         time_bin_size: entry.bs,
         shape: shape,
         scalar_type: entry.scalar_type.clone(),
-        big_endian: entry.is_big_endian,
+        byte_order: entry.byte_order.clone(),
        array: entry.is_array,
        compression: entry.is_compressed,
    };
+
+    if true {
+        // TODO use a requested buffer size
+        let buffer_size = 1024 * 4;
+        let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
+        let event_blobs = EventBlobsComplete::new(
+            range.clone(),
+            channel_config.clone(),
+            node_config.node.clone(),
+            node_config.ix,
+            buffer_size,
+            event_chunker_conf,
+        );
+        let shape = entry.to_shape().unwrap();
+        let p1 = pipe1!(i32, entry.byte_order, shape, evq.agg_kind, event_blobs);
+    }
+
     // TODO use a requested buffer size
     let buffer_size = 1024 * 4;
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 2a53fa3..4811fe9 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -251,6 +251,46 @@ impl NanoRange {
     }
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum ByteOrder {
+    LE,
+    BE,
+}
+
+impl ByteOrder {
+    pub fn little_endian() -> Self {
+        Self::LE
+    }
+
+    pub fn big_endian() -> Self {
+        Self::BE
+    }
+
+    pub fn from_dtype_flags(flags: u8) -> Self {
+        if flags & 0x20 == 0 {
+            Self::LE
+        } else {
+            Self::BE
+        }
+    }
+
+    pub fn is_le(&self) -> bool {
+        if let Self::LE = self {
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn is_be(&self) -> bool {
+        if let Self::BE = self {
+            true
+        } else {
+            false
+        }
+    }
+}
+
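+// [Editor's note] Illustrative test, not part of the original patch: bit 0x20 of
+// the event dtype flag byte selects big-endian, matching the BIG_ENDIAN flag
+// written by disk::gen and the mask read in parse::channelconfig::parse_entry.
+#[cfg(test)]
+mod byte_order_flags_sketch {
+    use super::ByteOrder;
+    #[test]
+    fn from_dtype_flags_reads_bit_0x20() {
+        assert!(ByteOrder::from_dtype_flags(0x20).is_be());
+        assert!(ByteOrder::from_dtype_flags(0x00).is_le());
+        // Compression (0x80), array (0x40) and shape (0x10) bits must not matter.
+        assert!(ByteOrder::from_dtype_flags(0xd0).is_le());
+    }
+}
+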
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ChannelConfig {
     pub channel: Channel,
@@ -260,7 +300,7 @@ pub struct ChannelConfig {
     pub compression: bool,
     pub shape: Shape,
     pub array: bool,
-    pub big_endian: bool,
+    pub byte_order: ByteOrder,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs
index ff43b0e..9001f32 100644
--- a/parse/src/channelconfig.rs
+++ b/parse/src/channelconfig.rs
@@ -1,6 +1,6 @@
 use err::Error;
 use netpod::timeunits::MS;
-use netpod::{Channel, NanoRange, Nanos, Node, ScalarType, Shape};
+use netpod::{ByteOrder, Channel, NanoRange, Nanos, Node, ScalarType, Shape};
 use nom::bytes::complete::take;
 use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
 use nom::Needed;
@@ -54,7 +54,7 @@ pub struct ConfigEntry {
     pub is_compressed: bool,
     pub is_shaped: bool,
     pub is_array: bool,
-    pub is_big_endian: bool,
+    pub byte_order: ByteOrder,
     pub compression_method: Option<CompressionMethod>,
     pub shape: Option<Vec<u32>>,
     pub source_name: Option<String>,
@@ -135,7 +135,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<ConfigEntry> {
     let (inp, dtmask) = be_u8(inp)?;
     let is_compressed = dtmask & 0x80 != 0;
     let is_array = dtmask & 0x40 != 0;
-    let is_big_endian = dtmask & 0x20 != 0;
+    let byte_order = ByteOrder::from_dtype_flags(dtmask);
     let is_shaped = dtmask & 0x10 != 0;
     let (inp, dtype) = be_u8(inp)?;
     if dtype > 13 {
@@ -202,7 +202,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<ConfigEntry> {
         is_compressed: is_compressed,
         is_array: is_array,
         is_shaped: is_shaped,
-        is_big_endian: is_big_endian,
+        byte_order,
         compression_method: compression_method,
         shape,
         source_name: source_name,
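
Note (editor's sketch, not part of the patch): the pipe1!/pipe2!/pipe3! macros in
disk/src/raw/conn.rs monomorphize make_num_pipeline_stream_evs over byte order,
shape and aggregation kind. For a little-endian scalar i32 channel queried with
AggKind::DimXBins1, the macro chain expands to roughly:

    make_num_pipeline_stream_evs::<
        i32,
        LittleEndian,
        EventValuesDim0Case<i32>,
        <EventValuesDim0Case<i32> as EventValueShape<i32, LittleEndian>>::NumXAggToSingleBin,
    >(EventValuesDim0Case::<i32>::new(), event_blobs)

Both AggKind arms of pipe3! currently select NumXAggToSingleBin, so in this WIP
state DimXBinsN(_) requests are still routed through the single-bin processor.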