From f183f0bb2849796ae6799a104f41d0bb9c67442a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 16 Jun 2022 19:24:55 +0200 Subject: [PATCH] Refactor and prepare to remove MinMaxAvgWaveBins --- dbconn/Cargo.toml | 1 + dbconn/src/bincache.rs | 9 ++++ dbconn/src/dbconn.rs | 1 + disk/src/binned/pbv.rs | 95 +++++++++++++++++++++------------- items/src/xbinnedwaveevents.rs | 19 ++++--- taskrun/src/taskrun.rs | 10 +--- 6 files changed, 79 insertions(+), 56 deletions(-) create mode 100644 dbconn/src/bincache.rs diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index 6d6c810..5606f12 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -30,3 +30,4 @@ err = { path = "../err" } netpod = { path = "../netpod" } parse = { path = "../parse" } taskrun = { path = "../taskrun" } +items = { path = "../items" } diff --git a/dbconn/src/bincache.rs b/dbconn/src/bincache.rs new file mode 100644 index 0000000..542604e --- /dev/null +++ b/dbconn/src/bincache.rs @@ -0,0 +1,9 @@ +use err::Error; +use scylla::Session as ScySession; + +pub async fn search_channel_scylla(_scy: &ScySession) -> Result<(), Error> +where + BINC: Clone, +{ + todo!() +} diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index b0a6c32..21ca370 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -1,4 +1,5 @@ pub mod scan; +pub mod bincache; pub mod search; pub mod pg { pub use tokio_postgres::{Client, Error}; diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index e04f68a..e47504a 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -40,7 +40,8 @@ where agg_kind: AggKind, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, - fut2: Option::TimeBinOutput>> + Send>>>, + stream_from_other_inputs: + Option::TimeBinOutput>> + Send>>>, read_from_cache: bool, cache_written: bool, data_complete: bool, @@ -77,7 +78,7 @@ where agg_kind, node_config: node_config.clone(), open_check_local_file: None, - fut2: None, + stream_from_other_inputs: None, read_from_cache: false, cache_written: false, data_complete: false, @@ -208,10 +209,10 @@ where let range = self.query.patch().patch_range(); match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) { Ok(Some(range)) => { - self.fut2 = Some(self.setup_from_higher_res_prebinned(range)?); + self.stream_from_other_inputs = Some(self.setup_from_higher_res_prebinned(range)?); } Ok(None) => { - self.fut2 = Some(self.setup_merged_from_remotes()?); + self.stream_from_other_inputs = Some(self.setup_merged_from_remotes()?); } Err(e) => return Err(e), } @@ -223,6 +224,7 @@ where mut fut: Pin> + Send>>, cx: &mut Context, ) -> Poll::Output as TimeBinnableType>::Output>>> { + trace!("poll_write_fut"); use Poll::*; match fut.poll_unpin(cx) { Ready(item) => { @@ -268,6 +270,7 @@ where >, cx: &mut Context, ) -> Poll::Output as TimeBinnableType>::Output>>> { + trace!("poll_read_cache_fut"); use Poll::*; match fut.poll_unpin(cx) { Ready(item) => { @@ -294,8 +297,10 @@ where fn handle_data_complete( self: &mut Self, ) -> Poll::Output as TimeBinnableType>::Output>>> { + trace!("handle_data_complete"); use Poll::*; if self.cache_written { + // TODO can we ever get here? if self.range_complete_observed { self.range_complete_emitted = true; let item = RangeCompletableItem::RangeComplete; @@ -305,6 +310,7 @@ where Ready(None) } } else if self.read_from_cache { + // TODO refactor: raising cache_written even though we did not actually write is misleading. self.cache_written = true; self.all_done = true; Ready(None) @@ -333,6 +339,7 @@ where } } _ => { + // TODO refactor: raising cache_written even though we did not actually write is misleading. self.cache_written = true; self.all_done = true; Ready(None) @@ -341,7 +348,7 @@ where } } - fn poll_fut2( + fn poll_stream_from_other_inputs( self: &mut Self, mut fut: Pin< Box< @@ -358,26 +365,29 @@ where use Poll::*; match fut.poll_next_unpin(cx) { Ready(Some(k)) => match k { - Ok(item) => match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - self.range_complete_observed = true; - Ready(None) - } - RangeCompletableItem::Data(item) => { - if let Some(values) = &mut self.values { - values.append(&item); - } else { - let mut values = item.empty_like_self(); - values.append(&item); - self.values = Some(values); + Ok(item) => { + self.stream_from_other_inputs = Some(fut); + match item { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete_observed = true; + Ready(None) } - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } - }, - }, + RangeCompletableItem::Data(item) => { + if let Some(values) = &mut self.values { + values.append(&item); + } else { + let mut values = item.empty_like_self(); + values.append(&item); + self.values = Some(values); + } + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } + }, + } + } Err(e) => { self.errored = true; Ready(Some(Err(e))) @@ -388,7 +398,7 @@ where Ready(None) } Pending => { - self.fut2 = Some(fut); + self.stream_from_other_inputs = Some(fut); Pending } } @@ -416,7 +426,7 @@ where // TODO other error kinds io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() { Ok(_) => { - if self.fut2.is_none() { + if self.stream_from_other_inputs.is_none() { let e = Err(Error::with_msg(format!("try_setup_fetch_prebinned_higher_res failed"))); self.errored = true; @@ -447,10 +457,16 @@ where } } } +} - fn _check_for_existing_cached_data(&mut self) -> Result<(), Error> { - todo!() - } +macro_rules! some_or_continue { + ($x:expr) => { + if let Ready(None) = $x { + continue; + } else { + $x + } + }; } impl Stream for PreBinnedValueStream @@ -468,7 +484,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - 'outer: loop { + loop { break if self.completed { panic!("PreBinnedValueStream poll_next on completed"); } else if self.errored { @@ -480,18 +496,23 @@ where } else if let Some(item) = self.streamlog.pop() { Ready(Some(Ok(StreamItem::Log(item)))) } else if let Some(fut) = self.write_fut.take() { - Self::poll_write_fut(&mut self, fut, cx) + let x = Self::poll_write_fut(&mut self, fut, cx); + some_or_continue!(x) } else if let Some(fut) = self.read_cache_fut.take() { - Self::poll_read_cache_fut(&mut self, fut, cx) + let x = Self::poll_read_cache_fut(&mut self, fut, cx); + some_or_continue!(x) } else if self.range_complete_emitted { self.completed = true; Ready(None) } else if self.data_complete { - Self::handle_data_complete(&mut self) - } else if let Some(fut) = self.fut2.take() { - Self::poll_fut2(&mut self, fut, cx) + let x = Self::handle_data_complete(&mut self); + some_or_continue!(x) + } else if let Some(fut) = self.stream_from_other_inputs.take() { + let x = Self::poll_stream_from_other_inputs(&mut self, fut, cx); + some_or_continue!(x) } else if let Some(fut) = self.open_check_local_file.take() { - Self::poll_open_check_local_file(&mut self, fut, cx) + let x = Self::poll_open_check_local_file(&mut self, fut, cx); + some_or_continue!(x) } else { let cfd = CacheFileDesc::new( self.query.channel().clone(), @@ -504,7 +525,7 @@ where }; let fut = async { OpenOptions::new().read(true).open(path).await }; self.open_check_local_file = Some(Box::pin(fut)); - continue 'outer; + continue; }; } } diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index d60b4c1..933cd3e 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -1,6 +1,4 @@ -use std::mem; - -use crate::minmaxavgwavebins::MinMaxAvgWaveBins; +use crate::minmaxavgdim1bins::MinMaxAvgDim1Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ @@ -10,9 +8,10 @@ use crate::{ }; use err::Error; use netpod::log::*; -use netpod::timeunits::{MS, SEC}; +use netpod::timeunits::*; use netpod::NanoRange; use serde::{Deserialize, Serialize}; +use std::mem; use tokio::fs::File; // TODO rename Wave -> Dim1 @@ -177,7 +176,7 @@ impl TimeBinnableType for XBinnedWaveEvents where NTY: NumOps, { - type Output = MinMaxAvgWaveBins; + type Output = MinMaxAvgDim1Bins; type Aggregator = XBinnedWaveEventsAggregator; fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { @@ -324,7 +323,7 @@ where } } - fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgWaveBins { + fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgDim1Bins { let avg = if self.sumc == 0 { None } else { @@ -332,7 +331,7 @@ where }; let min = mem::replace(&mut self.min, None); let max = mem::replace(&mut self.max, None); - let ret = MinMaxAvgWaveBins { + let ret = MinMaxAvgDim1Bins { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], @@ -350,7 +349,7 @@ where ret } - fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgWaveBins { + fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgDim1Bins { // TODO check callsite for correct expand status. if true || expand { self.apply_event_time_weight(self.range.end); @@ -363,7 +362,7 @@ where }; let min = mem::replace(&mut self.min, None); let max = mem::replace(&mut self.max, None); - let ret = MinMaxAvgWaveBins { + let ret = MinMaxAvgDim1Bins { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], @@ -387,7 +386,7 @@ where NTY: NumOps, { type Input = XBinnedWaveEvents; - type Output = MinMaxAvgWaveBins; + type Output = MinMaxAvgDim1Bins; fn range(&self) -> &NanoRange { &self.range diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index d18011c..502cf4f 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -104,15 +104,7 @@ pub fn tracing_init() { //"tokio=trace", //"runtime=trace", "info", - "archapp::archeng=info", - "archapp::archeng::datablockstream=info", - "archapp::archeng::indextree=info", - "archapp::archeng::blockrefstream=info", - "archapp::archeng::blockstream=info", - "archapp::archeng::ringbuf=info", - "archapp::archeng::backreadbuf=info", - "archapp::archeng::pipe=debug", - "archapp::storagemerge=info", + "disk::binned::pbv=trace", "[log_span_d]=debug", "[log_span_t]=trace", ]