This commit is contained in:
Dominik Werder
2023-02-10 15:40:40 +01:00
parent b453d61250
commit 717dc1d1b6
4 changed files with 30 additions and 10 deletions

View File

@@ -523,7 +523,7 @@ impl crate::merger::Mergeable for ChannelEvents {
fn new_empty(&self) -> Self { fn new_empty(&self) -> Self {
match self { match self {
ChannelEvents::Events(k) => ChannelEvents::Events(k.new_empty()), ChannelEvents::Events(k) => ChannelEvents::Events(k.new_empty()),
ChannelEvents::Status(k) => ChannelEvents::Status(k.clone()), ChannelEvents::Status(_) => ChannelEvents::Status(None),
} }
} }
@@ -536,8 +536,20 @@ impl crate::merger::Mergeable for ChannelEvents {
ChannelEvents::Status(k) => match dst { ChannelEvents::Status(k) => match dst {
ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible), ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible),
ChannelEvents::Status(j) => match j { ChannelEvents::Status(j) => match j {
Some(_) => Err(merger::MergeError::Full), Some(_) => {
trace!("drain_into merger::MergeError::Full");
Err(merger::MergeError::Full)
}
None => { None => {
if range.0 > 0 {
trace!("weird range {range:?}");
}
if range.1 > 1 {
trace!("weird range {range:?}");
}
if range.0 == range.1 {
trace!("try to add empty range to status container {range:?}");
}
*j = k.take(); *j = k.take();
Ok(()) Ok(())
} }

View File

@@ -15,19 +15,19 @@ use std::task::Poll;
#[allow(unused)] #[allow(unused)]
macro_rules! trace2 { macro_rules! trace2 {
($($arg:tt)*) => (); (__$($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*)); ($($arg:tt)*) => (trace!($($arg)*));
} }
#[allow(unused)] #[allow(unused)]
macro_rules! trace3 { macro_rules! trace3 {
($($arg:tt)*) => (); (__$($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*)); ($($arg:tt)*) => (trace!($($arg)*));
} }
#[allow(unused)] #[allow(unused)]
macro_rules! trace4 { macro_rules! trace4 {
($($arg:tt)*) => (); (__$($arg:tt)*) => ();
($($arg:tt)*) => (trace!($($arg)*)); ($($arg:tt)*) => (trace!($($arg)*));
} }
@@ -48,6 +48,7 @@ pub trait Mergeable<Rhs = Self>: fmt::Debug + Unpin {
fn ts_min(&self) -> Option<u64>; fn ts_min(&self) -> Option<u64>;
fn ts_max(&self) -> Option<u64>; fn ts_max(&self) -> Option<u64>;
fn new_empty(&self) -> Self; fn new_empty(&self) -> Self;
// TODO when MergeError::Full gets returned, any guarantees about what has been modified or kept unchanged?
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>; fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>;
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize>; fn find_lowest_index_gt(&self, ts: u64) -> Option<usize>;
fn find_lowest_index_ge(&self, ts: u64) -> Option<usize>; fn find_lowest_index_ge(&self, ts: u64) -> Option<usize>;
@@ -68,6 +69,7 @@ pub struct Merger<T> {
done_buffered: bool, done_buffered: bool,
done_range_complete: bool, done_range_complete: bool,
complete: bool, complete: bool,
poll_count: usize,
} }
impl<T> fmt::Debug for Merger<T> impl<T> fmt::Debug for Merger<T>
@@ -107,6 +109,7 @@ where
done_buffered: false, done_buffered: false,
done_range_complete: false, done_range_complete: false,
complete: false, complete: false,
poll_count: 0,
} }
} }
@@ -300,6 +303,7 @@ where
fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<Result<T, Error>>>> { fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
use ControlFlow::*; use ControlFlow::*;
use Poll::*; use Poll::*;
trace4!("poll3");
#[allow(unused)] #[allow(unused)]
let ninps = self.inps.iter().filter(|a| a.is_some()).count(); let ninps = self.inps.iter().filter(|a| a.is_some()).count();
let nitems = self.items.iter().filter(|a| a.is_some()).count(); let nitems = self.items.iter().filter(|a| a.is_some()).count();
@@ -362,11 +366,15 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
let span = span!(Level::TRACE, "merger"); self.poll_count += 1;
let _spanguard = span.enter(); let span1 = span!(Level::INFO, "Merger", pc = self.poll_count);
let _spg = span1.enter();
loop { loop {
trace3!("poll"); trace3!("poll");
break if self.complete { break if self.poll_count == usize::MAX {
self.done_range_complete = true;
continue;
} else if self.complete {
panic!("poll after complete"); panic!("poll after complete");
} else if self.done_range_complete { } else if self.done_range_complete {
self.complete = true; self.complete = true;

View File

@@ -1727,7 +1727,7 @@ where
pub mod log { pub mod log {
#[allow(unused_imports)] #[allow(unused_imports)]
pub use tracing::{debug, error, event, info, span, trace, warn, Level}; pub use tracing::{self, debug, error, event, info, span, trace, warn, Level};
} }
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]

View File

@@ -194,7 +194,7 @@ where
type Item = Sitemty<ITY>; type Item = Sitemty<ITY>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let span1 = span!(Level::TRACE, "RangeFilter2", range = tracing::field::Empty); let span1 = span!(Level::INFO, "RangeFilter2", range = tracing::field::Empty);
span1.record("range", &self.range_str.as_str()); span1.record("range", &self.range_str.as_str());
let _spg = span1.enter(); let _spg = span1.enter();
Self::poll_next(self, cx) Self::poll_next(self, cx)