Refactor and prepare to remove MinMaxAvgWaveBins

This commit is contained in:
Dominik Werder
2022-06-16 19:24:55 +02:00
parent 8090088448
commit f183f0bb28
6 changed files with 79 additions and 56 deletions

View File

@@ -40,7 +40,8 @@ where
agg_kind: AggKind,
node_config: NodeConfigCached,
open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>>,
fut2: Option<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsTypeAliases>::TimeBinOutput>> + Send>>>,
stream_from_other_inputs:
Option<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsTypeAliases>::TimeBinOutput>> + Send>>>,
read_from_cache: bool,
cache_written: bool,
data_complete: bool,
@@ -77,7 +78,7 @@ where
agg_kind,
node_config: node_config.clone(),
open_check_local_file: None,
fut2: None,
stream_from_other_inputs: None,
read_from_cache: false,
cache_written: false,
data_complete: false,
@@ -208,10 +209,10 @@ where
let range = self.query.patch().patch_range();
match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) {
Ok(Some(range)) => {
self.fut2 = Some(self.setup_from_higher_res_prebinned(range)?);
self.stream_from_other_inputs = Some(self.setup_from_higher_res_prebinned(range)?);
}
Ok(None) => {
self.fut2 = Some(self.setup_merged_from_remotes()?);
self.stream_from_other_inputs = Some(self.setup_merged_from_remotes()?);
}
Err(e) => return Err(e),
}
@@ -223,6 +224,7 @@ where
mut fut: Pin<Box<dyn Future<Output = Result<WrittenPbCache, Error>> + Send>>,
cx: &mut Context,
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
trace!("poll_write_fut");
use Poll::*;
match fut.poll_unpin(cx) {
Ready(item) => {
@@ -268,6 +270,7 @@ where
>,
cx: &mut Context,
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
trace!("poll_read_cache_fut");
use Poll::*;
match fut.poll_unpin(cx) {
Ready(item) => {
@@ -294,8 +297,10 @@ where
fn handle_data_complete(
self: &mut Self,
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
trace!("handle_data_complete");
use Poll::*;
if self.cache_written {
// TODO can we ever get here?
if self.range_complete_observed {
self.range_complete_emitted = true;
let item = RangeCompletableItem::RangeComplete;
@@ -305,6 +310,7 @@ where
Ready(None)
}
} else if self.read_from_cache {
// TODO refactor: raising cache_written even though we did not actually write is misleading.
self.cache_written = true;
self.all_done = true;
Ready(None)
@@ -333,6 +339,7 @@ where
}
}
_ => {
// TODO refactor: raising cache_written even though we did not actually write is misleading.
self.cache_written = true;
self.all_done = true;
Ready(None)
@@ -341,7 +348,7 @@ where
}
}
fn poll_fut2(
fn poll_stream_from_other_inputs(
self: &mut Self,
mut fut: Pin<
Box<
@@ -358,26 +365,29 @@ where
use Poll::*;
match fut.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(item) => match item {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
self.range_complete_observed = true;
Ready(None)
}
RangeCompletableItem::Data(item) => {
if let Some(values) = &mut self.values {
values.append(&item);
} else {
let mut values = item.empty_like_self();
values.append(&item);
self.values = Some(values);
Ok(item) => {
self.stream_from_other_inputs = Some(fut);
match item {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
self.range_complete_observed = true;
Ready(None)
}
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
}
},
},
RangeCompletableItem::Data(item) => {
if let Some(values) = &mut self.values {
values.append(&item);
} else {
let mut values = item.empty_like_self();
values.append(&item);
self.values = Some(values);
}
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
}
},
}
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))
@@ -388,7 +398,7 @@ where
Ready(None)
}
Pending => {
self.fut2 = Some(fut);
self.stream_from_other_inputs = Some(fut);
Pending
}
}
@@ -416,7 +426,7 @@ where
// TODO other error kinds
io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() {
Ok(_) => {
if self.fut2.is_none() {
if self.stream_from_other_inputs.is_none() {
let e =
Err(Error::with_msg(format!("try_setup_fetch_prebinned_higher_res failed")));
self.errored = true;
@@ -447,10 +457,16 @@ where
}
}
}
}
fn _check_for_existing_cached_data(&mut self) -> Result<(), Error> {
todo!()
}
macro_rules! some_or_continue {
($x:expr) => {
if let Ready(None) = $x {
continue;
} else {
$x
}
};
}
impl<NTY, END, EVS, ENP> Stream for PreBinnedValueStream<NTY, END, EVS, ENP>
@@ -468,7 +484,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
loop {
break if self.completed {
panic!("PreBinnedValueStream poll_next on completed");
} else if self.errored {
@@ -480,18 +496,23 @@ where
} else if let Some(item) = self.streamlog.pop() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if let Some(fut) = self.write_fut.take() {
Self::poll_write_fut(&mut self, fut, cx)
let x = Self::poll_write_fut(&mut self, fut, cx);
some_or_continue!(x)
} else if let Some(fut) = self.read_cache_fut.take() {
Self::poll_read_cache_fut(&mut self, fut, cx)
let x = Self::poll_read_cache_fut(&mut self, fut, cx);
some_or_continue!(x)
} else if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else if self.data_complete {
Self::handle_data_complete(&mut self)
} else if let Some(fut) = self.fut2.take() {
Self::poll_fut2(&mut self, fut, cx)
let x = Self::handle_data_complete(&mut self);
some_or_continue!(x)
} else if let Some(fut) = self.stream_from_other_inputs.take() {
let x = Self::poll_stream_from_other_inputs(&mut self, fut, cx);
some_or_continue!(x)
} else if let Some(fut) = self.open_check_local_file.take() {
Self::poll_open_check_local_file(&mut self, fut, cx)
let x = Self::poll_open_check_local_file(&mut self, fut, cx);
some_or_continue!(x)
} else {
let cfd = CacheFileDesc::new(
self.query.channel().clone(),
@@ -504,7 +525,7 @@ where
};
let fut = async { OpenOptions::new().read(true).open(path).await };
self.open_check_local_file = Some(Box::pin(fut));
continue 'outer;
continue;
};
}
}