From 8a1b094390b1826cf64ce19ee5d7792bd55c3f0e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 11 Apr 2021 19:35:08 +0200 Subject: [PATCH] Uncompress and merge 24 hours of 100 Hz small waveform from 13 files in 8 seconds --- disk/src/agg.rs | 22 +++++++++++----------- disk/src/gen.rs | 36 +++++++++++++++--------------------- disk/src/lib.rs | 25 +++++++++++++++++++------ netpod/src/lib.rs | 2 +- 4 files changed, 46 insertions(+), 39 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index aaa6a21..27e06bd 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -807,7 +807,7 @@ async fn agg_x_dim_0_inner() { let bin_count = 20; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; - let fut1 = crate::EventBlobsComplete::new(&query, &node) + let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { @@ -861,7 +861,7 @@ async fn agg_x_dim_1_inner() { let bin_count = 10; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; - let fut1 = crate::EventBlobsComplete::new(&query, &node) + let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node) .into_dim_1_f32_stream() //.take(1000) .map(|q| { @@ -890,10 +890,6 @@ fn merge_0() { } async fn merge_0_inner() { - let nodes = vec![ - make_test_node(0), - make_test_node(1), - ]; let query = netpod::AggQuerySingleChannel { channel_config: ChannelConfig { channel: Channel { @@ -902,17 +898,21 @@ async fn merge_0_inner() { name: "wave1".into(), }, time_bin_size: DAY, - shape: Shape::Wave(1024), + shape: Shape::Wave(17), scalar_type: ScalarType::F64, big_endian: true, compression: true, }, timebin: 0, tb_file_count: 1, - buffer_size: 17, + buffer_size: 1024 * 8, }; - let streams: Vec<_> = nodes.into_iter().map(|node| { - crate::EventBlobsComplete::new(&query, &node) + let streams = (0..13).into_iter() + .map(|k| { + make_test_node(k) + }) + .map(|node| { + crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node) .into_dim_1_f32_stream() }) .collect(); @@ -920,7 +920,7 @@ async fn merge_0_inner() { .map(|k| { //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss); }) - .fold(0, |k, q| async { 0 }) + .fold(0, |k, q| ready(0)) .await; } diff --git a/disk/src/gen.rs b/disk/src/gen.rs index e5866d4..bd14a55 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -41,30 +41,24 @@ pub async fn gen_test_data() -> Result<(), Error> { }, time_bin_size: DAY, scalar_type: ScalarType::F64, - shape: Shape::Wave(9), + shape: Shape::Wave(17), big_endian: true, compression: true, }, - time_spacing: SEC * 1, + time_spacing: MS * 10, }; ensemble.channels.push(chn); } - let node0 = Node { - host: "localhost".into(), - port: 7780, - split: 0, - data_base_path: data_base_path.join("node00"), - ksprefix: ksprefix.clone(), - }; - let node1 = Node { - host: "localhost".into(), - port: 7781, - split: 1, - data_base_path: data_base_path.join("node01"), - ksprefix: ksprefix.clone(), - }; - ensemble.nodes.push(node0); - ensemble.nodes.push(node1); + for i1 in 0..13 { + let node = Node { + host: "localhost".into(), + port: 7780 + i1, + split: i1 as u8, + data_base_path: data_base_path.join(format!("node{:02}", i1)), + ksprefix: ksprefix.clone(), + }; + ensemble.nodes.push(node); + } for node in &ensemble.nodes { gen_node(node, &ensemble).await?; } @@ -173,7 +167,7 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) match &config.scalar_type { ScalarType::F64 => { let ele_size = 8; - let mut vals = vec![0; ele_size * ele_count]; + let mut vals = vec![0; (ele_size * ele_count) as usize]; for i1 in 0..ele_count { let v = evix as f64; let a = v.to_be_bytes(); @@ -182,8 +176,8 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?; std::io::Write::write_all(&mut c1, &a)?; } - let mut comp = vec![0u8; ele_size * ele_count + 64]; - let n1 = bitshuffle_compress(&vals, &mut comp, ele_count, ele_size, 0).unwrap(); + let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize]; + let n1 = bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap(); buf.put_u64(vals.len() as u64); let comp_block_size = 0; buf.put_u32(comp_block_size); diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 1eccbea..c3888fa 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -16,7 +16,7 @@ use tokio::fs::{OpenOptions, File}; use bytes::{Bytes, BytesMut, Buf}; use std::path::PathBuf; use bitshuffle::bitshuffle_decompress; -use netpod::{ScalarType, Node}; +use netpod::{ScalarType, Shape, Node, ChannelConfig}; pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: &Node) -> Result { @@ -354,7 +354,7 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> im match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp); + let mut chunker = EventChunker::new(inp, todo!()); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { @@ -384,17 +384,19 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> im pub struct EventBlobsComplete { + channel_config: ChannelConfig, file_chan: async_channel::Receiver>, evs: Option, buffer_size: u32, } impl EventBlobsComplete { - pub fn new(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> Self { + pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: &netpod::Node) -> Self { Self { file_chan: open_files(query, node), evs: None, buffer_size: query.buffer_size, + channel_config, } } } @@ -424,7 +426,7 @@ impl Stream for EventBlobsComplete { match k { Ok(file) => { let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let mut chunker = EventChunker::new(inp); + let mut chunker = EventChunker::new(inp, self.channel_config.clone()); self.evs.replace(chunker); continue 'outer; } @@ -452,7 +454,7 @@ pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: &netpod match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp); + let mut chunker = EventChunker::new(inp, todo!()); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { @@ -479,6 +481,7 @@ pub struct EventChunker { polled: u32, state: DataFileState, tmpbuf: Vec, + channel_config: ChannelConfig, } enum DataFileState { @@ -488,7 +491,7 @@ enum DataFileState { impl EventChunker { - pub fn new(inp: Pin> + Send>>) -> Self { + pub fn new(inp: Pin> + Send>>, channel_config: ChannelConfig) -> Self { let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); Self { @@ -497,6 +500,7 @@ impl EventChunker { polled: 0, state: DataFileState::FileHeader, tmpbuf: vec![0; 1024 * 1024 * 4], + channel_config, } } @@ -582,6 +586,9 @@ impl EventChunker { let is_array = type_flags & ARRAY != 0; let is_big_endian = type_flags & BIG_ENDIAN != 0; let is_shaped = type_flags & SHAPE != 0; + if let Shape::Wave(_) = self.channel_config.shape { + assert!(is_array); + } let compression_method = if is_compressed { sl.read_u8().unwrap() } @@ -613,6 +620,12 @@ impl EventChunker { let type_size = type_size(type_index); let ele_count = value_bytes / type_size as u64; let ele_size = type_size; + match self.channel_config.shape { + Shape::Wave(ele2) => { + assert!(ele2 == ele_count as u32); + } + _ => panic!(), + } let decomp_bytes = (type_size * ele_count as u32) as usize; let mut decomp = BytesMut::with_capacity(decomp_bytes); unsafe { diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index b04078d..ecdd28d 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -134,7 +134,7 @@ pub struct ChannelConfig { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Shape { Scalar, - Wave(usize), + Wave(u32), } pub mod timeunits {