From 717dc1d1b69ff342cf7cd5748b4ef4dddb521760 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 10 Feb 2023 15:40:40 +0100 Subject: [PATCH] WIP --- items_2/src/channelevents.rs | 16 ++++++++++++++-- items_2/src/merger.rs | 20 ++++++++++++++------ netpod/src/netpod.rs | 2 +- streams/src/rangefilter2.rs | 2 +- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 1234ad2..28c6b5b 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -523,7 +523,7 @@ impl crate::merger::Mergeable for ChannelEvents { fn new_empty(&self) -> Self { match self { ChannelEvents::Events(k) => ChannelEvents::Events(k.new_empty()), - ChannelEvents::Status(k) => ChannelEvents::Status(k.clone()), + ChannelEvents::Status(_) => ChannelEvents::Status(None), } } @@ -536,8 +536,20 @@ impl crate::merger::Mergeable for ChannelEvents { ChannelEvents::Status(k) => match dst { ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible), ChannelEvents::Status(j) => match j { - Some(_) => Err(merger::MergeError::Full), + Some(_) => { + trace!("drain_into merger::MergeError::Full"); + Err(merger::MergeError::Full) + } None => { + if range.0 > 0 { + trace!("weird range {range:?}"); + } + if range.1 > 1 { + trace!("weird range {range:?}"); + } + if range.0 == range.1 { + trace!("try to add empty range to status container {range:?}"); + } *j = k.take(); Ok(()) } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index c05e99e..74337d9 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -15,19 +15,19 @@ use std::task::Poll; #[allow(unused)] macro_rules! trace2 { - ($($arg:tt)*) => (); + (__$($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace3 { - ($($arg:tt)*) => (); + (__$($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace4 { - ($($arg:tt)*) => (); + (__$($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } @@ -48,6 +48,7 @@ pub trait Mergeable: fmt::Debug + Unpin { fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; fn new_empty(&self) -> Self; + // TODO when MergeError::Full gets returned, any guarantees about what has been modified or kept unchanged? fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>; fn find_lowest_index_gt(&self, ts: u64) -> Option; fn find_lowest_index_ge(&self, ts: u64) -> Option; @@ -68,6 +69,7 @@ pub struct Merger { done_buffered: bool, done_range_complete: bool, complete: bool, + poll_count: usize, } impl fmt::Debug for Merger @@ -107,6 +109,7 @@ where done_buffered: false, done_range_complete: false, complete: false, + poll_count: 0, } } @@ -300,6 +303,7 @@ where fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { use ControlFlow::*; use Poll::*; + trace4!("poll3"); #[allow(unused)] let ninps = self.inps.iter().filter(|a| a.is_some()).count(); let nitems = self.items.iter().filter(|a| a.is_some()).count(); @@ -362,11 +366,15 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let span = span!(Level::TRACE, "merger"); - let _spanguard = span.enter(); + self.poll_count += 1; + let span1 = span!(Level::INFO, "Merger", pc = self.poll_count); + let _spg = span1.enter(); loop { trace3!("poll"); - break if self.complete { + break if self.poll_count == usize::MAX { + self.done_range_complete = true; + continue; + } else if self.complete { panic!("poll after complete"); } else if self.done_range_complete { self.complete = true; diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index e901f2b..b2fbeaa 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1727,7 +1727,7 @@ where pub mod log { #[allow(unused_imports)] - pub use tracing::{debug, error, event, info, span, trace, warn, Level}; + pub use tracing::{self, debug, error, event, info, span, trace, warn, Level}; } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs index a28caea..fbea3e3 100644 --- a/streams/src/rangefilter2.rs +++ b/streams/src/rangefilter2.rs @@ -194,7 +194,7 @@ where type Item = Sitemty; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let span1 = span!(Level::TRACE, "RangeFilter2", range = tracing::field::Empty); + let span1 = span!(Level::INFO, "RangeFilter2", range = tracing::field::Empty); span1.record("range", &self.range_str.as_str()); let _spg = span1.enter(); Self::poll_next(self, cx)