This commit is contained in:
Dominik Werder
2021-04-06 22:49:27 +02:00
parent d374540496
commit debf06bc97
4 changed files with 191 additions and 17 deletions

View File

@@ -1,3 +1,5 @@
pub mod agg;
#[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace};
use err::Error;
@@ -12,6 +14,7 @@ use futures_util::{pin_mut, StreamExt};
use bytes::{Bytes, BytesMut, BufMut, Buf};
use std::path::PathBuf;
use bitshuffle::bitshuffle_decompress;
use async_channel::bounded;
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
@@ -352,12 +355,15 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result
while let Some(evres) = chunker.next().await {
match evres {
Ok(evres) => {
let mut buf = BytesMut::with_capacity(16);
//let mut buf = BytesMut::with_capacity(16);
// TODO put some interesting information to test
buf.put_u64_le(0xcafecafe);
yield Ok(buf.freeze())
//buf.put_u64_le(0xcafecafe);
//yield Ok(buf.freeze())
for bufopt in evres.decomps {
if let Some(buf) = bufopt {
yield Ok(buf.freeze());
}
}
}
Err(e) => {
yield Err(e)
@@ -406,7 +412,7 @@ impl EventChunker {
// what I've found in the buffer
// what I've consumed from the buffer
// how many bytes I need min to make progress
let mut ret = EventFull::dummy();
let mut ret = EventFull::empty();
let mut need_min = 0 as u32;
use byteorder::{BE, ReadBytesExt};
//info!("parse_buf rb {}", buf.len());
@@ -466,8 +472,8 @@ impl EventChunker {
let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b);
sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap();
let pulse = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64;
sl.read_i64::<BE>().unwrap();
let _status = sl.read_i8().unwrap();
let _severity = sl.read_i8().unwrap();
@@ -508,15 +514,20 @@ impl EventChunker {
let k1 = len as u32 - p1 - 4;
assert!(value_bytes < 1024 * 256);
assert!(block_size == 1024 * 8);
let value_bytes = value_bytes;
let inp = [0; 16];
//let value_bytes = value_bytes;
let type_size = type_size(type_index);
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = BytesMut::with_capacity(decomp_bytes);
unsafe {
decomp.set_len(decomp_bytes);
}
//info!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut self.tmpbuf, ele_count as usize, ele_size as usize, 0);
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0);
//info!("decompress result: {:?}", c1);
assert!(c1.unwrap() as u32 == k1);
ret.add_event(ts, pulse, Some(decomp));
}
buf.advance(len as usize);
need_min = 4;
@@ -600,18 +611,27 @@ impl Stream for EventChunker {
}
pub struct EventFull {
// TODO add structures to hold list of events
tss: Vec<u64>,
pulses: Vec<u64>,
decomps: Vec<Option<BytesMut>>,
}
impl EventFull {
pub fn dummy() -> Self {
pub fn empty() -> Self {
Self {
tss: vec![],
pulses: vec![],
decomps: vec![],
}
}
fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>) {
self.tss.push(ts);
self.pulses.push(pulse);
self.decomps.push(decomp);
}
}