Add buffer to binning handler

This commit is contained in:
Dominik Werder
2025-02-24 10:51:41 +01:00
parent 9a7a9b7cfc
commit f7aea90956
2 changed files with 110 additions and 21 deletions

View File

@@ -680,7 +680,7 @@ mod container_events_serde {
use std::fmt;
use std::marker::PhantomData;
macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) }
macro_rules! trace_serde { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) }
impl<EVT> Serialize for ContainerEvents<EVT>
where
@@ -739,8 +739,8 @@ mod container_events_serde {
trace_serde!("Vis ContainerEvents visit_map");
let mut tss = None;
let mut vals = None;
while let Some(key) = map.next_key::<&str>()? {
match key {
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"tss" => {
tss = Some(map.next_value()?);
}
@@ -749,7 +749,7 @@ mod container_events_serde {
}
_ => {
use serde::de::Error;
return Err(Error::unknown_field(key, &["tss", "vals"]));
return Err(Error::unknown_field(&key, &["tss", "vals"]));
}
}
}
@@ -1310,6 +1310,10 @@ where
self
}
fn truncate_front(&mut self, len: usize) {
self.truncate_front(len);
}
fn to_f32_for_binning_v01(&self) -> Box<dyn BinningggContainerEventsDyn> {
let mut ret = ContainerEvents::new();
for r in self.iter_zip() {

View File

@@ -6,10 +6,12 @@ use crate::log;
use daqbuf_err as err;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::sitem_data;
use items_0::streamitem::LogItem;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinnedEventsTimeweightTrait;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::timebin::BinningggError;
use items_0::timebin::BinsBoxed;
use items_0::timebin::EventsBoxed;
@@ -36,6 +38,9 @@ autoerr::create_error_v1!(
enum variants {
InnerDynMissing,
Dummy,
EmptyEventsBuffer,
NoProgress,
Binning(#[from] BinningggError),
},
);
@@ -164,6 +169,7 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
enum StreamState {
Reading,
Buffered(Box<dyn BinningggContainerEventsDyn>),
Remains,
Done,
Invalid,
@@ -190,6 +196,92 @@ impl BinnedEventsTimeweightStream {
}
}
fn consume_evsbuf(
evs: &mut Box<dyn BinningggContainerEventsDyn>,
binner: &mut BinnedEventsTimeweightLazy,
_cx: &mut Context,
) -> Result<Option<Box<dyn BinningggContainerBinsDyn>>, Error> {
let nev = evs.len();
if nev == 0 {
// should not be here
let e = Error::EmptyEventsBuffer;
return Err(e);
}
match binner.ingest(evs) {
Ok(report) => match binner.output() {
Ok(Some(x)) => {
let nc = match report {
IngestReport::ConsumedAll => nev,
IngestReport::ConsumedPart(n) => n,
};
// TODO use better api which takes the number of consumed elements.
evs.truncate_front(nev - nc);
if x.len() == 0 {
if nc == 0 {
let e = Error::NoProgress;
Err(e)
} else {
Ok(None)
}
} else {
Ok(Some(x))
}
}
Ok(None) => {
let nc = match report {
IngestReport::ConsumedAll => nev,
IngestReport::ConsumedPart(n) => n,
};
evs.truncate_front(nev - nc);
if nc == 0 {
let e = Error::NoProgress;
Err(e)
} else {
Ok(None)
}
}
Err(e) => Err(e.into()),
},
Err(e) => Err(e.into()),
}
}
fn handle_buffered(
self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
use ControlFlow::*;
use Poll::*;
let self2 = self.get_mut();
let evsbuf = if let StreamState::Buffered(x) = &mut self2.state {
x
} else {
panic!("logic")
};
let binner = &mut self2.binned_events;
if evsbuf.len() == 0 {
// should not be here
let e = Error::EmptyEventsBuffer;
Break(Ready(Some(Err(daqbuf_err::Error::from_string(e)))))
} else {
let j = Self::consume_evsbuf(evsbuf, binner, cx);
if evsbuf.len() == 0 {
self2.state = StreamState::Reading;
}
match j {
Ok(Some(x)) => {
let item = sitem_data(x);
Break(Ready(Some(item)))
}
Ok(None) => Continue(()),
Err(e) => {
let e = daqbuf_err::Error::from_string(e);
Break(Ready(Some(Err(e))))
}
}
}
}
fn handle_sitemty(
mut self: Pin<&mut Self>,
item: Sitemty<ChannelEvents>,
@@ -203,20 +295,13 @@ impl BinnedEventsTimeweightStream {
Ok(x) => match x {
DataItem(x) => match x {
Data(x) => match x {
ChannelEvents::Events(evs) => match self.binned_events.ingest(&evs) {
Ok(report) => match self.binned_events.output() {
Ok(Some(x)) => {
if x.len() == 0 {
Continue(())
} else {
Break(Ready(Some(Ok(DataItem(Data(x))))))
}
}
Ok(None) => Continue(()),
Err(e) => Break(Ready(Some(Err(err::Error::from_string(e))))),
},
Err(e) => Break(Ready(Some(Err(err::Error::from_string(e))))),
},
ChannelEvents::Events(evs) => {
if evs.len() == 0 {
} else {
self.state = StreamState::Buffered(evs);
}
Continue(())
}
ChannelEvents::Status(_) => {
// TODO use the status
Continue(())
@@ -277,8 +362,6 @@ impl BinnedEventsTimeweightStream {
_cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
debug_input_container!("handle_eos range final {}", self.range_final);
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use Poll::*;
self.state = StreamState::Remains;
if true || self.range_final {
@@ -330,6 +413,8 @@ impl BinnedEventsTimeweightStream {
use ControlFlow::*;
use Poll::*;
let ret = match &self.state {
// TODO before attempt to read from input, check whether we have events left in buffer.
// TODO if we do not consume events, and also not get bins out then error.
StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) {
Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx),
Ready(None) => {
@@ -338,6 +423,7 @@ impl BinnedEventsTimeweightStream {
}
Pending => Break(Pending),
},
StreamState::Buffered(_) => self.as_mut().handle_buffered(cx),
StreamState::Remains => Break(self.as_mut().handle_remains(cx)),
StreamState::Done => {
self.state = StreamState::Invalid;
@@ -359,7 +445,6 @@ impl Stream for BinnedEventsTimeweightStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use ControlFlow::*;
trace!("poll");
loop {
break match self.as_mut().handle_main(cx) {
Break(x) => x,