diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index 5228458..c8af133 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -680,7 +680,7 @@ mod container_events_serde { use std::fmt; use std::marker::PhantomData; - macro_rules! trace_serde { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) } + macro_rules! trace_serde { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) } impl Serialize for ContainerEvents where @@ -739,8 +739,8 @@ mod container_events_serde { trace_serde!("Vis ContainerEvents visit_map"); let mut tss = None; let mut vals = None; - while let Some(key) = map.next_key::<&str>()? { - match key { + while let Some(key) = map.next_key::()? { + match key.as_str() { "tss" => { tss = Some(map.next_value()?); } @@ -749,7 +749,7 @@ mod container_events_serde { } _ => { use serde::de::Error; - return Err(Error::unknown_field(key, &["tss", "vals"])); + return Err(Error::unknown_field(&key, &["tss", "vals"])); } } } @@ -1310,6 +1310,10 @@ where self } + fn truncate_front(&mut self, len: usize) { + self.truncate_front(len); + } + fn to_f32_for_binning_v01(&self) -> Box { let mut ret = ContainerEvents::new(); for r in self.iter_zip() { diff --git a/src/binning/timeweight/timeweight_events_dyn.rs b/src/binning/timeweight/timeweight_events_dyn.rs index 3d9dd98..2e629ec 100644 --- a/src/binning/timeweight/timeweight_events_dyn.rs +++ b/src/binning/timeweight/timeweight_events_dyn.rs @@ -6,10 +6,12 @@ use crate::log; use daqbuf_err as err; use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::sitem_data; use items_0::streamitem::LogItem; use items_0::streamitem::Sitemty; use items_0::timebin::BinnedEventsTimeweightTrait; use items_0::timebin::BinningggContainerBinsDyn; +use items_0::timebin::BinningggContainerEventsDyn; use items_0::timebin::BinningggError; use items_0::timebin::BinsBoxed; use items_0::timebin::EventsBoxed; @@ -36,6 +38,9 @@ autoerr::create_error_v1!( enum variants { InnerDynMissing, Dummy, + EmptyEventsBuffer, + NoProgress, + Binning(#[from] BinningggError), }, ); @@ -164,6 +169,7 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { enum StreamState { Reading, + Buffered(Box), Remains, Done, Invalid, @@ -190,6 +196,92 @@ impl BinnedEventsTimeweightStream { } } + fn consume_evsbuf( + evs: &mut Box, + binner: &mut BinnedEventsTimeweightLazy, + _cx: &mut Context, + ) -> Result>, Error> { + let nev = evs.len(); + if nev == 0 { + // should not be here + let e = Error::EmptyEventsBuffer; + return Err(e); + } + match binner.ingest(evs) { + Ok(report) => match binner.output() { + Ok(Some(x)) => { + let nc = match report { + IngestReport::ConsumedAll => nev, + IngestReport::ConsumedPart(n) => n, + }; + // TODO use better api which takes the number of consumed elements. + evs.truncate_front(nev - nc); + if x.len() == 0 { + if nc == 0 { + let e = Error::NoProgress; + Err(e) + } else { + Ok(None) + } + } else { + Ok(Some(x)) + } + } + Ok(None) => { + let nc = match report { + IngestReport::ConsumedAll => nev, + IngestReport::ConsumedPart(n) => n, + }; + evs.truncate_front(nev - nc); + if nc == 0 { + let e = Error::NoProgress; + Err(e) + } else { + Ok(None) + } + } + Err(e) => Err(e.into()), + }, + Err(e) => Err(e.into()), + } + } + + fn handle_buffered( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> ControlFlow::Item>>> { + use ControlFlow::*; + use Poll::*; + let self2 = self.get_mut(); + let evsbuf = if let StreamState::Buffered(x) = &mut self2.state { + x + } else { + panic!("logic") + }; + let binner = &mut self2.binned_events; + if evsbuf.len() == 0 { + // should not be here + let e = Error::EmptyEventsBuffer; + Break(Ready(Some(Err(daqbuf_err::Error::from_string(e))))) + } else { + let j = Self::consume_evsbuf(evsbuf, binner, cx); + if evsbuf.len() == 0 { + self2.state = StreamState::Reading; + } + match j { + Ok(Some(x)) => { + let item = sitem_data(x); + Break(Ready(Some(item))) + } + Ok(None) => Continue(()), + Err(e) => { + let e = daqbuf_err::Error::from_string(e); + Break(Ready(Some(Err(e)))) + } + } + } + } + fn handle_sitemty( mut self: Pin<&mut Self>, item: Sitemty, @@ -203,20 +295,13 @@ impl BinnedEventsTimeweightStream { Ok(x) => match x { DataItem(x) => match x { Data(x) => match x { - ChannelEvents::Events(evs) => match self.binned_events.ingest(&evs) { - Ok(report) => match self.binned_events.output() { - Ok(Some(x)) => { - if x.len() == 0 { - Continue(()) - } else { - Break(Ready(Some(Ok(DataItem(Data(x)))))) - } - } - Ok(None) => Continue(()), - Err(e) => Break(Ready(Some(Err(err::Error::from_string(e))))), - }, - Err(e) => Break(Ready(Some(Err(err::Error::from_string(e))))), - }, + ChannelEvents::Events(evs) => { + if evs.len() == 0 { + } else { + self.state = StreamState::Buffered(evs); + } + Continue(()) + } ChannelEvents::Status(_) => { // TODO use the status Continue(()) @@ -277,8 +362,6 @@ impl BinnedEventsTimeweightStream { _cx: &mut Context, ) -> Poll::Item>> { debug_input_container!("handle_eos range final {}", self.range_final); - use items_0::streamitem::RangeCompletableItem::*; - use items_0::streamitem::StreamItem::*; use Poll::*; self.state = StreamState::Remains; if true || self.range_final { @@ -330,6 +413,8 @@ impl BinnedEventsTimeweightStream { use ControlFlow::*; use Poll::*; let ret = match &self.state { + // TODO before attempt to read from input, check whether we have events left in buffer. + // TODO if we do not consume events, and also not get bins out then error. StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) { Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx), Ready(None) => { @@ -338,6 +423,7 @@ impl BinnedEventsTimeweightStream { } Pending => Break(Pending), }, + StreamState::Buffered(_) => self.as_mut().handle_buffered(cx), StreamState::Remains => Break(self.as_mut().handle_remains(cx)), StreamState::Done => { self.state = StreamState::Invalid; @@ -359,7 +445,6 @@ impl Stream for BinnedEventsTimeweightStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use ControlFlow::*; - trace!("poll"); loop { break match self.as_mut().handle_main(cx) { Break(x) => x,