From b274e660ff57cf7c41c1b7fd60828eaed5cd91a4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 10 Apr 2021 09:43:27 +0200 Subject: [PATCH] Getting closer --- disk/src/agg.rs | 15 ++++++++------- disk/src/gen.rs | 2 +- disk/src/lib.rs | 17 +++++++++-------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index a4d1c2a..0cd4a9c 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -222,7 +222,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { Some(ts) => { *ts < self.ts1 } - _ => panic!() + None => true, } } @@ -788,7 +788,7 @@ async fn agg_x_dim_0_inner() { buffer_size: 1024 * 4, }; let bin_count = 20; - let ts1 = query.timebin as u64 * query.channel_config.time_bin_size * MS; + let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; let fut1 = crate::EventBlobsComplete::new(&query, &node) .into_dim_1_f32_stream() @@ -822,6 +822,7 @@ fn agg_x_dim_1() { async fn agg_x_dim_1_inner() { // sf-databuffer // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/* + // S10BC01-DBAM070:BAM_CH1_NORM let node = Node { host: "localhost".into(), port: 8888, @@ -834,7 +835,7 @@ async fn agg_x_dim_1_inner() { channel: Channel { backend: "ks".into(), keyspace: 3, - name: "S10BC01-DBAM070:BAM_CH1_NORM".into(), + name: "wave1".into(), }, time_bin_size: DAY, shape: Shape::Wave(1024), @@ -842,12 +843,12 @@ async fn agg_x_dim_1_inner() { big_endian: true, compression: true, }, - timebin: 18722, + timebin: 0, tb_file_count: 1, - buffer_size: 1024 * 4, + buffer_size: 17, }; - let bin_count = 100; - let ts1 = query.timebin as u64 * query.channel_config.time_bin_size * MS; + let bin_count = 10; + let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts2 = ts1 + HOUR * 24; let fut1 = crate::EventBlobsComplete::new(&query, &node) .into_dim_1_f32_stream() diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 9d70eda..314dbec 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -175,7 +175,7 @@ async fn gen_event(file: &mut File, ts: u64, config: &ChannelConfig) -> Result<( buf.put_u64(vals.len() as u64); let comp_block_size = 0; buf.put_u32(comp_block_size); - buf.put(comp.as_slice()); + buf.put(&comp[..n1]); } _ => todo!() } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 66c571c..250721a 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -597,16 +597,17 @@ impl EventChunker { assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); let mut shape_lens = [0, 0, 0, 0]; for i1 in 0..shape_dim { - shape_lens[i1 as usize] = sl.read_u8().unwrap(); + shape_lens[i1 as usize] = sl.read_u32::().unwrap(); } if is_compressed { - //info!("event ts {} is_compressed {}", ts, is_compressed); + //debug!("event ts {} is_compressed {}", ts, is_compressed); let value_bytes = sl.read_u64::().unwrap(); let block_size = sl.read_u32::().unwrap(); let p1 = sl.position() as u32; let k1 = len as u32 - p1 - 4; + debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); assert!(value_bytes < 1024 * 256); - assert!(block_size == 1024 * 8); + assert!(block_size < 1024 * 32); //let value_bytes = value_bytes; let type_size = type_size(type_index); let ele_count = value_bytes / type_size as u64; @@ -616,10 +617,10 @@ impl EventChunker { unsafe { decomp.set_len(decomp_bytes); } - //info!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index); - let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0); - //info!("decompress result: {:?}", c1); - assert!(c1.unwrap() as u32 == k1); + debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index); + let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0).unwrap(); + debug!("decompress result c1 {} k1 {}", c1, k1); + assert!(c1 as u32 == k1); ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index)); } else { @@ -877,7 +878,7 @@ fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) - .join(config.channel.name.clone()) .join(format!("{:019}", timebin)) .join(format!("{:010}", node.split)) - .join(format!("{:019}_00000_Data", config.time_bin_size)) + .join(format!("{:019}_00000_Data", config.time_bin_size / timeunits::MS)) }