Uncompress and merge 24 hours of 100 Hz small-waveform data from 13 files in 8 seconds

Dominik Werder
2021-04-11 19:35:08 +02:00
parent 61000f2336
commit 8a1b094390
4 changed files with 46 additions and 39 deletions
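As a rough sanity check on the numbers in the title (using the test setup from this commit: Shape::Wave(17), f64, 10 ms event spacing, 13 files): 24 h at 100 Hz is 8.64 million events of 17 * 8 bytes each, about 1.2 GB decompressed, so 8 s corresponds to roughly 145 MB/s of merged, decompressed output. The same arithmetic as a small Rust sketch:

fn main() {
    let events: u64 = 24 * 3600 * 100; // 24 h at 100 Hz (MS * 10 spacing below)
    let bytes = events * 17 * 8;       // one Wave(17) of f64 per event
    println!("{} events, {:.2} GB decompressed, ~{:.0} MB/s over 8 s",
             events, bytes as f64 / 1e9, bytes as f64 / 1e6 / 8.0);
}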


@@ -807,7 +807,7 @@ async fn agg_x_dim_0_inner() {
     let bin_count = 20;
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
-    let fut1 = crate::EventBlobsComplete::new(&query, &node)
+    let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node)
     .into_dim_1_f32_stream()
     //.take(1000)
     .map(|q| {
@@ -861,7 +861,7 @@ async fn agg_x_dim_1_inner() {
     let bin_count = 10;
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
-    let fut1 = crate::EventBlobsComplete::new(&query, &node)
+    let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node)
     .into_dim_1_f32_stream()
     //.take(1000)
     .map(|q| {
@@ -890,10 +890,6 @@ fn merge_0() {
 }
 
 async fn merge_0_inner() {
-    let nodes = vec![
-        make_test_node(0),
-        make_test_node(1),
-    ];
     let query = netpod::AggQuerySingleChannel {
         channel_config: ChannelConfig {
             channel: Channel {
@@ -902,17 +898,21 @@ async fn merge_0_inner() {
                 name: "wave1".into(),
             },
             time_bin_size: DAY,
-            shape: Shape::Wave(1024),
+            shape: Shape::Wave(17),
            scalar_type: ScalarType::F64,
            big_endian: true,
            compression: true,
        },
        timebin: 0,
        tb_file_count: 1,
-       buffer_size: 17,
+       buffer_size: 1024 * 8,
    };
-   let streams: Vec<_> = nodes.into_iter().map(|node| {
-       crate::EventBlobsComplete::new(&query, &node)
+   let streams = (0..13).into_iter()
+       .map(|k| {
+           make_test_node(k)
+       })
+       .map(|node| {
+           crate::EventBlobsComplete::new(&query, query.channel_config.clone(), &node)
            .into_dim_1_f32_stream()
        })
        .collect();
@@ -920,7 +920,7 @@ async fn merge_0_inner() {
     .map(|k| {
         //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss);
     })
-    .fold(0, |k, q| async { 0 })
+    .fold(0, |k, q| ready(0))
     .await;
 }
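Note the fold terminator in the last hunk: ready(0) (an already-completed future, e.g. futures::future::ready) replaces the async { 0 } block; both satisfy fold's requirement that the closure return a future, but ready avoids building an async state machine per item. A minimal self-contained sketch of the pattern, with an illustrative stream in place of the merged event stream used by the test:

use futures::future::ready;
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Drain a stream via fold; ready(..) resolves immediately,
    // so no per-item async block is constructed.
    let n = stream::iter(0..4).fold(0u64, |acc, _item| ready(acc + 1)).await;
    assert_eq!(n, 4);
}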


@@ -41,30 +41,24 @@ pub async fn gen_test_data() -> Result<(), Error> {
                 },
                 time_bin_size: DAY,
                 scalar_type: ScalarType::F64,
-                shape: Shape::Wave(9),
+                shape: Shape::Wave(17),
                 big_endian: true,
                 compression: true,
             },
-            time_spacing: SEC * 1,
+            time_spacing: MS * 10,
         };
         ensemble.channels.push(chn);
     }
-    let node0 = Node {
-        host: "localhost".into(),
-        port: 7780,
-        split: 0,
-        data_base_path: data_base_path.join("node00"),
-        ksprefix: ksprefix.clone(),
-    };
-    let node1 = Node {
-        host: "localhost".into(),
-        port: 7781,
-        split: 1,
-        data_base_path: data_base_path.join("node01"),
-        ksprefix: ksprefix.clone(),
-    };
-    ensemble.nodes.push(node0);
-    ensemble.nodes.push(node1);
+    for i1 in 0..13 {
+        let node = Node {
+            host: "localhost".into(),
+            port: 7780 + i1,
+            split: i1 as u8,
+            data_base_path: data_base_path.join(format!("node{:02}", i1)),
+            ksprefix: ksprefix.clone(),
+        };
+        ensemble.nodes.push(node);
+    }
     for node in &ensemble.nodes {
         gen_node(node, &ensemble).await?;
     }
@@ -173,7 +167,7 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig)
     match &config.scalar_type {
         ScalarType::F64 => {
             let ele_size = 8;
-            let mut vals = vec![0; ele_size * ele_count];
+            let mut vals = vec![0; (ele_size * ele_count) as usize];
             for i1 in 0..ele_count {
                 let v = evix as f64;
                 let a = v.to_be_bytes();
@@ -182,8 +176,8 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig)
                 c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?;
                 std::io::Write::write_all(&mut c1, &a)?;
             }
-            let mut comp = vec![0u8; ele_size * ele_count + 64];
-            let n1 = bitshuffle_compress(&vals, &mut comp, ele_count, ele_size, 0).unwrap();
+            let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
+            let n1 = bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap();
             buf.put_u64(vals.len() as u64);
             let comp_block_size = 0;
             buf.put_u32(comp_block_size);
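The added usize casts are needed because ele_size and ele_count are fixed-width integers here while vec! lengths and the bitshuffle_compress slice API take usize. A minimal sketch of the compression step for one Wave(17) f64 event, mirroring the call above (assuming, as the surrounding code suggests, that the returned value is the compressed byte count):

use bitshuffle::bitshuffle_compress;

fn compress_wave17(evix: u64) -> Vec<u8> {
    let ele_size: u32 = 8;   // f64
    let ele_count: u32 = 17; // Shape::Wave(17)
    // Big-endian f64 payload, as gen_event writes it:
    let mut vals = Vec::with_capacity((ele_size * ele_count) as usize);
    for _ in 0..ele_count {
        vals.extend_from_slice(&(evix as f64).to_be_bytes());
    }
    // Same worst-case headroom (+64) and block size 0 as in the diff:
    let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
    let n1 = bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap();
    comp.truncate(n1);
    comp
}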


@@ -16,7 +16,7 @@ use tokio::fs::{OpenOptions, File};
 use bytes::{Bytes, BytesMut, Buf};
 use std::path::PathBuf;
 use bitshuffle::bitshuffle_decompress;
-use netpod::{ScalarType, Node};
+use netpod::{ScalarType, Shape, Node, ChannelConfig};
 
 pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: &Node) -> Result<netpod::BodyStream, Error> {
@@ -354,7 +354,7 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> im
         match fileres {
             Ok(file) => {
                 let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
-                let mut chunker = EventChunker::new(inp);
+                let mut chunker = EventChunker::new(inp, todo!());
                 while let Some(evres) = chunker.next().await {
                     match evres {
                         Ok(evres) => {
@@ -384,17 +384,19 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> im
 
 pub struct EventBlobsComplete {
+    channel_config: ChannelConfig,
     file_chan: async_channel::Receiver<Result<File, Error>>,
     evs: Option<EventChunker>,
     buffer_size: u32,
 }
 
 impl EventBlobsComplete {
-    pub fn new(query: &netpod::AggQuerySingleChannel, node: &netpod::Node) -> Self {
+    pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: &netpod::Node) -> Self {
         Self {
             file_chan: open_files(query, node),
             evs: None,
             buffer_size: query.buffer_size,
+            channel_config,
         }
     }
 }
@@ -424,7 +426,7 @@ impl Stream for EventBlobsComplete {
                 match k {
                     Ok(file) => {
                         let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
-                        let mut chunker = EventChunker::new(inp);
+                        let mut chunker = EventChunker::new(inp, self.channel_config.clone());
                         self.evs.replace(chunker);
                         continue 'outer;
                     }
@@ -452,7 +454,7 @@ pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: &netpod
         match fileres {
             Ok(file) => {
                 let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
-                let mut chunker = EventChunker::new(inp);
+                let mut chunker = EventChunker::new(inp, todo!());
                 while let Some(evres) = chunker.next().await {
                     match evres {
                         Ok(evres) => {
@@ -479,6 +481,7 @@ pub struct EventChunker {
     polled: u32,
     state: DataFileState,
     tmpbuf: Vec<u8>,
+    channel_config: ChannelConfig,
 }
 
 enum DataFileState {
@@ -488,7 +491,7 @@ enum DataFileState {
 }
 
 impl EventChunker {
-    pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>) -> Self {
+    pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>, channel_config: ChannelConfig) -> Self {
         let mut inp = NeedMinBuffer::new(inp);
         inp.set_need_min(6);
         Self {
@@ -497,6 +500,7 @@ impl EventChunker {
             polled: 0,
             state: DataFileState::FileHeader,
             tmpbuf: vec![0; 1024 * 1024 * 4],
+            channel_config,
         }
     }
@@ -582,6 +586,9 @@ impl EventChunker {
         let is_array = type_flags & ARRAY != 0;
         let is_big_endian = type_flags & BIG_ENDIAN != 0;
         let is_shaped = type_flags & SHAPE != 0;
+        if let Shape::Wave(_) = self.channel_config.shape {
+            assert!(is_array);
+        }
         let compression_method = if is_compressed {
             sl.read_u8().unwrap()
         }
@@ -613,6 +620,12 @@ impl EventChunker {
         let type_size = type_size(type_index);
         let ele_count = value_bytes / type_size as u64;
         let ele_size = type_size;
+        match self.channel_config.shape {
+            Shape::Wave(ele2) => {
+                assert!(ele2 == ele_count as u32);
+            }
+            _ => panic!(),
+        }
         let decomp_bytes = (type_size * ele_count as u32) as usize;
         let mut decomp = BytesMut::with_capacity(decomp_bytes);
         unsafe {
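The effect of threading ChannelConfig into EventChunker is that every parsed event is now validated against the configured shape. Condensed into a standalone function (Shape as declared in the last file of this commit; the panic for non-wave configs mirrors the _ => panic!() arm above):

pub enum Shape {
    Scalar,
    Wave(u32),
}

// Per-event check: a waveform channel must parse as an array whose
// element count equals the configured width.
fn check_event_shape(config_shape: &Shape, is_array: bool, ele_count: u64) {
    match config_shape {
        Shape::Wave(n) => {
            assert!(is_array);
            assert!(*n == ele_count as u32);
        }
        // Non-waveform configs are not handled by this commit's check.
        Shape::Scalar => panic!(),
    }
}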


@@ -134,7 +134,7 @@ pub struct ChannelConfig {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum Shape {
     Scalar,
-    Wave(usize),
+    Wave(u32),
 }
 
 pub mod timeunits {