Decompress waveform channel

This commit is contained in:
Dominik Werder
2021-04-05 21:16:34 +02:00
parent 72d985f8c8
commit d374540496
17 changed files with 4612 additions and 20 deletions

View File

@@ -11,6 +11,7 @@ use futures_util::future::FusedFuture;
use futures_util::{pin_mut, StreamExt};
use bytes::{Bytes, BytesMut, BufMut, Buf};
use std::path::PathBuf;
use bitshuffle::bitshuffle_decompress;
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
@@ -293,13 +294,14 @@ fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<
let mut query = query.clone();
tokio::spawn(async move {
let tb0 = query.timebin;
for i1 in 0..16 {
for i1 in 0..query.tb_file_count {
query.timebin = tb0 + i1;
let path = datapath(&query);
let fileres = tokio::fs::OpenOptions::new()
.read(true)
.open(path)
.open(&path)
.await;
info!("opened file {:?} {:?}", &path, &fileres);
match fileres {
Ok(k) => {
match chtx.send(Ok(k)).await {
@@ -377,6 +379,7 @@ pub struct EventChunker {
had_channel: bool,
polled: u32,
state: DataFileState,
tmpbuf: Vec<u8>,
}
enum DataFileState {
@@ -394,6 +397,7 @@ impl EventChunker {
had_channel: false,
polled: 0,
state: DataFileState::FileHeader,
tmpbuf: vec![0; 1024 * 1024 * 4],
}
}
@@ -403,12 +407,15 @@ impl EventChunker {
// what I've consumed from the buffer
// how many bytes I need min to make progress
let mut ret = EventFull::dummy();
let mut need_min = 0;
let mut need_min = 0 as u32;
use byteorder::{BE, ReadBytesExt};
//info!("parse_buf rb {}", buf.len());
let mut i1 = 0;
//let mut i1 = 0;
loop {
//info!("parse_buf LOOP {}", i1);
if (buf.len() as u32) < need_min {
break;
}
match self.state {
DataFileState::FileHeader => {
assert!(buf.len() >= 6, "logic");
@@ -419,7 +426,7 @@ impl EventChunker {
assert!(len > 0 && len < 128, "unexpected data file header");
let totlen = len as usize + 2;
if buf.len() < totlen {
info!("parse_buf not enough A");
info!("parse_buf not enough A totlen {}", totlen);
need_min = totlen as u32;
break;
}
@@ -435,29 +442,88 @@ impl EventChunker {
}
}
DataFileState::Event => {
assert!(buf.len() >= 4, "logic");
let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
//info!("event len {}", len);
if (buf.len() as u32) < len as u32 {
// TODO gather stats about this
if (buf.len() as u32) < 20 {
// TODO gather stats about how often we find not enough input
//info!("parse_buf not enough B");
need_min = len as u32;
break;
}
else if (buf.len() as u32) < len as u32 {
// TODO this is just for testing
let mut sl = std::io::Cursor::new(buf.as_ref());
sl.read_i32::<BE>().unwrap();
sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap();
//info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts);
need_min = len as u32;
break;
}
else {
let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b);
sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap();
let pulse = sl.read_i64::<BE>().unwrap();
//info!("len {} len1b {} ts {} pulse {}", len, len1b, ts, pulse);
need_min = 4;
sl.read_i64::<BE>().unwrap();
let _status = sl.read_i8().unwrap();
let _severity = sl.read_i8().unwrap();
let _optional = sl.read_i32::<BE>().unwrap();
assert!(_status == 0);
assert!(_severity == 0);
assert!(_optional == -1);
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
assert!(type_index <= 13);
let is_compressed = type_flags & 0x80 != 0;
let is_array = type_flags & 0x40 != 0;
let is_big_endian = type_flags & 0x20 != 0;
let is_shaped = type_flags & 0x10 != 0;
let compression_method = if is_compressed {
sl.read_u8().unwrap()
}
else {
0
};
let shape_dim = if is_shaped {
sl.read_u8().unwrap()
}
else {
0
};
assert!(compression_method <= 0);
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
let mut shape_lens = [0, 0, 0, 0];
for i1 in 0..shape_dim {
shape_lens[i1 as usize] = sl.read_u8().unwrap();
}
if true && is_compressed {
//info!("event ts {} is_compressed {}", ts, is_compressed);
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
let p1 = sl.position() as u32;
let k1 = len as u32 - p1 - 4;
assert!(value_bytes < 1024 * 256);
assert!(block_size == 1024 * 8);
let value_bytes = value_bytes;
let inp = [0; 16];
let type_size = type_size(type_index);
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
//info!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut self.tmpbuf, ele_count as usize, ele_size as usize, 0);
//info!("decompress result: {:?}", c1);
assert!(c1.unwrap() as u32 == k1);
}
buf.advance(len as usize);
need_min = 4;
}
}
}
i1 += 1;
//i1 += 1;
}
Ok(ParseResult {
events: ret,
@@ -467,6 +533,26 @@ impl EventChunker {
}
fn type_size(ix: u8) -> u32 {
match ix {
0 => 1,
1 => 1,
2 => 1,
3 => 1,
4 => 2,
5 => 2,
6 => 2,
7 => 4,
8 => 4,
9 => 8,
10 => 8,
11 => 4,
12 => 8,
13 => 1,
_ => panic!("logic")
}
}
struct ParseResult {
events: EventFull,
need_min: u32,
@@ -477,7 +563,7 @@ impl Stream for EventChunker {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.polled += 1;
if self.polled >= 20000 {
if self.polled >= 2000000 {
warn!("EventChunker poll limit reached");
return Poll::Ready(None);
}
@@ -493,6 +579,11 @@ impl Stream for EventChunker {
// TODO gather stats about this:
//info!("parse_buf returned {} leftover bytes to me", buf.len());
self.inp.put_back(buf);
}
if res.need_min > 8000 {
warn!("spurious EventChunker asks for need_min {}", res.need_min);
panic!();
}
self.inp.set_need_min(res.need_min);
Poll::Ready(Some(Ok(res.events)))
@@ -544,6 +635,7 @@ impl NeedMinBuffer {
pub fn put_back(&mut self, buf: BytesMut) {
assert!(self.left.is_none());
self.left = Some(buf);
}
pub fn set_need_min(&mut self, need_min: u32) {
@@ -562,14 +654,17 @@ impl Stream for NeedMinBuffer {
pin_mut!(g);
let z = match g.poll_next(cx) {
Poll::Ready(Some(Ok(buf))) => {
//info!("NeedMin got buf len {}", buf.len());
match self.left.take() {
Some(mut left) => {
left.unsplit(buf);
let buf = left;
if buf.len() as u32 >= self.need_min {
//info!("with left ready len {} need_min {}", buf.len(), self.need_min);
Poll::Ready(Some(Ok(buf)))
}
else {
//info!("with left not enough len {} need_min {}", buf.len(), self.need_min);
self.left.replace(buf);
again = true;
Poll::Pending
@@ -577,9 +672,11 @@ impl Stream for NeedMinBuffer {
}
None => {
if buf.len() as u32 >= self.need_min {
//info!("simply ready len {} need_min {}", buf.len(), self.need_min);
Poll::Ready(Some(Ok(buf)))
}
else {
//info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min);
self.left.replace(buf);
again = true;
Poll::Pending