From 8090088448c49ce756a8d6c380b2273981de7d2b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 15 Jun 2022 14:56:26 +0200 Subject: [PATCH] Move match cases to functions --- disk/src/binned/pbv.rs | 316 +++++++++++++++++++++++++---------------- 1 file changed, 193 insertions(+), 123 deletions(-) diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 9bc8c14..e04f68a 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -47,6 +47,7 @@ where range_complete_observed: bool, range_complete_emitted: bool, errored: bool, + all_done: bool, completed: bool, streamlog: Streamlog, values: Option<::TimeBinOutput>, @@ -83,6 +84,7 @@ where range_complete_observed: false, range_complete_emitted: false, errored: false, + all_done: false, completed: false, streamlog: Streamlog::new(node_config.ix as u32), // TODO use alias via some trait associated type: @@ -216,6 +218,182 @@ where Ok(()) } + fn poll_write_fut( + self: &mut Self, + mut fut: Pin> + Send>>, + cx: &mut Context, + ) -> Poll::Output as TimeBinnableType>::Output>>> { + use Poll::*; + match fut.poll_unpin(cx) { + Ready(item) => { + self.cache_written = true; + self.write_fut = None; + match item { + Ok(res) => { + self.streamlog.append( + Level::INFO, + format!( + "cache file written bytes: {} duration {} ms", + res.bytes, + res.duration.as_millis() + ), + ); + self.all_done = true; + Ready(None) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } + } + } + Pending => { + self.write_fut = Some(fut); + Pending + } + } + } + + fn poll_read_cache_fut( + self: &mut Self, + mut fut: Pin< + Box< + dyn Future< + Output = Result< + StreamItem::TimeBinOutput>>, + Error, + >, + > + Send, + >, + >, + cx: &mut Context, + ) -> Poll::Output as TimeBinnableType>::Output>>> { + use Poll::*; + match fut.poll_unpin(cx) { + Ready(item) => { + self.read_cache_fut = None; + match item { + Ok(item) => { + self.data_complete = true; + self.range_complete_observed = true; + Ready(Some(Ok(item))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } + } + } + Pending => { + self.read_cache_fut = Some(fut); + Pending + } + } + } + + fn handle_data_complete( + self: &mut Self, + ) -> Poll::Output as TimeBinnableType>::Output>>> { + use Poll::*; + if self.cache_written { + if self.range_complete_observed { + self.range_complete_emitted = true; + let item = RangeCompletableItem::RangeComplete; + Ready(Some(Ok(StreamItem::DataItem(item)))) + } else { + self.all_done = true; + Ready(None) + } + } else if self.read_from_cache { + self.cache_written = true; + self.all_done = true; + Ready(None) + } else { + match self.query.cache_usage() { + CacheUsage::Use | CacheUsage::Recreate => { + if let Some(values) = self.values.take() { + let msg = format!( + "write cache file query: {:?} bin count: {}", + self.query.patch(), + values.len(), + ); + self.streamlog.append(Level::INFO, msg); + let fut = write_pb_cache_min_max_avg_scalar( + values, + self.query.patch().clone(), + self.query.agg_kind().clone(), + self.query.channel().clone(), + self.node_config.clone(), + ); + self.write_fut = Some(Box::pin(fut)); + Ready(None) + } else { + warn!("no values to write to cache"); + Ready(None) + } + } + _ => { + self.cache_written = true; + self.all_done = true; + Ready(None) + } + } + } + } + + fn poll_fut2( + self: &mut Self, + mut fut: Pin< + Box< + dyn Stream< + Item = Result< + StreamItem::TimeBinOutput>>, + Error, + >, + > + Send, + >, + >, + cx: &mut Context, + ) -> Poll::Output as TimeBinnableType>::Output>>> { + use Poll::*; + match fut.poll_next_unpin(cx) { + Ready(Some(k)) => match k { + Ok(item) => match item { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete_observed = true; + Ready(None) + } + RangeCompletableItem::Data(item) => { + if let Some(values) = &mut self.values { + values.append(&item); + } else { + let mut values = item.empty_like_self(); + values.append(&item); + self.values = Some(values); + } + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } + }, + }, + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } + }, + Ready(None) => { + self.data_complete = true; + Ready(None) + } + Pending => { + self.fut2 = Some(fut); + Pending + } + } + } + fn poll_open_check_local_file( self: &mut Self, mut fut: Pin> + Send>>, @@ -224,7 +402,6 @@ where use Poll::*; match fut.poll_unpin(cx) { Ready(item) => { - self.open_check_local_file = None; match item { Ok(file) => { self.read_from_cache = true; @@ -264,7 +441,10 @@ where }, } } - Pending => Pending, + Pending => { + self.open_check_local_file = Some(fut); + Pending + } } } @@ -294,134 +474,24 @@ where } else if self.errored { self.completed = true; Ready(None) + } else if self.all_done { + self.completed = true; + Ready(None) } else if let Some(item) = self.streamlog.pop() { Ready(Some(Ok(StreamItem::Log(item)))) - } else if let Some(fut) = &mut self.write_fut { - match fut.poll_unpin(cx) { - Ready(item) => { - self.cache_written = true; - self.write_fut = None; - match item { - Ok(res) => { - self.streamlog.append( - Level::INFO, - format!( - "cache file written bytes: {} duration {} ms", - res.bytes, - res.duration.as_millis() - ), - ); - continue 'outer; - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - } - } - Pending => Pending, - } - } else if let Some(fut) = &mut self.read_cache_fut { - match fut.poll_unpin(cx) { - Ready(item) => { - self.read_cache_fut = None; - match item { - Ok(item) => { - self.data_complete = true; - self.range_complete_observed = true; - Ready(Some(Ok(item))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - } - } - Pending => Pending, - } + } else if let Some(fut) = self.write_fut.take() { + Self::poll_write_fut(&mut self, fut, cx) + } else if let Some(fut) = self.read_cache_fut.take() { + Self::poll_read_cache_fut(&mut self, fut, cx) } else if self.range_complete_emitted { self.completed = true; Ready(None) } else if self.data_complete { - if self.cache_written { - if self.range_complete_observed { - self.range_complete_emitted = true; - let item = RangeCompletableItem::RangeComplete; - Ready(Some(Ok(StreamItem::DataItem(item)))) - } else { - self.completed = true; - Ready(None) - } - } else if self.read_from_cache { - self.cache_written = true; - continue 'outer; - } else { - match self.query.cache_usage() { - CacheUsage::Use | CacheUsage::Recreate => { - if let Some(values) = self.values.take() { - let msg = format!( - "write cache file query: {:?} bin count: {}", - self.query.patch(), - values.len(), - ); - self.streamlog.append(Level::INFO, msg); - let fut = write_pb_cache_min_max_avg_scalar( - values, - self.query.patch().clone(), - self.query.agg_kind().clone(), - self.query.channel().clone(), - self.node_config.clone(), - ); - self.write_fut = Some(Box::pin(fut)); - continue 'outer; - } else { - warn!("no values to write to cache"); - continue 'outer; - } - } - _ => { - self.cache_written = true; - continue 'outer; - } - } - } - } else if let Some(fut) = self.fut2.as_mut() { - match fut.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(item) => match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - self.range_complete_observed = true; - continue 'outer; - } - RangeCompletableItem::Data(item) => { - if let Some(values) = &mut self.values { - values.append(&item); - } else { - let mut values = item.empty_like_self(); - values.append(&item); - self.values = Some(values); - } - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } - }, - }, - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - }, - Ready(None) => { - self.data_complete = true; - continue 'outer; - } - Pending => Pending, - } + Self::handle_data_complete(&mut self) + } else if let Some(fut) = self.fut2.take() { + Self::poll_fut2(&mut self, fut, cx) } else if let Some(fut) = self.open_check_local_file.take() { - let res = Self::poll_open_check_local_file(&mut self, fut, cx); - res + Self::poll_open_check_local_file(&mut self, fut, cx) } else { let cfd = CacheFileDesc::new( self.query.channel().clone(),