From 72d985f8c8a3049b627cce5d89519394346ae908 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 3 Apr 2021 22:52:23 +0200 Subject: [PATCH] First basic chunking of datafiles into events --- disk/Cargo.toml | 2 + disk/src/lib.rs | 203 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 147 insertions(+), 58 deletions(-) diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 058b2e8..48f1dc3 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -10,8 +10,10 @@ tracing = "0.1.25" serde_json = "1.0" async-channel = "1.6" bytes = "1.0.1" +byteorder = "1.4.3" futures-core = "0.3.12" futures-util = "0.3.13" async-stream = "0.3.0" +hex = "0.4.3" err = { path = "../err" } netpod = { path = "../netpod" } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 9d0733e..db09563 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -320,117 +320,204 @@ fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver< } -pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { - let inp: Pin> + Send>> = Box::pin(raw_concat_channel_read_stream_file_pipe(query)); - EventChunker::new(inp) - .map(|k| { - // TODO handle error - match k { - Ok(k) => { - let mut buf = BytesMut::with_capacity(16); - buf.put_u64_le(k.ts); - Ok(buf.freeze()) - } - Err(e) => Err(e) +pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> impl Stream> + Send { + async_stream::stream! { + use tokio::io::AsyncReadExt; + loop { + let mut buf = BytesMut::with_capacity(buffer_size); + let n1 = file.read_buf(&mut buf).await?; + if n1 == 0 { + info!("file EOF"); + break; } - }) + else { + yield Ok(buf); + } + } + } +} + + +pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { + let query = query.clone(); + async_stream::stream! { + let filerx = open_files(&query); + while let Ok(fileres) = filerx.recv().await { + match fileres { + Ok(file) => { + let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); + let mut chunker = EventChunker::new(inp); + while let Some(evres) = chunker.next().await { + match evres { + Ok(evres) => { + let mut buf = BytesMut::with_capacity(16); + + // TODO put some interesting information to test + buf.put_u64_le(0xcafecafe); + + yield Ok(buf.freeze()) + } + Err(e) => { + yield Err(e) + } + } + } + } + Err(e) => { + yield Err(e); + } + } + } + } } pub struct EventChunker { inp: NeedMinBuffer, had_channel: bool, - need_min: u32, polled: u32, state: DataFileState, } enum DataFileState { - BEGIN, - EVENT, + FileHeader, + Event, } impl EventChunker { pub fn new(inp: Pin> + Send>>) -> Self { let mut inp = NeedMinBuffer::new(inp); - inp.set_need_min(4); + inp.set_need_min(6); Self { inp: inp, had_channel: false, - need_min: 4, polled: 0, - state: DataFileState::BEGIN, + state: DataFileState::FileHeader, } } - fn parse_buf(&mut self, buf: &mut BytesMut) -> ParseResult { - todo!() + fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { + // must communicate to caller: + // what I've found in the buffer + // what I've consumed from the buffer + // how many bytes I need min to make progress + let mut ret = EventFull::dummy(); + let mut need_min = 0; + use byteorder::{BE, ReadBytesExt}; + //info!("parse_buf rb {}", buf.len()); + let mut i1 = 0; + loop { + //info!("parse_buf LOOP {}", i1); + match self.state { + DataFileState::FileHeader => { + assert!(buf.len() >= 6, "logic"); + let mut sl = std::io::Cursor::new(buf.as_ref()); + let fver = sl.read_i16::().unwrap(); + assert!(fver == 0, "unexpected file version"); + let len = sl.read_i32::().unwrap(); + assert!(len > 0 && len < 128, "unexpected data file header"); + let totlen = len as usize + 2; + if buf.len() < totlen { + info!("parse_buf not enough A"); + need_min = totlen as u32; + break; + } + else { + sl.advance(len as usize - 8); + let len2 = sl.read_i32::().unwrap(); + assert!(len == len2, "len mismatch"); + let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap(); + info!("channel name {} len {} len2 {}", s1, len, len2); + self.state = DataFileState::Event; + need_min = 4; + buf.advance(totlen); + } + } + DataFileState::Event => { + assert!(buf.len() >= 4, "logic"); + let mut sl = std::io::Cursor::new(buf.as_ref()); + let len = sl.read_i32::().unwrap(); + //info!("event len {}", len); + if (buf.len() as u32) < len as u32 { + // TODO gather stats about this + //info!("parse_buf not enough B"); + need_min = len as u32; + break; + } + else { + let mut sl = std::io::Cursor::new(buf.as_ref()); + let len1b = sl.read_i32::().unwrap(); + sl.read_i64::().unwrap(); + let ts = sl.read_i64::().unwrap(); + let pulse = sl.read_i64::().unwrap(); + //info!("len {} len1b {} ts {} pulse {}", len, len1b, ts, pulse); + need_min = 4; + buf.advance(len as usize); + } + } + } + i1 += 1; + } + Ok(ParseResult { + events: ret, + need_min: need_min, + }) } } -enum ParseResult { - NeedMin(u32), - Ready(EventFull), +struct ParseResult { + events: EventFull, + need_min: u32, } impl Stream for EventChunker { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - let mut again = false; - self.polled += 1; - if self.polled >= 50 { - return Poll::Ready(None); - } - let g = &mut self.inp; - pin_mut!(g); - let z = match g.poll_next(cx) { - Poll::Ready(Some(Ok(mut buf))) => { - info!("EventChunker got buffer len {}", buf.len()); + self.polled += 1; + if self.polled >= 20000 { + warn!("EventChunker poll limit reached"); + return Poll::Ready(None); + } + let g = &mut self.inp; + pin_mut!(g); + match g.poll_next(cx) { + Poll::Ready(Some(Ok(mut buf))) => { + //info!("EventChunker got buffer len {}", buf.len()); + match self.parse_buf(&mut buf) { + Ok(res) => { + if buf.len() > 0 { - // TODO put parsing code here in a separate function. - // Need to parse in a loop until no more progress can be made in that buffer. - // Test with small buffer sizes. - // Return type of this EventChunker must be able to hold many events for batching. - // TODO need a loop within this function, if parsing needs more input than it thought, - // I need to update upstream and try again. - // BUT: do not loop if at least one event can be emitted! Otherwise it accumulates indefinitely! - match self.parse_buf(&mut buf) { - ParseResult::NeedMin(need_min) => { + // TODO gather stats about this: + //info!("parse_buf returned {} leftover bytes to me", buf.len()); self.inp.put_back(buf); - self.inp.set_need_min(need_min); - again = true; - Poll::Ready(None) } - ParseResult::Ready(ev) => Poll::Ready(Some(Ok(ev))) + self.inp.set_need_min(res.need_min); + Poll::Ready(Some(Ok(res.events))) } + Err(e) => Poll::Ready(Some(Err(e.into()))) } - Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - }; - if !again { - break z; } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } } } pub struct EventFull { - ts: u64, - pulse: u64, + + // TODO add structures to hold list of events + } impl EventFull { pub fn dummy() -> Self { Self { - ts: 0, - pulse: 0, } }