First basic chunking of datafiles into events

This commit is contained in:
Dominik Werder
2021-04-03 22:52:23 +02:00
parent 827161e06e
commit 72d985f8c8
2 changed files with 147 additions and 58 deletions

View File

@@ -10,8 +10,10 @@ tracing = "0.1.25"
serde_json = "1.0"
async-channel = "1.6"
bytes = "1.0.1"
byteorder = "1.4.3"
futures-core = "0.3.12"
futures-util = "0.3.13"
async-stream = "0.3.0"
hex = "0.4.3"
err = { path = "../err" }
netpod = { path = "../netpod" }

View File

@@ -320,117 +320,204 @@ fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<
}
pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>> = Box::pin(raw_concat_channel_read_stream_file_pipe(query));
EventChunker::new(inp)
.map(|k| {
// TODO handle error
match k {
Ok(k) => {
let mut buf = BytesMut::with_capacity(16);
buf.put_u64_le(k.ts);
Ok(buf.freeze())
}
Err(e) => Err(e)
pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> impl Stream<Item=Result<BytesMut, Error>> + Send {
async_stream::stream! {
use tokio::io::AsyncReadExt;
loop {
let mut buf = BytesMut::with_capacity(buffer_size);
let n1 = file.read_buf(&mut buf).await?;
if n1 == 0 {
info!("file EOF");
break;
}
})
else {
yield Ok(buf);
}
}
}
}
pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let query = query.clone();
async_stream::stream! {
let filerx = open_files(&query);
while let Ok(fileres) = filerx.recv().await {
match fileres {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
let mut chunker = EventChunker::new(inp);
while let Some(evres) = chunker.next().await {
match evres {
Ok(evres) => {
let mut buf = BytesMut::with_capacity(16);
// TODO put some interesting information to test
buf.put_u64_le(0xcafecafe);
yield Ok(buf.freeze())
}
Err(e) => {
yield Err(e)
}
}
}
}
Err(e) => {
yield Err(e);
}
}
}
}
}
pub struct EventChunker {
inp: NeedMinBuffer,
had_channel: bool,
need_min: u32,
polled: u32,
state: DataFileState,
}
enum DataFileState {
BEGIN,
EVENT,
FileHeader,
Event,
}
impl EventChunker {
pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>) -> Self {
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(4);
inp.set_need_min(6);
Self {
inp: inp,
had_channel: false,
need_min: 4,
polled: 0,
state: DataFileState::BEGIN,
state: DataFileState::FileHeader,
}
}
fn parse_buf(&mut self, buf: &mut BytesMut) -> ParseResult {
todo!()
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
// must communicate to caller:
// what I've found in the buffer
// what I've consumed from the buffer
// how many bytes I need min to make progress
let mut ret = EventFull::dummy();
let mut need_min = 0;
use byteorder::{BE, ReadBytesExt};
//info!("parse_buf rb {}", buf.len());
let mut i1 = 0;
loop {
//info!("parse_buf LOOP {}", i1);
match self.state {
DataFileState::FileHeader => {
assert!(buf.len() >= 6, "logic");
let mut sl = std::io::Cursor::new(buf.as_ref());
let fver = sl.read_i16::<BE>().unwrap();
assert!(fver == 0, "unexpected file version");
let len = sl.read_i32::<BE>().unwrap();
assert!(len > 0 && len < 128, "unexpected data file header");
let totlen = len as usize + 2;
if buf.len() < totlen {
info!("parse_buf not enough A");
need_min = totlen as u32;
break;
}
else {
sl.advance(len as usize - 8);
let len2 = sl.read_i32::<BE>().unwrap();
assert!(len == len2, "len mismatch");
let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap();
info!("channel name {} len {} len2 {}", s1, len, len2);
self.state = DataFileState::Event;
need_min = 4;
buf.advance(totlen);
}
}
DataFileState::Event => {
assert!(buf.len() >= 4, "logic");
let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
//info!("event len {}", len);
if (buf.len() as u32) < len as u32 {
// TODO gather stats about this
//info!("parse_buf not enough B");
need_min = len as u32;
break;
}
else {
let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap();
sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap();
let pulse = sl.read_i64::<BE>().unwrap();
//info!("len {} len1b {} ts {} pulse {}", len, len1b, ts, pulse);
need_min = 4;
buf.advance(len as usize);
}
}
}
i1 += 1;
}
Ok(ParseResult {
events: ret,
need_min: need_min,
})
}
}
enum ParseResult {
NeedMin(u32),
Ready(EventFull),
struct ParseResult {
events: EventFull,
need_min: u32,
}
impl Stream for EventChunker {
type Item = Result<EventFull, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let mut again = false;
self.polled += 1;
if self.polled >= 50 {
return Poll::Ready(None);
}
let g = &mut self.inp;
pin_mut!(g);
let z = match g.poll_next(cx) {
Poll::Ready(Some(Ok(mut buf))) => {
info!("EventChunker got buffer len {}", buf.len());
self.polled += 1;
if self.polled >= 20000 {
warn!("EventChunker poll limit reached");
return Poll::Ready(None);
}
let g = &mut self.inp;
pin_mut!(g);
match g.poll_next(cx) {
Poll::Ready(Some(Ok(mut buf))) => {
//info!("EventChunker got buffer len {}", buf.len());
match self.parse_buf(&mut buf) {
Ok(res) => {
if buf.len() > 0 {
// TODO put parsing code here in a separate function.
// Need to parse in a loop until no more progress can be made in that buffer.
// Test with small buffer sizes.
// Return type of this EventChunker must be able to hold many events for batching.
// TODO need a loop within this function, if parsing needs more input than it thought,
// I need to update upstream and try again.
// BUT: do not loop if at least one event can be emitted! Otherwise it accumulates indefinitely!
match self.parse_buf(&mut buf) {
ParseResult::NeedMin(need_min) => {
// TODO gather stats about this:
//info!("parse_buf returned {} leftover bytes to me", buf.len());
self.inp.put_back(buf);
self.inp.set_need_min(need_min);
again = true;
Poll::Ready(None)
}
ParseResult::Ready(ev) => Poll::Ready(Some(Ok(ev)))
self.inp.set_need_min(res.need_min);
Poll::Ready(Some(Ok(res.events)))
}
Err(e) => Poll::Ready(Some(Err(e.into())))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
};
if !again {
break z;
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
pub struct EventFull {
ts: u64,
pulse: u64,
// TODO add structures to hold list of events
}
impl EventFull {
pub fn dummy() -> Self {
Self {
ts: 0,
pulse: 0,
}
}