WIP: refactor stream build-up; it type-checks

This commit is contained in:
Dominik Werder
2021-06-04 17:57:40 +02:00
parent b07fa84b42
commit e4c5e05310
11 changed files with 622 additions and 48 deletions

View File

@@ -3,8 +3,8 @@ use disk::cache::CacheUsage;
use err::Error; use err::Error;
use netpod::log::*; use netpod::log::*;
use netpod::{NodeConfig, NodeConfigCached}; use netpod::{NodeConfig, NodeConfigCached};
use tokio::io::AsyncReadExt;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncReadExt;
pub fn main() { pub fn main() {
match taskrun::run(go()) { match taskrun::run(go()) {
@@ -93,7 +93,9 @@ async fn go() -> Result<(), Error> {
#[test] #[test]
fn simple_fetch() { fn simple_fetch() {
use netpod::Nanos; use netpod::Nanos;
use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape}; use netpod::{
timeunits::*, ByteOrder, Channel, ChannelConfig, Cluster, Database, Node, NodeConfig, ScalarType, Shape,
};
taskrun::run(async { taskrun::run(async {
let t1 = chrono::Utc::now(); let t1 = chrono::Utc::now();
let node = Node { let node = Node {
@@ -117,7 +119,7 @@ fn simple_fetch() {
array: true, array: true,
scalar_type: ScalarType::F64, scalar_type: ScalarType::F64,
shape: Shape::Wave(err::todoval()), shape: Shape::Wave(err::todoval()),
big_endian: true, byte_order: ByteOrder::big_endian(),
compression: true, compression: true,
}, },
timebin: 18720, timebin: 18720,

View File

@@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use disk::agg::streams::{Bins, StatsItem, StreamItem}; use disk::agg::streams::{Bins, StatsItem, StreamItem};
use disk::binned::RangeCompletableItem; use disk::binned::RangeCompletableItem;
use disk::cache::BinnedQuery; use disk::cache::{BinnedQuery, CacheUsage};
use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::streamlog::Streamlog; use disk::streamlog::Streamlog;
use err::Error; use err::Error;
@@ -93,7 +93,7 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
) )
.await?; .await?;
} }
if true { if false {
get_binned_channel( get_binned_channel(
"wave-u16-le-n77", "wave-u16-le-n77",
"1970-01-01T01:11:00.000Z", "1970-01-01T01:11:00.000Z",
@@ -105,7 +105,7 @@ async fn get_binned_binary_inner() -> Result<(), Error> {
) )
.await?; .await?;
} }
if true { if false {
get_binned_channel( get_binned_channel(
"wave-u16-le-n77", "wave-u16-le-n77",
"1970-01-01T01:42:00.000Z", "1970-01-01T01:42:00.000Z",
@@ -144,7 +144,8 @@ where
name: channel_name.into(), name: channel_name.into(),
}; };
let range = NanoRange::from_date_time(beg_date, end_date); let range = NanoRange::from_date_time(beg_date, end_date);
let query = BinnedQuery::new(channel, range, bin_count, agg_kind); let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind);
query.set_cache_usage(CacheUsage::Ignore);
let hp = HostPort::from_node(node0); let hp = HostPort::from_node(node0);
let url = query.url(&hp); let url = query.url(&hp);
info!("get_binned_channel get {}", url); info!("get_binned_channel get {}", url);

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "disk" name = "disk"
version = "0.0.1-a.0" version = "0.0.1-a.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"] authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018" edition = "2018"

View File

@@ -6,7 +6,7 @@ use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventChunkerConf; use crate::eventchunker::EventChunkerConf;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::timeunits::*; use netpod::timeunits::*;
use netpod::{BinnedRange, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; use netpod::{BinnedRange, ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
use std::future::ready; use std::future::ready;
#[allow(unused_imports)] #[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@@ -46,7 +46,7 @@ async fn agg_x_dim_0_inner() {
array: false, array: false,
shape: Shape::Scalar, shape: Shape::Scalar,
scalar_type: ScalarType::F64, scalar_type: ScalarType::F64,
big_endian: true, byte_order: ByteOrder::big_endian(),
compression: true, compression: true,
}, },
timebin: 18723, timebin: 18723,
@@ -103,7 +103,7 @@ async fn agg_x_dim_1_inner() {
array: true, array: true,
shape: Shape::Wave(1024), shape: Shape::Wave(1024),
scalar_type: ScalarType::F64, scalar_type: ScalarType::F64,
big_endian: true, byte_order: ByteOrder::big_endian(),
compression: true, compression: true,
}, },
timebin: 0, timebin: 0,

View File

@@ -8,6 +8,7 @@ use crate::agg::{Fits, FitsInside};
use crate::binned::scalar::binned_stream; use crate::binned::scalar::binned_stream;
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream};
use crate::cache::{BinnedQuery, MergedFromRemotes}; use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::decode::{Endianness, EventValues};
use crate::frame::makeframe::FrameType; use crate::frame::makeframe::FrameType;
use crate::raw::EventsQuery; use crate::raw::EventsQuery;
use bytes::Bytes; use bytes::Bytes;
@@ -25,6 +26,8 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use serde_json::Map; use serde_json::Map;
use std::future::Future; use std::future::Future;
use std::marker::PhantomData;
use std::ops::BitXor;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
@@ -552,32 +555,130 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch {
// I would like to decide on the disk-dtype first and get some generic intermediate type, and the // I would like to decide on the disk-dtype first and get some generic intermediate type, and the
// decide the AggKind, and maybe even other generic types. // decide the AggKind, and maybe even other generic types.
pub trait EventDecoder { pub trait NumOps: Sized + Send + Unpin + Zero + BitXor {}
impl<T> NumOps for T where T: Sized + Send + Unpin + Zero + BitXor {}
pub trait EventsDecoder {
type Output; type Output;
} fn ingest(&mut self, event: &[u8]);
fn result(&mut self) -> Self::Output;
pub struct U32EventsDecoder {}
pub struct U32Events {}
pub struct U32SingleBins {}
impl EventDecoder for U32EventsDecoder {
// TODO U32Event is just for demo.
type Output = U32Events;
} }
pub trait EventsNodeProcessor { pub trait EventsNodeProcessor {
type Input; type Input;
type Output; type Output;
fn process(inp: &EventValues<Self::Input>) -> Self::Output;
} }
// TODO the avg needs to be f32, but the min/max have to be the regular type of the disk data. pub struct NumEvents<N> {
// Try to make that generic... _m: PhantomData<N>,
pub struct U32XAggToSingleBin {} }
impl<N> NumEvents<N> {
pub fn new() -> Self {
Self { _m: PhantomData }
}
}
impl EventsNodeProcessor for U32XAggToSingleBin { pub struct NumSingleXBin<N> {
type Input = U32Events; _m: PhantomData<N>,
type Output = U32SingleBins; }
impl<N> NumSingleXBin<N> {
pub fn new() -> Self {
Self { _m: PhantomData }
}
}
pub struct NumEventsDecoder<N, END>
where
END: Endianness,
{
_m1: PhantomData<N>,
_m2: PhantomData<END>,
}
impl<N, END> NumEventsDecoder<N, END>
where
END: Endianness,
{
pub fn new() -> Self {
Self {
_m1: PhantomData,
_m2: PhantomData,
}
}
}
impl<N, END> EventsDecoder for NumEventsDecoder<N, END>
where
END: Endianness,
{
type Output = NumEvents<N>;
fn ingest(&mut self, _event: &[u8]) {}
fn result(&mut self) -> Self::Output {
err::todoval()
}
}
pub struct NumXAggToSingleBin<VT> {
_m: PhantomData<VT>,
}
impl<VT> NumXAggToSingleBin<VT> {
pub fn new() -> Self {
Self { _m: PhantomData }
}
}
impl<VT> EventsNodeProcessor for NumXAggToSingleBin<VT> {
type Input = VT;
type Output = NumSingleXBin<VT>;
fn process(inp: &EventValues<Self::Input>) -> Self::Output {
err::todoval()
}
}
pub trait BinnedPipeline {
type EventsDecoder: EventsDecoder;
type EventsNodeProcessor: EventsNodeProcessor;
fn events_decoder(&self) -> Self::EventsDecoder;
fn events_node_processor(&self) -> Self::EventsNodeProcessor;
}
pub struct NumBinnedPipeline<N, END, ENP>
where
END: Endianness,
{
_m1: PhantomData<N>,
_m2: PhantomData<END>,
_m3: PhantomData<ENP>,
}
impl<N, END, ENP> NumBinnedPipeline<N, END, ENP>
where
END: Endianness,
{
pub fn new() -> Self {
Self {
_m1: PhantomData,
_m2: PhantomData,
_m3: PhantomData,
}
}
}
impl<N, END, ENP> BinnedPipeline for NumBinnedPipeline<N, END, ENP>
where
ENP: EventsNodeProcessor,
END: Endianness,
{
type EventsDecoder = NumEventsDecoder<N, END>;
type EventsNodeProcessor = ENP;
fn events_decoder(&self) -> Self::EventsDecoder {
todo!()
}
fn events_node_processor(&self) -> Self::EventsNodeProcessor {
todo!()
}
} }
pub trait StreamKind: Clone + Unpin + Send + Sync + 'static { pub trait StreamKind: Clone + Unpin + Send + Sync + 'static {
@@ -587,8 +688,6 @@ pub trait StreamKind: Clone + Unpin + Send + Sync + 'static {
type XBinnedToTBinnedAggregator; type XBinnedToTBinnedAggregator;
type XBinnedToTBinnedStream: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>> type XBinnedToTBinnedStream: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>>
+ Send; + Send;
type EventsDecoder: EventDecoder;
type EventsNodeProcessor: EventsNodeProcessor;
fn new_binned_from_prebinned( fn new_binned_from_prebinned(
&self, &self,
@@ -642,8 +741,6 @@ impl StreamKind for BinnedStreamKindScalar {
type TBinnedBins = MinMaxAvgScalarBinBatch; type TBinnedBins = MinMaxAvgScalarBinBatch;
type XBinnedToTBinnedAggregator = Agg3; type XBinnedToTBinnedAggregator = Agg3;
type XBinnedToTBinnedStream = BinnedT3Stream; type XBinnedToTBinnedStream = BinnedT3Stream;
type EventsDecoder = U32EventsDecoder;
type EventsNodeProcessor = U32XAggToSingleBin;
fn new_binned_from_prebinned( fn new_binned_from_prebinned(
&self, &self,

275
disk/src/decode.rs Normal file
View File

@@ -0,0 +1,275 @@
use crate::agg::streams::StreamItem;
use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem};
use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventFull;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::ScalarType;
use std::marker::PhantomData;
use std::mem::size_of;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Marker trait for a compile-time byte-order selection.
///
/// Used as a type parameter so the endianness of the on-disk data is
/// resolved statically instead of being branched on per event.
pub trait Endianness: Send + Unpin {}

/// Marker type: values are stored least-significant byte first.
pub struct LittleEndian {}

/// Marker type: values are stored most-significant byte first.
pub struct BigEndian {}

impl Endianness for LittleEndian {}
impl Endianness for BigEndian {}

/// Decode a single numeric value of type `NTY` from raw bytes, with the
/// byte order selected statically by the `END` marker type.
pub trait NumFromBytes<NTY, END> {
    /// Read one value from the start of `buf`.
    ///
    /// Panics if `buf` is shorter than `size_of::<NTY>()`.
    fn convert(buf: &[u8]) -> NTY;
}

// Generates the little- and big-endian `NumFromBytes` impls for a primitive
// numeric type via its std `from_le_bytes` / `from_be_bytes` constructors
// (replaces the external `arrayref` macro with std-only code).
macro_rules! impl_num_from_bytes {
    ($t:ty) => {
        impl NumFromBytes<$t, LittleEndian> for $t {
            fn convert(buf: &[u8]) -> $t {
                const N: usize = std::mem::size_of::<$t>();
                let mut a = [0u8; N];
                a.copy_from_slice(&buf[..N]);
                <$t>::from_le_bytes(a)
            }
        }
        impl NumFromBytes<$t, BigEndian> for $t {
            fn convert(buf: &[u8]) -> $t {
                const N: usize = std::mem::size_of::<$t>();
                let mut a = [0u8; N];
                a.copy_from_slice(&buf[..N]);
                <$t>::from_be_bytes(a)
            }
        }
    };
}

// i32 matches the original implementation; the other primitives cover the
// remaining scalar types that appear in channel configs (e.g. u16 waves,
// f64 scalars) so new channels do not need hand-written impls.
impl_num_from_bytes!(u8);
impl_num_from_bytes!(u16);
impl_num_from_bytes!(u32);
impl_num_from_bytes!(u64);
impl_num_from_bytes!(i8);
impl_num_from_bytes!(i16);
impl_num_from_bytes!(i32);
impl_num_from_bytes!(i64);
impl_num_from_bytes!(f32);
impl_num_from_bytes!(f64);
/// Decode one event payload into the value representation for its shape.
///
/// Implemented by the shape marker types (`EventValuesDim0Case`,
/// `EventValuesDim1Case`); `Output` is a scalar for dim-0 and a `Vec` for
/// dim-1 channels.
pub trait EventValueFromBytes<NTY, END>
where
    NTY: NumFromBytes<NTY, END>,
{
    /// Decoded form of a single event (e.g. `NTY` or `Vec<NTY>`).
    type Output;

    /// Decode the full event payload `buf`.
    fn convert(buf: &[u8]) -> Self::Output;
}
/// Scalar-shape decoding: each event payload is exactly one numeric value.
impl<NTY, END> EventValueFromBytes<NTY, END> for EventValuesDim0Case<NTY>
where
    NTY: NumFromBytes<NTY, END>,
{
    type Output = NTY;

    /// Decode the single value at the start of the event payload.
    fn convert(buf: &[u8]) -> Self::Output {
        <NTY as NumFromBytes<NTY, END>>::convert(buf)
    }
}
/// Waveform-shape decoding: the payload is a tight array of `NTY` values.
impl<NTY, END> EventValueFromBytes<NTY, END> for EventValuesDim1Case<NTY>
where
    NTY: NumFromBytes<NTY, END>,
{
    type Output = Vec<NTY>;

    /// Decode the whole payload into a vector of elements.
    ///
    /// Trailing bytes that do not form a complete element are ignored,
    /// matching the original `buf.len() / size_of::<NTY>()` index loop.
    fn convert(buf: &[u8]) -> Self::Output {
        let es = size_of::<NTY>();
        // chunks_exact avoids a per-element bounds check and drops any
        // incomplete trailing chunk — same iteration count as before.
        buf.chunks_exact(es)
            .map(<NTY as NumFromBytes<NTY, END>>::convert)
            .collect()
    }
}
/// Ties an event shape to the node-local aggregation strategies that can be
/// applied to its decoded events.
///
/// Both associated processors consume the decoded `Output` of this shape and
/// reduce along the x (array) dimension.
pub trait EventValueShape<NTY, END>: EventValueFromBytes<NTY, END> + Send + Unpin
where
    NTY: NumFromBytes<NTY, END>,
{
    /// Processor that collapses each event's x-dimension into a single bin.
    type NumXAggToSingleBin: EventsNodeProcessor<Input = <Self as EventValueFromBytes<NTY, END>>::Output>;
    /// Processor that aggregates each event's x-dimension into N bins.
    type NumXAggToNBins: EventsNodeProcessor<Input = <Self as EventValueFromBytes<NTY, END>>::Output>;
}
/// Type-state marker for channels whose events are single scalar values
/// (`Shape::Scalar`).  Carries no data; the type alone selects the decoding
/// and aggregation impls at compile time.
pub struct EventValuesDim0Case<NTY> {
    _m1: PhantomData<NTY>,
}

impl<NTY> EventValuesDim0Case<NTY> {
    pub fn new() -> Self {
        Self { _m1: PhantomData }
    }
}

// A zero-field constructor warrants a `Default` impl (clippy::new_without_default).
impl<NTY> Default for EventValuesDim0Case<NTY> {
    fn default() -> Self {
        Self::new()
    }
}
/// Placeholder x-axis aggregator for scalar (dim-0) events.
///
/// TODO(WIP): `Output = ()` and the body is unimplemented; this only exists
/// so the pipeline type-checks end to end.
pub struct ProcAA<NTY> {
    _m1: PhantomData<NTY>,
}

impl<NTY> EventsNodeProcessor for ProcAA<NTY> {
    type Input = NTY;
    type Output = ();

    // Parameter underscored to silence the unused-variable warning until the
    // aggregation is actually implemented.
    fn process(_inp: &EventValues<Self::Input>) -> Self::Output {
        todo!()
    }
}
// For scalar channels both x-aggregation strategies degenerate to the same
// placeholder processor: there is no x-dimension to reduce.
impl<NTY, END> EventValueShape<NTY, END> for EventValuesDim0Case<NTY>
where
    NTY: NumOps + NumFromBytes<NTY, END>,
{
    type NumXAggToSingleBin = ProcAA<NTY>;
    type NumXAggToNBins = ProcAA<NTY>;
}
/// Type-state marker for waveform channels (`Shape::Wave(n)`).
///
/// `n` is the number of elements per event; it is stored for later use by
/// the x-axis aggregators (not read yet in this WIP state).
pub struct EventValuesDim1Case<NTY> {
    n: u32,
    _m1: PhantomData<NTY>,
}

impl<NTY> EventValuesDim1Case<NTY> {
    /// Create a marker for waveforms with `n` elements per event.
    pub fn new(n: u32) -> Self {
        EventValuesDim1Case {
            n,
            _m1: PhantomData,
        }
    }
}
/// Placeholder x-axis aggregator for waveform (dim-1) events.
///
/// TODO(WIP): `Output = ()` and the body is unimplemented; this only exists
/// so the pipeline type-checks end to end.
pub struct ProcBB<NTY> {
    _m1: PhantomData<NTY>,
}

impl<NTY> EventsNodeProcessor for ProcBB<NTY> {
    type Input = Vec<NTY>;
    type Output = ();

    // Parameter underscored to silence the unused-variable warning until the
    // aggregation is actually implemented.
    fn process(_inp: &EventValues<Self::Input>) -> Self::Output {
        todo!()
    }
}
// Waveform channels: both strategies map to the same placeholder for now.
// TODO(WIP): NumXAggToNBins should get its own processor once implemented.
impl<NTY, END> EventValueShape<NTY, END> for EventValuesDim1Case<NTY>
where
    NTY: NumOps + NumFromBytes<NTY, END>,
{
    type NumXAggToSingleBin = ProcBB<NTY>;
    type NumXAggToNBins = ProcBB<NTY>;
}
/// Decoded events for one channel: parallel vectors of timestamp and decoded
/// value, in event order.  `VT` is the per-event value type (scalar `NTY`
/// for dim-0 channels, `Vec<NTY>` for dim-1 waveforms).
pub struct EventValues<VT> {
    pub tss: Vec<u64>,
    pub values: Vec<VT>,
}

impl<VT> EventValues<VT> {
    /// A container with no events; used as the accumulator while decoding.
    pub fn empty() -> Self {
        Self {
            tss: vec![],
            values: vec![],
        }
    }
}

// `empty()` is the natural default, so expose it through the standard trait.
impl<VT> Default for EventValues<VT> {
    fn default() -> Self {
        Self::empty()
    }
}

impl<VT> std::fmt::Debug for EventValues<VT>
where
    VT: std::fmt::Debug,
{
    // Compact summary (count plus first/last entries) so large batches do
    // not flood the logs.  Output format is unchanged from the original.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            fmt,
            "count {} ts {:?} .. {:?} vals {:?} .. {:?}",
            self.tss.len(),
            self.tss.first(),
            self.tss.last(),
            self.values.first(),
            self.values.last(),
        )
    }
}
/// Stream adapter that decodes raw event blobs into typed `EventValues`.
///
/// `NTY` is the on-disk numeric type, `END` the static byte order, and `EVS`
/// the shape marker (dim-0 or dim-1) that chooses the per-event container.
pub struct EventsDecodedStream<NTY, END, EVS>
where
    NTY: NumOps + NumFromBytes<NTY, END>,
    END: Endianness,
    EVS: EventValueShape<NTY, END>,
{
    // Upstream producer of undecoded event blobs.
    event_blobs: EventBlobsComplete,
    // Set once the stream has yielded its terminal `None`; polling after
    // that is treated as a caller bug (panics).
    completed: bool,
    // Set after an error has been yielded; the next poll returns `None`.
    errored: bool,
    _m1: PhantomData<NTY>,
    _m2: PhantomData<END>,
    _m3: PhantomData<EVS>,
}
impl<NTY, END, EVS> EventsDecodedStream<NTY, END, EVS>
where
    NTY: NumOps + NumFromBytes<NTY, END>,
    END: Endianness,
    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END>,
{
    /// Wrap `event_blobs`, starting in the not-completed, not-errored state.
    pub fn new(event_blobs: EventBlobsComplete) -> Self {
        Self {
            event_blobs,
            completed: false,
            errored: false,
            _m1: PhantomData,
            _m2: PhantomData,
            _m3: PhantomData,
        }
    }

    /// Decode every event in `ev` into timestamp/value pairs.
    ///
    /// Each payload is decoded via the shape marker `EVS`, so the output
    /// value type is `NTY` (dim-0) or `Vec<NTY>` (dim-1).
    fn decode(&mut self, ev: &EventFull) -> Result<EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>, Error> {
        let mut ret = EventValues::empty();
        for i1 in 0..ev.tss.len() {
            // TODO check that dtype, event endianness and event shape match our static
            // expectation about the data in this channel.
            let _ty = &ev.scalar_types[i1];
            let _be = ev.be[i1];
            // NOTE(review): this unwrap assumes every event carries a
            // decompressed payload; it will panic if `decomps[i1]` is `None`
            // (e.g. for uncompressed events) — confirm upstream always
            // populates it.
            let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
            let val = <EVS as EventValueFromBytes<NTY, END>>::convert(decomp);
            ret.tss.push(ev.tss[i1]);
            ret.values.push(val);
        }
        Ok(ret)
    }
}
impl<NTY, END, EVS> Stream for EventsDecodedStream<NTY, END, EVS>
where
    NTY: NumOps + NumFromBytes<NTY, END>,
    END: Endianness,
    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END>,
{
    type Item =
        Result<StreamItem<RangeCompletableItem<EventValues<<EVS as EventValueFromBytes<NTY, END>>::Output>>>, Error>;

    /// Forward items from the blob stream, decoding data items in place.
    ///
    /// Log/stats items and the range-complete marker pass through unchanged;
    /// a decode or upstream error is yielded once, after which the stream
    /// terminates on the next poll.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        // The loop always exits via `break`; it only keeps the state machine
        // in the codebase's usual loop/break-if shape.
        loop {
            break if self.completed {
                // Polling past the terminal `None` violates the Stream
                // contract, so treat it as a caller bug.
                panic!("poll_next on completed")
            } else if self.errored {
                // An error was already yielded; terminate the stream.
                self.completed = true;
                Ready(None)
            } else {
                match self.event_blobs.poll_next_unpin(cx) {
                    Ready(item) => match item {
                        Some(item) => match item {
                            Ok(item) => match item {
                                StreamItem::DataItem(item) => match item {
                                    RangeCompletableItem::RangeComplete => {
                                        // Marker only — forward untouched.
                                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                                    }
                                    RangeCompletableItem::Data(item) => match self.decode(&item) {
                                        Ok(res) => {
                                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(res)))))
                                        }
                                        Err(e) => {
                                            // Decode failed: emit the error,
                                            // then end on the next poll.
                                            self.errored = true;
                                            Ready(Some(Err(e)))
                                        }
                                    },
                                },
                                StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                                StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
                            },
                            Err(e) => {
                                // Upstream error: emit once, then terminate.
                                self.errored = true;
                                Ready(Some(Err(e)))
                            }
                        },
                        None => {
                            // Upstream exhausted: propagate end-of-stream.
                            self.completed = true;
                            Ready(None)
                        }
                    },
                    Pending => Pending,
                }
            };
        }
    }
}
}

View File

@@ -2,7 +2,7 @@ use crate::ChannelConfigExt;
use bitshuffle::bitshuffle_compress; use bitshuffle::bitshuffle_compress;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use err::Error; use err::Error;
use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape}; use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, Node, Shape};
use netpod::{Nanos, ScalarType}; use netpod::{Nanos, ScalarType};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
@@ -18,6 +18,23 @@ pub async fn gen_test_data() -> Result<(), Error> {
channels: vec![], channels: vec![],
}; };
{ {
let chn = ChannelGenProps {
config: ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
array: false,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
byte_order: ByteOrder::big_endian(),
compression: false,
},
time_spacing: MS * 100,
};
ensemble.channels.push(chn);
let chn = ChannelGenProps { let chn = ChannelGenProps {
config: ChannelConfig { config: ChannelConfig {
channel: Channel { channel: Channel {
@@ -29,7 +46,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
array: true, array: true,
scalar_type: ScalarType::F64, scalar_type: ScalarType::F64,
shape: Shape::Wave(21), shape: Shape::Wave(21),
big_endian: true, byte_order: ByteOrder::big_endian(),
compression: true, compression: true,
}, },
time_spacing: MS * 1000, time_spacing: MS * 1000,
@@ -46,7 +63,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
array: true, array: true,
scalar_type: ScalarType::U16, scalar_type: ScalarType::U16,
shape: Shape::Wave(77), shape: Shape::Wave(77),
big_endian: false, byte_order: ByteOrder::little_endian(),
compression: true, compression: true,
}, },
time_spacing: MS * 100, time_spacing: MS * 100,
@@ -338,7 +355,11 @@ async fn gen_event(
if config.compression { if config.compression {
match config.shape { match config.shape {
Shape::Wave(ele_count) => { Shape::Wave(ele_count) => {
buf.put_u8(COMPRESSION | ARRAY | SHAPE | BIG_ENDIAN); let mut flags = COMPRESSION | ARRAY | SHAPE;
if config.byte_order.is_be() {
flags |= BIG_ENDIAN;
}
buf.put_u8(flags);
buf.put_u8(config.scalar_type.index()); buf.put_u8(config.scalar_type.index());
let comp_method = 0 as u8; let comp_method = 0 as u8;
buf.put_u8(comp_method); buf.put_u8(comp_method);
@@ -350,7 +371,7 @@ async fn gen_event(
let mut vals = vec![0; (ele_size * ele_count) as usize]; let mut vals = vec![0; (ele_size * ele_count) as usize];
for i1 in 0..ele_count { for i1 in 0..ele_count {
let v = evix as f64; let v = evix as f64;
let a = if config.big_endian { let a = if config.byte_order.is_be() {
v.to_be_bytes() v.to_be_bytes()
} else { } else {
v.to_le_bytes() v.to_le_bytes()
@@ -373,7 +394,7 @@ async fn gen_event(
let mut vals = vec![0; (ele_size * ele_count) as usize]; let mut vals = vec![0; (ele_size * ele_count) as usize];
for i1 in 0..ele_count { for i1 in 0..ele_count {
let v = evix as u16; let v = evix as u16;
let a = if config.big_endian { let a = if config.byte_order.is_be() {
v.to_be_bytes() v.to_be_bytes()
} else { } else {
v.to_le_bytes() v.to_le_bytes()
@@ -397,7 +418,28 @@ async fn gen_event(
_ => todo!("Shape not yet supported: {:?}", config.shape), _ => todo!("Shape not yet supported: {:?}", config.shape),
} }
} else { } else {
todo!("Uncompressed not yet supported"); match config.shape {
Shape::Scalar => {
let mut flags = 0;
if config.byte_order.is_be() {
flags |= BIG_ENDIAN;
}
buf.put_u8(flags);
buf.put_u8(config.scalar_type.index());
match &config.scalar_type {
ScalarType::I32 => {
let v = evix as i32;
if config.byte_order.is_be() {
buf.put_i32(v);
} else {
buf.put_i32_le(v);
};
}
_ => todo!("Datatype not yet supported: {:?}", config.scalar_type),
}
}
_ => todo!("Shape not yet supported: {:?}", config.shape),
}
} }
{ {
let len = buf.len() as u32 + 4; let len = buf.len() as u32 + 4;

View File

@@ -23,6 +23,7 @@ pub mod binnedstream;
pub mod cache; pub mod cache;
pub mod channelconfig; pub mod channelconfig;
pub mod dataopen; pub mod dataopen;
pub mod decode;
pub mod eventblobs; pub mod eventblobs;
pub mod eventchunker; pub mod eventchunker;
pub mod frame; pub mod frame;
@@ -536,7 +537,7 @@ impl ChannelConfigExt for ChannelConfig {
ret |= SHAPE; ret |= SHAPE;
} }
} }
if self.big_endian { if self.byte_order.is_be() {
ret |= BIG_ENDIAN; ret |= BIG_ENDIAN;
} }
if self.array { if self.array {

View File

@@ -2,19 +2,28 @@ use crate::agg::binnedx::IntoBinnedXBins1;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::IntoDim1F32Stream; use crate::agg::IntoDim1F32Stream;
use crate::binned::{BinnedStreamKindScalar, RangeCompletableItem, StreamKind}; use crate::binned::{
BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, NumXAggToSingleBin, RangeCompletableItem,
StreamKind,
};
use crate::decode::{
BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case,
EventsDecodedStream, LittleEndian, NumFromBytes,
};
use crate::eventblobs::EventBlobsComplete; use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventChunkerConf; use crate::eventchunker::EventChunkerConf;
use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType}; use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, FrameType};
use crate::raw::{EventQueryJsonStringFrame, EventsQuery}; use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
use err::Error; use err::Error;
use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::log::*; use netpod::log::*;
use netpod::{AggKind, ByteSize, NodeConfigCached, PerfOpts}; use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, PerfOpts, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf; use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream; use tokio::net::TcpStream;
@@ -88,6 +97,96 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
} }
} }
/// Build the boxed event stream for one concrete combination of numeric type
/// `NTY`, byte order `END`, event shape `EVS` and node processor `ENP`
/// (the combination is selected by the `pipe*` macros below).
///
/// Decodes raw event blobs into `EventValues` and applies the x-dimension
/// aggregation of `ENP` to every data item; log/stats/range-complete items
/// pass through unchanged.
fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
    event_value_shape: EVS,
    event_blobs: EventBlobsComplete,
) -> Pin<
    Box<
        dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<ENP as EventsNodeProcessor>::Output>>, Error>> + Send,
    >,
>
where
    NTY: NumOps + NumFromBytes<NTY, END> + 'static,
    END: Endianness + 'static,
    EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
    ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
{
    // Underscored: constructed only to prove the pipeline types line up in
    // this WIP state; silences the unused-variable warning.
    let _p1 = NumBinnedPipeline::<NTY, END, ENP>::new();
    // The shape value itself is not consumed yet — the type alone drives
    // decoding.  Touch it to avoid an unused-parameter warning.
    let _ = &event_value_shape;
    // TODO implement first and statically assume that we have a wave.
    // TODO then implement scalar case with a different container type and get the type check working.
    let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_blobs);
    let s2 = StreamExt::map(decs, |item| match item {
        Ok(item) => match item {
            StreamItem::DataItem(item) => match item {
                RangeCompletableItem::Data(item) => {
                    // The only transformation: run the node-local x-aggregation.
                    let item = <ENP as EventsNodeProcessor>::process(&item);
                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
                }
                RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
            },
            StreamItem::Log(item) => Ok(StreamItem::Log(item)),
            StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
        },
        Err(e) => Err(e),
    });
    Box::pin(s2)
}
// Expands to the stream constructor for a concrete (numeric type, byte
// order, shape) triple, selecting the x-aggregation processor from the
// requested AggKind.
// NOTE: both associated processors currently resolve to the same concrete
// type (ProcAA/ProcBB), so the two match arms unify; once they diverge each
// arm must box its own output type.
macro_rules! pipe3 {
    ($nty:ident, $end:ident, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
        match $agg_kind {
            AggKind::DimXBins1 => make_num_pipeline_stream_evs::<
                $nty,
                $end,
                $evs<$nty>,
                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
            >($evsv, $event_blobs),
            // Fixed copy-paste: the N-bins aggregation must use its dedicated
            // processor type (was NumXAggToSingleBin in both arms).
            AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<
                $nty,
                $end,
                $evs<$nty>,
                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins,
            >($evsv, $event_blobs),
        }
    };
}
// Dispatches on the channel shape: scalar channels decode via the dim-0
// case, waveforms (with element count `n`) via the dim-1 case.
// NOTE(review): any `Shape` variant beyond Scalar/Wave would make the
// expanded match non-exhaustive — confirm none exist yet.
macro_rules! pipe2 {
    ($nty:ident, $end:ident, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
        match $shape {
            Shape::Scalar => {
                pipe3!(
                    $nty,
                    $end,
                    EventValuesDim0Case,
                    EventValuesDim0Case::<$nty>::new(),
                    $agg_kind,
                    $event_blobs
                )
            }
            Shape::Wave(n) => {
                pipe3!(
                    $nty,
                    $end,
                    EventValuesDim1Case,
                    EventValuesDim1Case::<$nty>::new(n),
                    $agg_kind,
                    $event_blobs
                )
            }
        }
    };
}
// Maps the runtime byte order onto the compile-time endianness marker types
// (LittleEndian / BigEndian), then defers to pipe2 for shape dispatch.
macro_rules! pipe1 {
    ($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
        match $end {
            ByteOrder::LE => pipe2!($nty, LittleEndian, $shape, $agg_kind, $event_blobs),
            ByteOrder::BE => pipe2!($nty, BigEndian, $shape, $agg_kind, $event_blobs),
        }
    };
}
async fn events_conn_handler_inner_try( async fn events_conn_handler_inner_try(
stream: TcpStream, stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
@@ -158,10 +257,27 @@ async fn events_conn_handler_inner_try(
time_bin_size: entry.bs, time_bin_size: entry.bs,
shape: shape, shape: shape,
scalar_type: entry.scalar_type.clone(), scalar_type: entry.scalar_type.clone(),
big_endian: entry.is_big_endian, byte_order: entry.byte_order.clone(),
array: entry.is_array, array: entry.is_array,
compression: entry.is_compressed, compression: entry.is_compressed,
}; };
if true {
// TODO use a requested buffer size
let buffer_size = 1024 * 4;
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let event_blobs = EventBlobsComplete::new(
range.clone(),
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
buffer_size,
event_chunker_conf,
);
let shape = entry.to_shape().unwrap();
let p1 = pipe1!(i32, entry.byte_order, shape, evq.agg_kind, event_blobs);
}
// TODO use a requested buffer size // TODO use a requested buffer size
let buffer_size = 1024 * 4; let buffer_size = 1024 * 4;
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));

View File

@@ -251,6 +251,46 @@ impl NanoRange {
} }
} }
/// Byte order of a channel's on-disk values.
///
/// Replaces the former `big_endian: bool` flag so the order is explicit in
/// configs and serialized payloads.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ByteOrder {
    LE,
    BE,
}

impl ByteOrder {
    /// Construct the little-endian variant.
    pub fn little_endian() -> Self {
        Self::LE
    }

    /// Construct the big-endian variant.
    pub fn big_endian() -> Self {
        Self::BE
    }

    /// Derive the byte order from a databuffer dtype flag byte: bit `0x20`
    /// set means big-endian (see the config parser's flag layout).
    pub fn from_dtype_flags(flags: u8) -> Self {
        if flags & 0x20 == 0 {
            Self::LE
        } else {
            Self::BE
        }
    }

    pub fn is_le(&self) -> bool {
        matches!(self, Self::LE)
    }

    pub fn is_be(&self) -> bool {
        matches!(self, Self::BE)
    }
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelConfig { pub struct ChannelConfig {
pub channel: Channel, pub channel: Channel,
@@ -260,7 +300,7 @@ pub struct ChannelConfig {
pub compression: bool, pub compression: bool,
pub shape: Shape, pub shape: Shape,
pub array: bool, pub array: bool,
pub big_endian: bool, pub byte_order: ByteOrder,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]

View File

@@ -1,6 +1,6 @@
use err::Error; use err::Error;
use netpod::timeunits::MS; use netpod::timeunits::MS;
use netpod::{Channel, NanoRange, Nanos, Node, ScalarType, Shape}; use netpod::{ByteOrder, Channel, NanoRange, Nanos, Node, ScalarType, Shape};
use nom::bytes::complete::take; use nom::bytes::complete::take;
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
use nom::Needed; use nom::Needed;
@@ -54,7 +54,7 @@ pub struct ConfigEntry {
pub is_compressed: bool, pub is_compressed: bool,
pub is_shaped: bool, pub is_shaped: bool,
pub is_array: bool, pub is_array: bool,
pub is_big_endian: bool, pub byte_order: ByteOrder,
pub compression_method: Option<CompressionMethod>, pub compression_method: Option<CompressionMethod>,
pub shape: Option<Vec<u32>>, pub shape: Option<Vec<u32>>,
pub source_name: Option<String>, pub source_name: Option<String>,
@@ -135,7 +135,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
let (inp, dtmask) = be_u8(inp)?; let (inp, dtmask) = be_u8(inp)?;
let is_compressed = dtmask & 0x80 != 0; let is_compressed = dtmask & 0x80 != 0;
let is_array = dtmask & 0x40 != 0; let is_array = dtmask & 0x40 != 0;
let is_big_endian = dtmask & 0x20 != 0; let byte_order = ByteOrder::from_dtype_flags(dtmask);
let is_shaped = dtmask & 0x10 != 0; let is_shaped = dtmask & 0x10 != 0;
let (inp, dtype) = be_u8(inp)?; let (inp, dtype) = be_u8(inp)?;
if dtype > 13 { if dtype > 13 {
@@ -202,7 +202,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
is_compressed: is_compressed, is_compressed: is_compressed,
is_array: is_array, is_array: is_array,
is_shaped: is_shaped, is_shaped: is_shaped,
is_big_endian: is_big_endian, byte_order,
compression_method: compression_method, compression_method: compression_method,
shape, shape,
source_name: source_name, source_name: source_name,