Move match cases to functions
This commit is contained in:
@@ -47,6 +47,7 @@ where
|
||||
range_complete_observed: bool,
|
||||
range_complete_emitted: bool,
|
||||
errored: bool,
|
||||
all_done: bool,
|
||||
completed: bool,
|
||||
streamlog: Streamlog,
|
||||
values: Option<<ENP as EventsTypeAliases>::TimeBinOutput>,
|
||||
@@ -83,6 +84,7 @@ where
|
||||
range_complete_observed: false,
|
||||
range_complete_emitted: false,
|
||||
errored: false,
|
||||
all_done: false,
|
||||
completed: false,
|
||||
streamlog: Streamlog::new(node_config.ix as u32),
|
||||
// TODO use alias via some trait associated type:
|
||||
@@ -216,6 +218,182 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_write_fut(
|
||||
self: &mut Self,
|
||||
mut fut: Pin<Box<dyn Future<Output = Result<WrittenPbCache, Error>> + Send>>,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
|
||||
use Poll::*;
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(item) => {
|
||||
self.cache_written = true;
|
||||
self.write_fut = None;
|
||||
match item {
|
||||
Ok(res) => {
|
||||
self.streamlog.append(
|
||||
Level::INFO,
|
||||
format!(
|
||||
"cache file written bytes: {} duration {} ms",
|
||||
res.bytes,
|
||||
res.duration.as_millis()
|
||||
),
|
||||
);
|
||||
self.all_done = true;
|
||||
Ready(None)
|
||||
}
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => {
|
||||
self.write_fut = Some(fut);
|
||||
Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_read_cache_fut(
|
||||
self: &mut Self,
|
||||
mut fut: Pin<
|
||||
Box<
|
||||
dyn Future<
|
||||
Output = Result<
|
||||
StreamItem<RangeCompletableItem<<ENP as EventsTypeAliases>::TimeBinOutput>>,
|
||||
Error,
|
||||
>,
|
||||
> + Send,
|
||||
>,
|
||||
>,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
|
||||
use Poll::*;
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(item) => {
|
||||
self.read_cache_fut = None;
|
||||
match item {
|
||||
Ok(item) => {
|
||||
self.data_complete = true;
|
||||
self.range_complete_observed = true;
|
||||
Ready(Some(Ok(item)))
|
||||
}
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => {
|
||||
self.read_cache_fut = Some(fut);
|
||||
Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_data_complete(
|
||||
self: &mut Self,
|
||||
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
|
||||
use Poll::*;
|
||||
if self.cache_written {
|
||||
if self.range_complete_observed {
|
||||
self.range_complete_emitted = true;
|
||||
let item = RangeCompletableItem::RangeComplete;
|
||||
Ready(Some(Ok(StreamItem::DataItem(item))))
|
||||
} else {
|
||||
self.all_done = true;
|
||||
Ready(None)
|
||||
}
|
||||
} else if self.read_from_cache {
|
||||
self.cache_written = true;
|
||||
self.all_done = true;
|
||||
Ready(None)
|
||||
} else {
|
||||
match self.query.cache_usage() {
|
||||
CacheUsage::Use | CacheUsage::Recreate => {
|
||||
if let Some(values) = self.values.take() {
|
||||
let msg = format!(
|
||||
"write cache file query: {:?} bin count: {}",
|
||||
self.query.patch(),
|
||||
values.len(),
|
||||
);
|
||||
self.streamlog.append(Level::INFO, msg);
|
||||
let fut = write_pb_cache_min_max_avg_scalar(
|
||||
values,
|
||||
self.query.patch().clone(),
|
||||
self.query.agg_kind().clone(),
|
||||
self.query.channel().clone(),
|
||||
self.node_config.clone(),
|
||||
);
|
||||
self.write_fut = Some(Box::pin(fut));
|
||||
Ready(None)
|
||||
} else {
|
||||
warn!("no values to write to cache");
|
||||
Ready(None)
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
self.cache_written = true;
|
||||
self.all_done = true;
|
||||
Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_fut2(
|
||||
self: &mut Self,
|
||||
mut fut: Pin<
|
||||
Box<
|
||||
dyn Stream<
|
||||
Item = Result<
|
||||
StreamItem<RangeCompletableItem<<ENP as EventsTypeAliases>::TimeBinOutput>>,
|
||||
Error,
|
||||
>,
|
||||
> + Send,
|
||||
>,
|
||||
>,
|
||||
cx: &mut Context,
|
||||
) -> Poll<Option<Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>>> {
|
||||
use Poll::*;
|
||||
match fut.poll_next_unpin(cx) {
|
||||
Ready(Some(k)) => match k {
|
||||
Ok(item) => match item {
|
||||
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
|
||||
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
|
||||
StreamItem::DataItem(item) => match item {
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
self.range_complete_observed = true;
|
||||
Ready(None)
|
||||
}
|
||||
RangeCompletableItem::Data(item) => {
|
||||
if let Some(values) = &mut self.values {
|
||||
values.append(&item);
|
||||
} else {
|
||||
let mut values = item.empty_like_self();
|
||||
values.append(&item);
|
||||
self.values = Some(values);
|
||||
}
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
|
||||
}
|
||||
},
|
||||
},
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
self.data_complete = true;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => {
|
||||
self.fut2 = Some(fut);
|
||||
Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_open_check_local_file(
|
||||
self: &mut Self,
|
||||
mut fut: Pin<Box<dyn Future<Output = Result<File, io::Error>> + Send>>,
|
||||
@@ -224,7 +402,6 @@ where
|
||||
use Poll::*;
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(item) => {
|
||||
self.open_check_local_file = None;
|
||||
match item {
|
||||
Ok(file) => {
|
||||
self.read_from_cache = true;
|
||||
@@ -264,7 +441,10 @@ where
|
||||
},
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
Pending => {
|
||||
self.open_check_local_file = Some(fut);
|
||||
Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,134 +474,24 @@ where
|
||||
} else if self.errored {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
} else if self.all_done {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
} else if let Some(item) = self.streamlog.pop() {
|
||||
Ready(Some(Ok(StreamItem::Log(item))))
|
||||
} else if let Some(fut) = &mut self.write_fut {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(item) => {
|
||||
self.cache_written = true;
|
||||
self.write_fut = None;
|
||||
match item {
|
||||
Ok(res) => {
|
||||
self.streamlog.append(
|
||||
Level::INFO,
|
||||
format!(
|
||||
"cache file written bytes: {} duration {} ms",
|
||||
res.bytes,
|
||||
res.duration.as_millis()
|
||||
),
|
||||
);
|
||||
continue 'outer;
|
||||
}
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if let Some(fut) = &mut self.read_cache_fut {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(item) => {
|
||||
self.read_cache_fut = None;
|
||||
match item {
|
||||
Ok(item) => {
|
||||
self.data_complete = true;
|
||||
self.range_complete_observed = true;
|
||||
Ready(Some(Ok(item)))
|
||||
}
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if let Some(fut) = self.write_fut.take() {
|
||||
Self::poll_write_fut(&mut self, fut, cx)
|
||||
} else if let Some(fut) = self.read_cache_fut.take() {
|
||||
Self::poll_read_cache_fut(&mut self, fut, cx)
|
||||
} else if self.range_complete_emitted {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
} else if self.data_complete {
|
||||
if self.cache_written {
|
||||
if self.range_complete_observed {
|
||||
self.range_complete_emitted = true;
|
||||
let item = RangeCompletableItem::RangeComplete;
|
||||
Ready(Some(Ok(StreamItem::DataItem(item))))
|
||||
} else {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
}
|
||||
} else if self.read_from_cache {
|
||||
self.cache_written = true;
|
||||
continue 'outer;
|
||||
} else {
|
||||
match self.query.cache_usage() {
|
||||
CacheUsage::Use | CacheUsage::Recreate => {
|
||||
if let Some(values) = self.values.take() {
|
||||
let msg = format!(
|
||||
"write cache file query: {:?} bin count: {}",
|
||||
self.query.patch(),
|
||||
values.len(),
|
||||
);
|
||||
self.streamlog.append(Level::INFO, msg);
|
||||
let fut = write_pb_cache_min_max_avg_scalar(
|
||||
values,
|
||||
self.query.patch().clone(),
|
||||
self.query.agg_kind().clone(),
|
||||
self.query.channel().clone(),
|
||||
self.node_config.clone(),
|
||||
);
|
||||
self.write_fut = Some(Box::pin(fut));
|
||||
continue 'outer;
|
||||
} else {
|
||||
warn!("no values to write to cache");
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
self.cache_written = true;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let Some(fut) = self.fut2.as_mut() {
|
||||
match fut.poll_next_unpin(cx) {
|
||||
Ready(Some(k)) => match k {
|
||||
Ok(item) => match item {
|
||||
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
|
||||
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
|
||||
StreamItem::DataItem(item) => match item {
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
self.range_complete_observed = true;
|
||||
continue 'outer;
|
||||
}
|
||||
RangeCompletableItem::Data(item) => {
|
||||
if let Some(values) = &mut self.values {
|
||||
values.append(&item);
|
||||
} else {
|
||||
let mut values = item.empty_like_self();
|
||||
values.append(&item);
|
||||
self.values = Some(values);
|
||||
}
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
|
||||
}
|
||||
},
|
||||
},
|
||||
Err(e) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
self.data_complete = true;
|
||||
continue 'outer;
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
Self::handle_data_complete(&mut self)
|
||||
} else if let Some(fut) = self.fut2.take() {
|
||||
Self::poll_fut2(&mut self, fut, cx)
|
||||
} else if let Some(fut) = self.open_check_local_file.take() {
|
||||
let res = Self::poll_open_check_local_file(&mut self, fut, cx);
|
||||
res
|
||||
Self::poll_open_check_local_file(&mut self, fut, cx)
|
||||
} else {
|
||||
let cfd = CacheFileDesc::new(
|
||||
self.query.channel().clone(),
|
||||
|
||||
Reference in New Issue
Block a user