From 827161e06e351b3907c14d7b8fad53864f6b2943 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 3 Apr 2021 15:12:31 +0200 Subject: [PATCH] WIP on event parser --- Cargo.toml | 8 +- disk/src/lib.rs | 217 ++++++++++++++++++++++++++++++++++++++++++--- httpret/src/lib.rs | 2 +- 3 files changed, 211 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 34bc19e..ef35ea0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = ["retrieval", "httpret", "err", "disk"] [profile.release] -#opt-level = 0 -#overflow-checks = true -#debug = 2 -#debug-assertions = true +opt-level = 0 +overflow-checks = true +debug = 2 +debug-assertions = true diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 6c19158..9d0733e 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -8,9 +8,11 @@ use tokio::fs::File; use std::future::Future; use futures_core::Stream; use futures_util::future::FusedFuture; -use bytes::{Bytes, BytesMut}; +use futures_util::{pin_mut, StreamExt}; +use bytes::{Bytes, BytesMut, BufMut, Buf}; use std::path::PathBuf; + pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result { let pre = "/data/sf-databuffer/daq_swissfel"; let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize); @@ -38,11 +40,11 @@ struct FileReader { } impl Stream for FileReader { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let blen = self.buffer_size as usize; - let mut buf2 = bytes::BytesMut::with_capacity(blen); + let mut buf2 = BytesMut::with_capacity(blen); buf2.resize(buf2.capacity(), 0); if buf2.as_mut().len() != blen { panic!("logic"); @@ -168,7 +170,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg // I can not simply drop the reading future, that would lose the request. if reading.is_some() { - let k: Result<(tokio::fs::File, bytes::BytesMut), Error> = reading.as_mut().unwrap().await; + let k: Result<(tokio::fs::File, BytesMut), Error> = reading.as_mut().unwrap().await; if k.is_err() { error!("LONELY READ ERROR"); } @@ -180,7 +182,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg else if fopen.is_some() { if file.is_some() { if reading.is_none() { - let mut buf = bytes::BytesMut::with_capacity(blen); + let mut buf = BytesMut::with_capacity(blen); let mut file2 = file.take().unwrap(); let a = async move { file2.read_buf(&mut buf).await?; @@ -234,7 +236,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg } else if file.is_some() { loop { - let mut buf = bytes::BytesMut::with_capacity(blen); + let mut buf = BytesMut::with_capacity(blen); let mut file2 = file.take().unwrap(); let n1 = file2.read_buf(&mut buf).await?; if n1 == 0 { @@ -261,8 +263,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg } -// TODO implement another variant with a dedicated task to feed the opened file queue. -pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { +pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { let query = query.clone(); async_stream::stream! { let chrx = open_files(&query); @@ -280,7 +281,7 @@ pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleCh break; } else { - yield Ok(buf.freeze()); + yield Ok(buf); } } } @@ -319,8 +320,201 @@ fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver< } +pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { + let inp: Pin> + Send>> = Box::pin(raw_concat_channel_read_stream_file_pipe(query)); + EventChunker::new(inp) + .map(|k| { + // TODO handle error + match k { + Ok(k) => { + let mut buf = BytesMut::with_capacity(16); + buf.put_u64_le(k.ts); + Ok(buf.freeze()) + } + Err(e) => Err(e) + } + }) +} + + +pub struct EventChunker { + inp: NeedMinBuffer, + had_channel: bool, + need_min: u32, + polled: u32, + state: DataFileState, +} + +enum DataFileState { + BEGIN, + EVENT, +} + +impl EventChunker { + + pub fn new(inp: Pin> + Send>>) -> Self { + let mut inp = NeedMinBuffer::new(inp); + inp.set_need_min(4); + Self { + inp: inp, + had_channel: false, + need_min: 4, + polled: 0, + state: DataFileState::BEGIN, + } + } + + fn parse_buf(&mut self, buf: &mut BytesMut) -> ParseResult { + todo!() + } + +} + +enum ParseResult { + NeedMin(u32), + Ready(EventFull), +} + +impl Stream for EventChunker { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let mut again = false; + self.polled += 1; + if self.polled >= 50 { + return Poll::Ready(None); + } + let g = &mut self.inp; + pin_mut!(g); + let z = match g.poll_next(cx) { + Poll::Ready(Some(Ok(mut buf))) => { + info!("EventChunker got buffer len {}", buf.len()); + + // TODO put parsing code here in a separate function. + // Need to parse in a loop until no more progress can be made in that buffer. + // Test with small buffer sizes. + // Return type of this EventChunker must be able to hold many events for batching. + // TODO need a loop within this function, if parsing needs more input than it thought, + // I need to update upstream and try again. + // BUT: do not loop if at least one event can be emitted! Otherwise it accumulates indefinitely! + match self.parse_buf(&mut buf) { + ParseResult::NeedMin(need_min) => { + self.inp.put_back(buf); + self.inp.set_need_min(need_min); + again = true; + Poll::Ready(None) + } + ParseResult::Ready(ev) => Poll::Ready(Some(Ok(ev))) + } + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }; + if !again { + break z; + } + } + } + +} + +pub struct EventFull { + ts: u64, + pulse: u64, +} + +impl EventFull { + + pub fn dummy() -> Self { + Self { + ts: 0, + pulse: 0, + } + } + +} + + + + +pub struct NeedMinBuffer { + inp: Pin> + Send>>, + need_min: u32, + left: Option, +} + +impl NeedMinBuffer { + + pub fn new(inp: Pin> + Send>>) -> Self { + Self { + inp: inp, + need_min: 1, + left: None, + } + } + + pub fn put_back(&mut self, buf: BytesMut) { + assert!(self.left.is_none()); + } + + pub fn set_need_min(&mut self, need_min: u32) { + self.need_min = need_min; + } + +} + +impl Stream for NeedMinBuffer { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let mut again = false; + let g = &mut self.inp; + pin_mut!(g); + let z = match g.poll_next(cx) { + Poll::Ready(Some(Ok(buf))) => { + match self.left.take() { + Some(mut left) => { + left.unsplit(buf); + let buf = left; + if buf.len() as u32 >= self.need_min { + Poll::Ready(Some(Ok(buf))) + } + else { + self.left.replace(buf); + again = true; + Poll::Pending + } + } + None => { + if buf.len() as u32 >= self.need_min { + Poll::Ready(Some(Ok(buf))) + } + else { + self.left.replace(buf); + again = true; + Poll::Pending + } + } + } + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }; + if !again { + break z; + } + } + } + +} + + + pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream> + Send { - use futures_util::{StreamExt, pin_mut}; let mut query = query.clone(); async_stream::stream! { let mut i1 = 0; @@ -340,6 +534,7 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> } } + pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream> { let query = query.clone(); let pre = "/data/sf-databuffer/daq_swissfel"; @@ -355,7 +550,7 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan let blen = query.buffer_size as usize; use tokio::io::AsyncReadExt; loop { - let mut buf = bytes::BytesMut::with_capacity(blen); + let mut buf = BytesMut::with_capacity(blen); assert!(buf.is_empty()); if false { buf.resize(buf.capacity(), 0); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 732a9c6..7e3858c 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -65,7 +65,7 @@ async fn parsed_raw(req: Request) -> Result, Error> { let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?; //let q = disk::read_test_1(&query).await?; //let s = q.inner; - let s = disk::raw_concat_channel_read_stream_file_pipe(&query); + let s = disk::parsed1(&query); let res = response(StatusCode::OK) .body(Body::wrap_stream(s))?; /*