Getting closer
This commit is contained in:
@@ -222,7 +222,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
|
|||||||
Some(ts) => {
|
Some(ts) => {
|
||||||
*ts < self.ts1
|
*ts < self.ts1
|
||||||
}
|
}
|
||||||
_ => panic!()
|
None => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -788,7 +788,7 @@ async fn agg_x_dim_0_inner() {
|
|||||||
buffer_size: 1024 * 4,
|
buffer_size: 1024 * 4,
|
||||||
};
|
};
|
||||||
let bin_count = 20;
|
let bin_count = 20;
|
||||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size * MS;
|
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
|
||||||
let ts2 = ts1 + HOUR * 24;
|
let ts2 = ts1 + HOUR * 24;
|
||||||
let fut1 = crate::EventBlobsComplete::new(&query, &node)
|
let fut1 = crate::EventBlobsComplete::new(&query, &node)
|
||||||
.into_dim_1_f32_stream()
|
.into_dim_1_f32_stream()
|
||||||
@@ -822,6 +822,7 @@ fn agg_x_dim_1() {
|
|||||||
async fn agg_x_dim_1_inner() {
|
async fn agg_x_dim_1_inner() {
|
||||||
// sf-databuffer
|
// sf-databuffer
|
||||||
// /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
|
// /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
|
||||||
|
// S10BC01-DBAM070:BAM_CH1_NORM
|
||||||
let node = Node {
|
let node = Node {
|
||||||
host: "localhost".into(),
|
host: "localhost".into(),
|
||||||
port: 8888,
|
port: 8888,
|
||||||
@@ -834,7 +835,7 @@ async fn agg_x_dim_1_inner() {
|
|||||||
channel: Channel {
|
channel: Channel {
|
||||||
backend: "ks".into(),
|
backend: "ks".into(),
|
||||||
keyspace: 3,
|
keyspace: 3,
|
||||||
name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
|
name: "wave1".into(),
|
||||||
},
|
},
|
||||||
time_bin_size: DAY,
|
time_bin_size: DAY,
|
||||||
shape: Shape::Wave(1024),
|
shape: Shape::Wave(1024),
|
||||||
@@ -842,12 +843,12 @@ async fn agg_x_dim_1_inner() {
|
|||||||
big_endian: true,
|
big_endian: true,
|
||||||
compression: true,
|
compression: true,
|
||||||
},
|
},
|
||||||
timebin: 18722,
|
timebin: 0,
|
||||||
tb_file_count: 1,
|
tb_file_count: 1,
|
||||||
buffer_size: 1024 * 4,
|
buffer_size: 17,
|
||||||
};
|
};
|
||||||
let bin_count = 100;
|
let bin_count = 10;
|
||||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size * MS;
|
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
|
||||||
let ts2 = ts1 + HOUR * 24;
|
let ts2 = ts1 + HOUR * 24;
|
||||||
let fut1 = crate::EventBlobsComplete::new(&query, &node)
|
let fut1 = crate::EventBlobsComplete::new(&query, &node)
|
||||||
.into_dim_1_f32_stream()
|
.into_dim_1_f32_stream()
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ async fn gen_event(file: &mut File, ts: u64, config: &ChannelConfig) -> Result<(
|
|||||||
buf.put_u64(vals.len() as u64);
|
buf.put_u64(vals.len() as u64);
|
||||||
let comp_block_size = 0;
|
let comp_block_size = 0;
|
||||||
buf.put_u32(comp_block_size);
|
buf.put_u32(comp_block_size);
|
||||||
buf.put(comp.as_slice());
|
buf.put(&comp[..n1]);
|
||||||
}
|
}
|
||||||
_ => todo!()
|
_ => todo!()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -597,16 +597,17 @@ impl EventChunker {
|
|||||||
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
|
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
|
||||||
let mut shape_lens = [0, 0, 0, 0];
|
let mut shape_lens = [0, 0, 0, 0];
|
||||||
for i1 in 0..shape_dim {
|
for i1 in 0..shape_dim {
|
||||||
shape_lens[i1 as usize] = sl.read_u8().unwrap();
|
shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
|
||||||
}
|
}
|
||||||
if is_compressed {
|
if is_compressed {
|
||||||
//info!("event ts {} is_compressed {}", ts, is_compressed);
|
//debug!("event ts {} is_compressed {}", ts, is_compressed);
|
||||||
let value_bytes = sl.read_u64::<BE>().unwrap();
|
let value_bytes = sl.read_u64::<BE>().unwrap();
|
||||||
let block_size = sl.read_u32::<BE>().unwrap();
|
let block_size = sl.read_u32::<BE>().unwrap();
|
||||||
let p1 = sl.position() as u32;
|
let p1 = sl.position() as u32;
|
||||||
let k1 = len as u32 - p1 - 4;
|
let k1 = len as u32 - p1 - 4;
|
||||||
|
debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
|
||||||
assert!(value_bytes < 1024 * 256);
|
assert!(value_bytes < 1024 * 256);
|
||||||
assert!(block_size == 1024 * 8);
|
assert!(block_size < 1024 * 32);
|
||||||
//let value_bytes = value_bytes;
|
//let value_bytes = value_bytes;
|
||||||
let type_size = type_size(type_index);
|
let type_size = type_size(type_index);
|
||||||
let ele_count = value_bytes / type_size as u64;
|
let ele_count = value_bytes / type_size as u64;
|
||||||
@@ -616,10 +617,10 @@ impl EventChunker {
|
|||||||
unsafe {
|
unsafe {
|
||||||
decomp.set_len(decomp_bytes);
|
decomp.set_len(decomp_bytes);
|
||||||
}
|
}
|
||||||
//info!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
|
debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
|
||||||
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0);
|
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0).unwrap();
|
||||||
//info!("decompress result: {:?}", c1);
|
debug!("decompress result c1 {} k1 {}", c1, k1);
|
||||||
assert!(c1.unwrap() as u32 == k1);
|
assert!(c1 as u32 == k1);
|
||||||
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
|
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@@ -877,7 +878,7 @@ fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &netpod::Node) -
|
|||||||
.join(config.channel.name.clone())
|
.join(config.channel.name.clone())
|
||||||
.join(format!("{:019}", timebin))
|
.join(format!("{:019}", timebin))
|
||||||
.join(format!("{:010}", node.split))
|
.join(format!("{:010}", node.split))
|
||||||
.join(format!("{:019}_00000_Data", config.time_bin_size))
|
.join(format!("{:019}_00000_Data", config.time_bin_size / timeunits::MS))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user