WIP on event parser

This commit is contained in:
Dominik Werder
2021-04-03 15:12:31 +02:00
parent be617258b2
commit 827161e06e
3 changed files with 211 additions and 16 deletions

View File

@@ -8,9 +8,11 @@ use tokio::fs::File;
use std::future::Future;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use bytes::{Bytes, BytesMut};
use futures_util::{pin_mut, StreamExt};
use bytes::{Bytes, BytesMut, BufMut, Buf};
use std::path::PathBuf;
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel) -> Result<netpod::BodyStream, Error> {
let pre = "/data/sf-databuffer/daq_swissfel";
let path = format!("{}/{}_{}/byTime/{}/{:019}/{:010}/{:019}_00000_Data", pre, query.ksprefix, query.keyspace, query.channel.name(), query.timebin, query.split, query.tbsize);
@@ -38,11 +40,11 @@ struct FileReader {
}
impl Stream for FileReader {
type Item = Result<bytes::Bytes, Error>;
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let blen = self.buffer_size as usize;
let mut buf2 = bytes::BytesMut::with_capacity(blen);
let mut buf2 = BytesMut::with_capacity(blen);
buf2.resize(buf2.capacity(), 0);
if buf2.as_mut().len() != blen {
panic!("logic");
@@ -168,7 +170,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
// I can not simply drop the reading future, that would lose the request.
if reading.is_some() {
let k: Result<(tokio::fs::File, bytes::BytesMut), Error> = reading.as_mut().unwrap().await;
let k: Result<(tokio::fs::File, BytesMut), Error> = reading.as_mut().unwrap().await;
if k.is_err() {
error!("LONELY READ ERROR");
}
@@ -180,7 +182,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
else if fopen.is_some() {
if file.is_some() {
if reading.is_none() {
let mut buf = bytes::BytesMut::with_capacity(blen);
let mut buf = BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
let a = async move {
file2.read_buf(&mut buf).await?;
@@ -234,7 +236,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
}
else if file.is_some() {
loop {
let mut buf = bytes::BytesMut::with_capacity(blen);
let mut buf = BytesMut::with_capacity(blen);
let mut file2 = file.take().unwrap();
let n1 = file2.read_buf(&mut buf).await?;
if n1 == 0 {
@@ -261,8 +263,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
}
// TODO implement another variant with a dedicated task to feed the opened file queue.
pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<BytesMut, Error>> + Send {
let query = query.clone();
async_stream::stream! {
let chrx = open_files(&query);
@@ -280,7 +281,7 @@ pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleCh
break;
}
else {
yield Ok(buf.freeze());
yield Ok(buf);
}
}
}
@@ -319,8 +320,201 @@ fn open_files(query: &netpod::AggQuerySingleChannel) -> async_channel::Receiver<
}
pub fn parsed1(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
let inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>> = Box::pin(raw_concat_channel_read_stream_file_pipe(query));
EventChunker::new(inp)
.map(|k| {
// TODO handle error
match k {
Ok(k) => {
let mut buf = BytesMut::with_capacity(16);
buf.put_u64_le(k.ts);
Ok(buf.freeze())
}
Err(e) => Err(e)
}
})
}
pub struct EventChunker {
inp: NeedMinBuffer,
had_channel: bool,
need_min: u32,
polled: u32,
state: DataFileState,
}
enum DataFileState {
BEGIN,
EVENT,
}
impl EventChunker {
pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>) -> Self {
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(4);
Self {
inp: inp,
had_channel: false,
need_min: 4,
polled: 0,
state: DataFileState::BEGIN,
}
}
fn parse_buf(&mut self, buf: &mut BytesMut) -> ParseResult {
todo!()
}
}
enum ParseResult {
NeedMin(u32),
Ready(EventFull),
}
impl Stream for EventChunker {
type Item = Result<EventFull, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let mut again = false;
self.polled += 1;
if self.polled >= 50 {
return Poll::Ready(None);
}
let g = &mut self.inp;
pin_mut!(g);
let z = match g.poll_next(cx) {
Poll::Ready(Some(Ok(mut buf))) => {
info!("EventChunker got buffer len {}", buf.len());
// TODO put parsing code here in a separate function.
// Need to parse in a loop until no more progress can be made in that buffer.
// Test with small buffer sizes.
// Return type of this EventChunker must be able to hold many events for batching.
// TODO need a loop within this function, if parsing needs more input than it thought,
// I need to update upstream and try again.
// BUT: do not loop if at least one event can be emitted! Otherwise it accumulates indefinitely!
match self.parse_buf(&mut buf) {
ParseResult::NeedMin(need_min) => {
self.inp.put_back(buf);
self.inp.set_need_min(need_min);
again = true;
Poll::Ready(None)
}
ParseResult::Ready(ev) => Poll::Ready(Some(Ok(ev)))
}
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
};
if !again {
break z;
}
}
}
}
pub struct EventFull {
ts: u64,
pulse: u64,
}
impl EventFull {
pub fn dummy() -> Self {
Self {
ts: 0,
pulse: 0,
}
}
}
pub struct NeedMinBuffer {
inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>,
need_min: u32,
left: Option<BytesMut>,
}
impl NeedMinBuffer {
pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>) -> Self {
Self {
inp: inp,
need_min: 1,
left: None,
}
}
pub fn put_back(&mut self, buf: BytesMut) {
assert!(self.left.is_none());
}
pub fn set_need_min(&mut self, need_min: u32) {
self.need_min = need_min;
}
}
impl Stream for NeedMinBuffer {
type Item = Result<BytesMut, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let mut again = false;
let g = &mut self.inp;
pin_mut!(g);
let z = match g.poll_next(cx) {
Poll::Ready(Some(Ok(buf))) => {
match self.left.take() {
Some(mut left) => {
left.unsplit(buf);
let buf = left;
if buf.len() as u32 >= self.need_min {
Poll::Ready(Some(Ok(buf)))
}
else {
self.left.replace(buf);
again = true;
Poll::Pending
}
}
None => {
if buf.len() as u32 >= self.need_min {
Poll::Ready(Some(Ok(buf)))
}
else {
self.left.replace(buf);
again = true;
Poll::Pending
}
}
}
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
};
if !again {
break z;
}
}
}
}
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> + Send {
use futures_util::{StreamExt, pin_mut};
let mut query = query.clone();
async_stream::stream! {
let mut i1 = 0;
@@ -340,6 +534,7 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel) ->
}
}
pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel) -> impl Stream<Item=Result<Bytes, Error>> {
let query = query.clone();
let pre = "/data/sf-databuffer/daq_swissfel";
@@ -355,7 +550,7 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
let blen = query.buffer_size as usize;
use tokio::io::AsyncReadExt;
loop {
let mut buf = bytes::BytesMut::with_capacity(blen);
let mut buf = BytesMut::with_capacity(blen);
assert!(buf.is_empty());
if false {
buf.resize(buf.capacity(), 0);