From debf06bc97e7da90d8316a6ea9c12bea6de5da82 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 6 Apr 2021 22:49:27 +0200 Subject: [PATCH] up --- bitshuffle/build.rs | 1 + disk/src/agg.rs | 153 +++++++++++++++++++++++++++++++++ disk/src/lib.rs | 50 +++++++---- retrieval/src/bin/retrieval.rs | 4 +- 4 files changed, 191 insertions(+), 17 deletions(-) create mode 100644 disk/src/agg.rs diff --git a/bitshuffle/build.rs b/bitshuffle/build.rs index 46a164c..c923a49 100644 --- a/bitshuffle/build.rs +++ b/bitshuffle/build.rs @@ -5,5 +5,6 @@ fn main() { .file("src/iochain.c") .file("src/lz4.c") .include("src") + .warnings(false) .compile("bitshufbundled"); } diff --git a/disk/src/agg.rs b/disk/src/agg.rs new file mode 100644 index 0000000..e9639a2 --- /dev/null +++ b/disk/src/agg.rs @@ -0,0 +1,153 @@ +pub trait AggregatorTdim { + type OutputValue: AggregatableXdim1Bin + AggregatableTdim; +} + +pub trait AggregatableXdim1Bin { + type Output: AggregatableXdim1Bin + AggregatableTdim; + fn into_agg(self) -> Self::Output; +} + +pub trait AggregatableTdim { + type Output: AggregatableXdim1Bin + AggregatableTdim; + type Aggregator: AggregatorTdim; + fn aggregator_new(&self) -> Self::Aggregator; +} + + +// dummy +impl AggregatableXdim1Bin for () { + type Output = (); + fn into_agg(self) -> Self::Output { todo!() } +} +impl AggregatableTdim for () { + type Output = (); + type Aggregator = (); + fn aggregator_new(&self) -> Self::Aggregator { + todo!() + } +} +impl AggregatorTdim for () { + type OutputValue = (); +} + + +pub struct ValuesDim1 { + tss: Vec, + values: Vec>, +} + +impl AggregatableXdim1Bin for ValuesDim1 { + type Output = MinMaxAvgScalarEventBatch; + fn into_agg(self) -> Self::Output { + todo!() + } +} + + +pub struct MinMaxAvgScalarEventBatch { + ts1s: Vec, + ts2s: Vec, + mins: Vec, + maxs: Vec, + avgs: Vec, +} + +impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { + type Output = MinMaxAvgScalarEventBatch; + fn into_agg(self) -> Self::Output { + self + } +} + +impl AggregatableTdim for MinMaxAvgScalarEventBatch { + type Output = MinMaxAvgScalarBinBatch; + type Aggregator = MinMaxAvgScalarEventBatchAggregator; + fn aggregator_new(&self) -> Self::Aggregator { + todo!() + } +} + +pub struct MinMaxAvgScalarEventBatchAggregator {} + +impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { + type OutputValue = MinMaxAvgScalarBinBatch; +} + + +pub struct MinMaxAvgScalarBinBatch { + ts1s: Vec, + ts2s: Vec, + mins: Vec, + maxs: Vec, + avgs: Vec, +} + +impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { + type Output = MinMaxAvgScalarBinBatch; + fn into_agg(self) -> Self::Output { + todo!() + } +} + +impl AggregatableTdim for MinMaxAvgScalarBinBatch { + type Output = MinMaxAvgScalarBinSingle; + type Aggregator = MinMaxAvgScalarBinSingle; + fn aggregator_new(&self) -> Self::Aggregator { + todo!() + } +} + +pub struct MinMaxAvgScalarBinSingle { + ts1: u64, + ts2: u64, + min: f32, + max: f32, + avg: f32, +} + +impl AggregatableTdim for MinMaxAvgScalarBinSingle { + type Output = MinMaxAvgScalarBinSingle; + type Aggregator = MinMaxAvgScalarBinSingle; + fn aggregator_new(&self) -> Self::Aggregator { + todo!() + } +} + +impl AggregatorTdim for MinMaxAvgScalarBinSingle { + type OutputValue = (); +} + +impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle { + type Output = MinMaxAvgScalarBinSingle; + fn into_agg(self) -> Self::Output { + self + } +} + +#[test] +fn agg_x_dim_1() { + let vals = ValuesDim1 { + tss: vec![0, 1, 2, 3], + values: vec![ + vec![0., 0., 0.], + vec![1., 1., 1.], + vec![2., 2., 2.], + vec![3., 3., 3.], + ], + }; + // I want to distinguish already in the outer part between dim-0 and dim-1 and generate + // separate code for these cases... + // That means that also the reading chain itself needs to be typed on that. + // Need to supply some event-payload converter type which has that type as Output type. + let vals2 = vals.into_agg(); + // Now the T-binning: + + /* + T-aggregator must be able to produce empty-values of correct type even if we never get + a single value of input data. + Therefore, it needs the bin range definition. + How do I want to drive the system? + If I write the T-binner as a Stream, then I also need to pass it the input! + Meaning, I need to pass the Stream which produces the actual numbers from disk. + */ +} diff --git a/disk/src/lib.rs b/disk/src/lib.rs index b515fe2..f0a276c 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,3 +1,5 @@ +pub mod agg; + #[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; use err::Error; @@ -12,6 +14,7 @@ use futures_util::{pin_mut, StreamExt}; use bytes::{Bytes, BytesMut, BufMut, Buf}; use std::path::PathBuf; use bitshuffle::bitshuffle_decompress; +use async_channel::bounded; pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result { @@ -352,12 +355,15 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream { - let mut buf = BytesMut::with_capacity(16); - + //let mut buf = BytesMut::with_capacity(16); // TODO put some interesting information to test - buf.put_u64_le(0xcafecafe); - - yield Ok(buf.freeze()) + //buf.put_u64_le(0xcafecafe); + //yield Ok(buf.freeze()) + for bufopt in evres.decomps { + if let Some(buf) = bufopt { + yield Ok(buf.freeze()); + } + } } Err(e) => { yield Err(e) @@ -406,7 +412,7 @@ impl EventChunker { // what I've found in the buffer // what I've consumed from the buffer // how many bytes I need min to make progress - let mut ret = EventFull::dummy(); + let mut ret = EventFull::empty(); let mut need_min = 0 as u32; use byteorder::{BE, ReadBytesExt}; //info!("parse_buf rb {}", buf.len()); @@ -466,8 +472,8 @@ impl EventChunker { let len1b = sl.read_i32::().unwrap(); assert!(len == len1b); sl.read_i64::().unwrap(); - let ts = sl.read_i64::().unwrap(); - let pulse = sl.read_i64::().unwrap(); + let ts = sl.read_i64::().unwrap() as u64; + let pulse = sl.read_i64::().unwrap() as u64; sl.read_i64::().unwrap(); let _status = sl.read_i8().unwrap(); let _severity = sl.read_i8().unwrap(); @@ -508,15 +514,20 @@ impl EventChunker { let k1 = len as u32 - p1 - 4; assert!(value_bytes < 1024 * 256); assert!(block_size == 1024 * 8); - let value_bytes = value_bytes; - let inp = [0; 16]; + //let value_bytes = value_bytes; let type_size = type_size(type_index); let ele_count = value_bytes / type_size as u64; let ele_size = type_size; + let decomp_bytes = (type_size * ele_count as u32) as usize; + let mut decomp = BytesMut::with_capacity(decomp_bytes); + unsafe { + decomp.set_len(decomp_bytes); + } //info!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index); - let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut self.tmpbuf, ele_count as usize, ele_size as usize, 0); + let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0); //info!("decompress result: {:?}", c1); assert!(c1.unwrap() as u32 == k1); + ret.add_event(ts, pulse, Some(decomp)); } buf.advance(len as usize); need_min = 4; @@ -600,18 +611,27 @@ impl Stream for EventChunker { } pub struct EventFull { - - // TODO add structures to hold list of events - + tss: Vec, + pulses: Vec, + decomps: Vec>, } impl EventFull { - pub fn dummy() -> Self { + pub fn empty() -> Self { Self { + tss: vec![], + pulses: vec![], + decomps: vec![], } } + fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option) { + self.tss.push(ts); + self.pulses.push(pulse); + self.decomps.push(decomp); + } + } diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index cefb3cc..f5d17e2 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -61,7 +61,7 @@ fn simple_fetch() { name: "S10BC01-DBAM070:BAM_CH1_NORM".into(), backend: "sf-databuffer".into(), }, - timebin: 18719, + timebin: 18720, tb_file_count: 1, split: 12, tbsize: 1000 * 60 * 60 * 24, @@ -97,7 +97,7 @@ fn simple_fetch() { let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; let throughput = ntot / 1024 * 1000 / ms; - info!("total download bytes {} throughput {:5} kB/s", ntot, throughput); + info!("total download {} MB throughput {:5} kB/s", ntot / 1024 / 1024, throughput); //Err::<(), _>(format!("test error").into()) Ok(()) }).unwrap();