From 0662a6b494abbf2a6b6adde242d3970f1f4e0526 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 24 Oct 2024 10:01:08 +0200 Subject: [PATCH] Remove dead code --- crates/disk/src/binned/binnedfrompbv.rs | 264 ------------ crates/disk/src/binned/dim1.rs | 1 - crates/disk/src/binned/pbv.rs | 537 ------------------------ crates/disk/src/binned/prebinned.rs | 260 ------------ 4 files changed, 1062 deletions(-) delete mode 100644 crates/disk/src/binned/binnedfrompbv.rs delete mode 100644 crates/disk/src/binned/dim1.rs delete mode 100644 crates/disk/src/binned/pbv.rs delete mode 100644 crates/disk/src/binned/prebinned.rs diff --git a/crates/disk/src/binned/binnedfrompbv.rs b/crates/disk/src/binned/binnedfrompbv.rs deleted file mode 100644 index c39eea9..0000000 --- a/crates/disk/src/binned/binnedfrompbv.rs +++ /dev/null @@ -1,264 +0,0 @@ -use crate::agg::binnedt::TBinnerStream; -use crate::binned::query::PreBinnedQuery; -use crate::cache::node_ix_for_patch; -use err::Error; -use futures_core::Stream; -use futures_util::{FutureExt, StreamExt}; -use http::{StatusCode, Uri}; -use httpclient::HttpBodyAsAsyncRead; -use items::frame::decode_frame; -use items::{FrameDecodable, FrameType, FrameTypeInnerStatic, TimeBinnableType}; -use items::{RangeCompletableItem, Sitemty, StreamItem}; -use netpod::log::*; -use netpod::query::CacheUsage; -use netpod::x_bin_count; -use netpod::PreBinnedPatchIterator; -use netpod::{AggKind, AppendToUrl, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, ScalarType, Shape}; -use std::future::ready; -use std::marker::PhantomData; -use std::pin::Pin; -use std::str::FromStr; -use std::task::{Context, Poll}; -use streams::frames::inmem::InMemoryFrameAsyncReadStream; -use url::Url; - -pub struct FetchedPreBinned { - uri: Uri, - resfut: Option, - res: Option>, - errored: bool, - completed: bool, - _m1: PhantomData, -} - -impl FetchedPreBinned { - pub fn new(query: &PreBinnedQuery, host: String, port: u16) -> Result - where - TBT: FrameTypeInnerStatic + TimeBinnableType, - Sitemty: FrameDecodable, - { - // TODO should not assume http: - let mut url = Url::parse(&format!("http://{host}:{port}/api/4/prebinned"))?; - query.append_to_url(&mut url); - let ret = Self { - uri: Uri::from_str(&url.to_string()).map_err(Error::from_string)?, - resfut: None, - res: None, - errored: false, - completed: false, - _m1: PhantomData, - }; - Ok(ret) - } -} - -impl Stream for FetchedPreBinned -where - TBT: FrameTypeInnerStatic + TimeBinnableType, - Sitemty: FrameDecodable, -{ - type Item = Sitemty; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("poll_next on completed"); - } else if self.errored { - self.completed = true; - return Ready(None); - } else if let Some(res) = self.res.as_mut() { - match res.poll_next_unpin(cx) { - Ready(Some(Ok(item))) => match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => match decode_frame::>(&item) { - Ok(Ok(item)) => Ready(Some(Ok(item))), - Ok(Err(e)) => { - self.errored = true; - Ready(Some(Err(e))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - }, - }, - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.completed = true; - Ready(None) - } - Pending => Pending, - } - } else if let Some(resfut) = self.resfut.as_mut() { - match resfut.poll_unpin(cx) { - Ready(res) => match res { - Ok(res) => { - if res.status() == StatusCode::OK { - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s1 = HttpBodyAsAsyncRead::new(res); - let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); - self.res = Some(s2); - continue 'outer; - } else { - let msg = - format!("PreBinnedValueFetchedStream non-OK result from sub request: {res:?}"); - error!("{msg}"); - let e = Error::with_msg_no_trace(msg); - self.errored = true; - Ready(Some(Err(e))) - } - } - Err(e) => { - error!("PreBinnedValueStream error in stream {e:?}"); - self.errored = true; - Ready(Some(Err(Error::from_string(e)))) - } - }, - Pending => Pending, - } - } else { - match hyper::Request::builder() - .method(http::Method::GET) - .uri(&self.uri) - .body(hyper::Body::empty()) - { - Ok(req) => { - let client = hyper::Client::new(); - self.resfut = Some(client.request(req)); - continue 'outer; - } - Err(e) => { - self.errored = true; - Ready(Some(Err(Error::from_string(e)))) - } - } - }; - } - } -} - -/// Generate bins from a range of pre-binned patches. -/// -/// Takes an iterator over the necessary patches. -pub struct BinnedFromPreBinned -where - TBT: TimeBinnableType, -{ - // TODO get rid of box: - inp: Pin> + Send>>, - _m1: PhantomData, -} - -impl BinnedFromPreBinned -where - TBT: TimeBinnableType + Unpin + 'static, - Sitemty: FrameType + FrameDecodable, -{ - pub fn new( - patch_it: PreBinnedPatchIterator, - channel: Channel, - range: BinnedRange, - scalar_type: ScalarType, - shape: Shape, - agg_kind: AggKind, - cache_usage: CacheUsage, - disk_io_buffer_size: usize, - node_config: &NodeConfigCached, - disk_stats_every: ByteSize, - report_error: bool, - ) -> Result { - let patches: Vec<_> = patch_it.collect(); - let mut sp = String::new(); - if false { - // Convert this to a StreamLog message: - for (i, p) in patches.iter().enumerate() { - use std::fmt::Write; - write!(sp, " • patch {i:2} {p:?}\n")?; - } - info!("Using these pre-binned patches:\n{sp}"); - } - let pmax = patches.len(); - let inp = futures_util::stream::iter(patches.into_iter().enumerate()) - .map({ - let shape = shape.clone(); - let agg_kind = agg_kind.clone(); - let node_config = node_config.clone(); - move |(pix, patch)| { - let query = PreBinnedQuery::new( - patch, - channel.clone(), - scalar_type.clone(), - shape.clone(), - agg_kind.clone(), - cache_usage.clone(), - disk_io_buffer_size, - disk_stats_every.clone(), - report_error, - ); - let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster); - let node = &node_config.node_config.cluster.nodes[nodeix as usize]; - let ret: Pin + Send>> = - match FetchedPreBinned::::new(&query, node.host.clone(), node.port.clone()) { - Ok(stream) => Box::pin(stream.map(move |q| (pix, q))), - Err(e) => { - error!("error from PreBinnedValueFetchedStream::new {e:?}"); - Box::pin(futures_util::stream::iter(vec![(pix, Err(e))])) - } - }; - ret - } - }) - .flatten() - .filter_map({ - let range = range.clone(); - move |(pix, k)| { - let fit_range = range.full_range(); - let g = match k { - Ok(item) => match item { - StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), - StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - if pix + 1 == pmax { - Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) - } else { - None - } - } - RangeCompletableItem::Data(item) => { - match crate::binned::FilterFittingInside::filter_fitting_inside(item, fit_range) { - Some(item) => Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), - None => None, - } - } - }, - }, - Err(e) => Some(Err(e)), - }; - ready(g) - } - }); - let inp = TBinnerStream::<_, TBT>::new(inp, range, x_bin_count(&shape, &agg_kind), agg_kind.do_time_weighted()); - Ok(Self { - inp: Box::pin(inp), - _m1: PhantomData, - }) - } -} - -impl Stream for BinnedFromPreBinned -where - TBT: TimeBinnableType + Unpin + 'static, - Sitemty: FrameType + FrameDecodable, -{ - type Item = Sitemty; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.inp.poll_next_unpin(cx) - } -} diff --git a/crates/disk/src/binned/dim1.rs b/crates/disk/src/binned/dim1.rs deleted file mode 100644 index 8b13789..0000000 --- a/crates/disk/src/binned/dim1.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/disk/src/binned/pbv.rs b/crates/disk/src/binned/pbv.rs deleted file mode 100644 index a635f42..0000000 --- a/crates/disk/src/binned/pbv.rs +++ /dev/null @@ -1,537 +0,0 @@ -use crate::agg::binnedt::TBinnerStream; -use crate::binned::binnedfrompbv::FetchedPreBinned; -use crate::binned::query::PreBinnedQuery; -use crate::binned::WithLen; -use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, WrittenPbCache}; -use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; -use crate::merge::mergedfromremotes::MergedFromRemotes; -use crate::streamlog::Streamlog; -use err::Error; -use futures_core::Stream; -use futures_util::{FutureExt, StreamExt}; -use items::numops::NumOps; -use items::{ - Appendable, Clearable, EventsNodeProcessor, EventsTypeAliases, FrameDecodable, FrameType, PushableIndex, - RangeCompletableItem, ReadableFromFile, Sitemty, StreamItem, TimeBinnableType, -}; -use netpod::log::*; -use netpod::query::{CacheUsage, RawEventsQuery}; -use netpod::x_bin_count; -use netpod::{AggKind, BinnedRange, PreBinnedPatchIterator, PreBinnedPatchRange}; -use netpod::{NodeConfigCached, PerfOpts}; -use serde::Serialize; -use std::future::Future; -use std::io; -use std::marker::PhantomData; -use std::path::PathBuf; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::fs::{File, OpenOptions}; - -pub struct PreBinnedValueStream -where - NTY: NumOps + NumFromBytes + Serialize + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch>, -{ - query: PreBinnedQuery, - agg_kind: AggKind, - node_config: NodeConfigCached, - open_check_local_file: Option> + Send>>>, - stream_from_other_inputs: - Option::TimeBinOutput>> + Send>>>, - read_from_cache: bool, - cache_written: bool, - data_complete: bool, - range_complete_observed: bool, - range_complete_emitted: bool, - errored: bool, - all_done: bool, - completed: bool, - streamlog: Streamlog, - values: Option<::TimeBinOutput>, - write_fut: Option> + Send>>>, - read_cache_fut: Option::TimeBinOutput>> + Send>>>, - _m1: PhantomData, - _m2: PhantomData, - _m3: PhantomData, - _m4: PhantomData, -} - -impl PreBinnedValueStream -where - NTY: NumOps + NumFromBytes + Serialize + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - ::Output: PushableIndex + Appendable + Clearable, - // TODO is this needed: - Sitemty<::Output>: FrameType, - // TODO who exactly needs this DeserializeOwned? - Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + FrameDecodable, -{ - pub fn new(query: PreBinnedQuery, agg_kind: AggKind, node_config: &NodeConfigCached) -> Self { - Self { - query, - agg_kind, - node_config: node_config.clone(), - open_check_local_file: None, - stream_from_other_inputs: None, - read_from_cache: false, - cache_written: false, - data_complete: false, - range_complete_observed: false, - range_complete_emitted: false, - errored: false, - all_done: false, - completed: false, - streamlog: Streamlog::new(node_config.ix as u32), - // TODO use alias via some trait associated type: - //values: <<::Output as TimeBinnableType>::Output as Appendable>::empty(), - values: None, - write_fut: None, - read_cache_fut: None, - _m1: PhantomData, - _m2: PhantomData, - _m3: PhantomData, - _m4: PhantomData, - } - } - - fn setup_merged_from_remotes( - &mut self, - ) -> Result< - Pin::Output as TimeBinnableType>::Output>> + Send>>, - Error, - > { - // TODO let PreBinnedQuery provide the tune and pass to RawEventsQuery: - let evq = RawEventsQuery::new( - self.query.channel().clone(), - self.query.patch().patch_range(), - self.query.agg_kind().clone(), - ); - if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 { - let msg = format!( - "Patch length inconsistency {} {}", - self.query.patch().patch_t_len(), - self.query.patch().bin_t_len() - ); - error!("{}", msg); - return Err(Error::with_msg(msg)); - } - // TODO do I need to set up more transformations or binning to deliver the requested data? - let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count as u32)?; - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); - let ret = TBinnerStream::<_, ::Output>::new( - s, - range, - x_bin_count(&self.query.shape().clone(), &self.agg_kind), - self.agg_kind.do_time_weighted(), - ); - Ok(Box::pin(ret)) - } - - fn setup_from_higher_res_prebinned( - &mut self, - range: PreBinnedPatchRange, - ) -> Result< - Pin::Output as TimeBinnableType>::Output>> + Send>>, - Error, - > { - let g = self.query.patch().bin_t_len(); - let h = range.grid_spec.bin_t_len(); - trace!( - "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}", - g, - h, - g / h, - g % h, - range, - ); - if g / h <= 1 { - let msg = format!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return Err(Error::with_msg(msg)); - } - if g / h > 1024 * 10 { - let msg = format!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return Err(Error::with_msg(msg)); - } - if g % h != 0 { - let msg = format!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return Err(Error::with_msg(msg)); - } - let node_config = self.node_config.clone(); - let patch_it = PreBinnedPatchIterator::from_range(range); - let s = futures_util::stream::iter(patch_it) - .map({ - let q2 = self.query.clone(); - let disk_io_buffer_size = self.query.disk_io_buffer_size(); - let disk_stats_every = self.query.disk_stats_every().clone(); - let report_error = self.query.report_error(); - move |patch| { - let query = PreBinnedQuery::new( - patch, - q2.channel().clone(), - q2.scalar_type().clone(), - q2.shape().clone(), - q2.agg_kind().clone(), - q2.cache_usage().clone(), - disk_io_buffer_size, - disk_stats_every.clone(), - report_error, - ); - let nodeix = crate::cache::node_ix_for_patch( - &query.patch(), - &query.channel(), - &node_config.node_config.cluster, - ); - let node = &node_config.node_config.cluster.nodes[nodeix as usize]; - let ret = - FetchedPreBinned::<<::Output as TimeBinnableType>::Output>::new( - &query, - node.host.clone(), - node.port.clone(), - )?; - Ok(ret) - } - }) - .map(|k| { - let s: Pin + Send>> = match k { - Ok(k) => Box::pin(k), - Err(e) => Box::pin(futures_util::stream::iter(vec![Err(e)])), - }; - s - }) - .flatten(); - Ok(Box::pin(s)) - } - - fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { - info!("try_setup_fetch_prebinned_higher_res"); - let range = self.query.patch().patch_range(); - match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) { - Ok(Some(range)) => { - self.stream_from_other_inputs = Some(self.setup_from_higher_res_prebinned(range)?); - } - Ok(None) => { - self.stream_from_other_inputs = Some(self.setup_merged_from_remotes()?); - } - Err(e) => return Err(e), - } - Ok(()) - } - - fn poll_write_fut( - self: &mut Self, - mut fut: Pin> + Send>>, - cx: &mut Context, - ) -> Poll::Output as TimeBinnableType>::Output>>> { - trace!("poll_write_fut"); - use Poll::*; - match fut.poll_unpin(cx) { - Ready(item) => { - self.cache_written = true; - self.write_fut = None; - match item { - Ok(res) => { - self.streamlog.append( - Level::INFO, - format!( - "cache file written bytes: {} duration {} ms", - res.bytes, - res.duration.as_millis() - ), - ); - self.all_done = true; - Ready(None) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - } - } - Pending => { - self.write_fut = Some(fut); - Pending - } - } - } - - fn poll_read_cache_fut( - self: &mut Self, - mut fut: Pin< - Box< - dyn Future< - Output = Result< - StreamItem::TimeBinOutput>>, - Error, - >, - > + Send, - >, - >, - cx: &mut Context, - ) -> Poll::Output as TimeBinnableType>::Output>>> { - trace!("poll_read_cache_fut"); - use Poll::*; - match fut.poll_unpin(cx) { - Ready(item) => { - self.read_cache_fut = None; - match item { - Ok(item) => { - self.data_complete = true; - self.range_complete_observed = true; - Ready(Some(Ok(item))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - } - } - Pending => { - self.read_cache_fut = Some(fut); - Pending - } - } - } - - fn handle_data_complete( - self: &mut Self, - ) -> Poll::Output as TimeBinnableType>::Output>>> { - trace!("handle_data_complete"); - use Poll::*; - if self.cache_written { - // TODO can we ever get here? - if self.range_complete_observed { - self.range_complete_emitted = true; - let item = RangeCompletableItem::RangeComplete; - Ready(Some(Ok(StreamItem::DataItem(item)))) - } else { - self.all_done = true; - Ready(None) - } - } else if self.read_from_cache { - // TODO refactor: raising cache_written even though we did not actually write is misleading. - self.cache_written = true; - self.all_done = true; - Ready(None) - } else { - match self.query.cache_usage() { - CacheUsage::Use | CacheUsage::Recreate => { - if let Some(values) = self.values.take() { - let msg = format!( - "write cache file query: {:?} bin count: {}", - self.query.patch(), - values.len(), - ); - self.streamlog.append(Level::INFO, msg); - let fut = write_pb_cache_min_max_avg_scalar( - values, - self.query.patch().clone(), - self.query.agg_kind().clone(), - self.query.channel().clone(), - self.node_config.clone(), - ); - self.write_fut = Some(Box::pin(fut)); - Ready(None) - } else { - warn!("no values to write to cache"); - Ready(None) - } - } - _ => { - // TODO refactor: raising cache_written even though we did not actually write is misleading. - self.cache_written = true; - self.all_done = true; - Ready(None) - } - } - } - } - - fn poll_stream_from_other_inputs( - self: &mut Self, - mut fut: Pin< - Box< - dyn Stream< - Item = Result< - StreamItem::TimeBinOutput>>, - Error, - >, - > + Send, - >, - >, - cx: &mut Context, - ) -> Poll::Output as TimeBinnableType>::Output>>> { - use Poll::*; - match fut.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(item) => { - self.stream_from_other_inputs = Some(fut); - match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - self.range_complete_observed = true; - Ready(None) - } - RangeCompletableItem::Data(item) => { - if let Some(values) = &mut self.values { - values.append(&item); - } else { - let mut values = item.empty_like_self(); - values.append(&item); - self.values = Some(values); - } - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } - }, - } - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - }, - Ready(None) => { - self.data_complete = true; - Ready(None) - } - Pending => { - self.stream_from_other_inputs = Some(fut); - Pending - } - } - } - - fn poll_open_check_local_file( - self: &mut Self, - mut fut: Pin> + Send>>, - cx: &mut Context, - ) -> Poll::Output as TimeBinnableType>::Output>>> { - use Poll::*; - match fut.poll_unpin(cx) { - Ready(item) => { - match item { - Ok(file) => { - self.read_from_cache = true; - let fut = - <<::Output as TimeBinnableType>::Output as ReadableFromFile>::read_from_file(file)?; - self.read_cache_fut = Some(Box::pin(fut)); - // Return Ready(None) to signal that nothing is Pending but we need to get polled again. - //continue 'outer; - Ready(None) - } - Err(e) => match e.kind() { - // TODO other error kinds - io::ErrorKind::NotFound => match self.try_setup_fetch_prebinned_higher_res() { - Ok(_) => { - if self.stream_from_other_inputs.is_none() { - let e = - Err(Error::with_msg(format!("try_setup_fetch_prebinned_higher_res failed"))); - self.errored = true; - Ready(Some(e)) - } else { - //continue 'outer; - Ready(None) - } - } - Err(e) => { - let e = - Error::with_msg(format!("try_setup_fetch_prebinned_higher_res error: {:?}", e)); - self.errored = true; - Ready(Some(Err(e))) - } - }, - _ => { - error!("File I/O error: kind {:?} {:?}\n\n..............", e.kind(), e); - self.errored = true; - Ready(Some(Err(e.into()))) - } - }, - } - } - Pending => { - self.open_check_local_file = Some(fut); - Pending - } - } - } -} - -macro_rules! some_or_continue { - ($x:expr) => { - if let Ready(None) = $x { - continue; - } else { - $x - } - }; -} - -impl Stream for PreBinnedValueStream -where - NTY: NumOps + NumFromBytes + Serialize + Unpin + 'static, - END: Endianness + Unpin + 'static, - EVS: EventValueShape + EventValueFromBytes + Unpin + 'static, - ENP: EventsNodeProcessor>::Batch> + Unpin + 'static, - ::Output: PushableIndex + Appendable + Clearable, - // TODO needed? - Sitemty<::Output>: FrameType, - Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + FrameDecodable, -{ - type Item = Sitemty<<::Output as TimeBinnableType>::Output>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - loop { - break if self.completed { - panic!("PreBinnedValueStream poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if self.all_done { - self.completed = true; - Ready(None) - } else if let Some(item) = self.streamlog.pop() { - Ready(Some(Ok(StreamItem::Log(item)))) - } else if let Some(fut) = self.write_fut.take() { - let x = Self::poll_write_fut(&mut self, fut, cx); - some_or_continue!(x) - } else if let Some(fut) = self.read_cache_fut.take() { - let x = Self::poll_read_cache_fut(&mut self, fut, cx); - some_or_continue!(x) - } else if self.range_complete_emitted { - self.completed = true; - Ready(None) - } else if self.data_complete { - let x = Self::handle_data_complete(&mut self); - some_or_continue!(x) - } else if let Some(fut) = self.stream_from_other_inputs.take() { - let x = Self::poll_stream_from_other_inputs(&mut self, fut, cx); - some_or_continue!(x) - } else if let Some(fut) = self.open_check_local_file.take() { - let x = Self::poll_open_check_local_file(&mut self, fut, cx); - some_or_continue!(x) - } else { - let cfd = CacheFileDesc::new( - self.query.channel().clone(), - self.query.patch().clone(), - self.query.agg_kind().clone(), - ); - let path = match self.query.cache_usage() { - CacheUsage::Use => cfd.path(&self.node_config), - _ => PathBuf::from("DOESNOTEXIST"), - }; - let fut = async { OpenOptions::new().read(true).open(path).await }; - self.open_check_local_file = Some(Box::pin(fut)); - continue; - }; - } - } -} diff --git a/crates/disk/src/binned/prebinned.rs b/crates/disk/src/binned/prebinned.rs deleted file mode 100644 index 448560f..0000000 --- a/crates/disk/src/binned/prebinned.rs +++ /dev/null @@ -1,260 +0,0 @@ -use crate::binned::pbv::PreBinnedValueStream; -use crate::binned::query::PreBinnedQuery; -use crate::cache::node_ix_for_patch; -use crate::decode::{ - BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, - LittleEndian, NumFromBytes, -}; -use bytes::Bytes; -use dbconn::bincache::pre_binned_value_stream; -use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; -use items::numops::{BoolNum, NumOps, StringNum}; -use items::{ - Appendable, Clearable, EventsNodeProcessor, Framable, FrameDecodable, FrameType, FrameTypeInnerDyn, PushableIndex, - RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, TimeBinned, -}; -use netpod::log::*; -use netpod::{AggKind, ByteOrder, ChannelTyped, NodeConfigCached, ScalarType, Shape}; -use serde::Serialize; -use std::pin::Pin; - -async fn make_num_pipeline_nty_end_evs_enp( - scalar_type: ScalarType, - shape: Shape, - agg_kind: AggKind, - _event_value_shape: EVS, - _events_node_proc: ENP, - query: PreBinnedQuery, - node_config: &NodeConfigCached, -) -> Result>> + Send>>, Error> -where - NTY: NumOps + NumFromBytes + Serialize + 'static, - END: Endianness + 'static, - EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Batch> + 'static, - ::Output: PushableIndex + Appendable + Clearable + 'static, - <::Output as TimeBinnableType>::Output: FrameTypeInnerDyn + TimeBinned, - Sitemty<::Output>: FrameType + Framable + 'static, - Sitemty<<::Output as TimeBinnableType>::Output>: Framable + FrameType + FrameDecodable, -{ - if let Some(scyconf) = &node_config.node_config.cluster.cache_scylla { - trace!("~~~~~~~~~~~~~~~ make_num_pipeline_nty_end_evs_enp using scylla as cache"); - let chn = ChannelTyped { - channel: query.channel().clone(), - scalar_type, - shape, - }; - let stream = pre_binned_value_stream( - chn.channel().series().unwrap(), - &chn, - query.patch(), - agg_kind, - query.cache_usage(), - scyconf, - ) - .await?; - let stream = stream.map(|x| { - let ret = match x { - Ok(k) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))), - Err(e) => Err(e), - }; - ret - }); - let stream = Box::pin(stream) as Pin>> + Send>>; - Ok(stream) - } else { - let ret = PreBinnedValueStream::::new(query, agg_kind, node_config); - let ret = StreamExt::map(ret, |item| { - // - match item { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { - let g = Box::new(k) as Box; - Ok(StreamItem::DataItem(RangeCompletableItem::Data(g))) - } - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } - Ok(StreamItem::Log(k)) => Ok(StreamItem::Log(k)), - Ok(StreamItem::Stats(k)) => Ok(StreamItem::Stats(k)), - Err(e) => Err(e), - } - }); - Ok(Box::pin(ret)) - } -} - -async fn make_num_pipeline_nty_end( - scalar_type: ScalarType, - shape: Shape, - agg_kind: AggKind, - query: PreBinnedQuery, - node_config: &NodeConfigCached, -) -> Result>> + Send>>, Error> -where - NTY: NumOps + NumFromBytes + Serialize + 'static, - END: Endianness + 'static, -{ - match shape { - Shape::Scalar => { - let evs = EventValuesDim0Case::new(); - match agg_kind { - AggKind::EventBlobs => panic!(), - AggKind::TimeWeightedScalar | AggKind::DimXBins1 => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - make_num_pipeline_nty_end_evs_enp::( - scalar_type, - shape, - agg_kind, - evs, - events_node_proc, - query, - node_config, - ) - .await - } - AggKind::DimXBinsN(_) => { - let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - make_num_pipeline_nty_end_evs_enp::( - scalar_type, - shape, - agg_kind, - evs, - events_node_proc, - query, - node_config, - ) - .await - } - AggKind::Plain => { - panic!(); - } - AggKind::Stats1 => { - // Currently not meant to be binned. - panic!(); - } - } - } - Shape::Wave(n) => { - let evs = EventValuesDim1Case::new(n); - match agg_kind { - AggKind::EventBlobs => panic!(), - AggKind::TimeWeightedScalar | AggKind::DimXBins1 => { - let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - make_num_pipeline_nty_end_evs_enp::( - scalar_type, - shape, - agg_kind, - evs, - events_node_proc, - query, - node_config, - ) - .await - } - AggKind::DimXBinsN(_) => { - let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); - make_num_pipeline_nty_end_evs_enp::( - scalar_type, - shape, - agg_kind, - evs, - events_node_proc, - query, - node_config, - ) - .await - } - AggKind::Plain => { - panic!(); - } - AggKind::Stats1 => { - // Currently not meant to be binned. - panic!(); - } - } - } - Shape::Image(..) => { - // TODO image binning/aggregation - err::todoval() - } - } -} - -macro_rules! match_end { - ($nty:ident, $end:expr, $scalar_type:expr, $shape:expr, $agg_kind:expr, $query:expr, $node_config:expr) => { - match $end { - ByteOrder::Little => { - make_num_pipeline_nty_end::<$nty, LittleEndian>($scalar_type, $shape, $agg_kind, $query, $node_config) - .await - } - ByteOrder::Big => { - make_num_pipeline_nty_end::<$nty, BigEndian>($scalar_type, $shape, $agg_kind, $query, $node_config) - .await - } - } - }; -} - -async fn make_num_pipeline( - scalar_type: ScalarType, - byte_order: ByteOrder, - shape: Shape, - agg_kind: AggKind, - query: PreBinnedQuery, - node_config: &NodeConfigCached, -) -> Result>> + Send>>, Error> { - match scalar_type { - ScalarType::U8 => match_end!(u8, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::U16 => match_end!(u16, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::U32 => match_end!(u32, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::U64 => match_end!(u64, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::I8 => match_end!(i8, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::I16 => match_end!(i16, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::I32 => match_end!(i32, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::I64 => match_end!(i64, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::F32 => match_end!(f32, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::F64 => match_end!(f64, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::BOOL => match_end!(BoolNum, byte_order, scalar_type, shape, agg_kind, query, node_config), - ScalarType::STRING => match_end!(StringNum, byte_order, scalar_type, shape, agg_kind, query, node_config), - } -} - -pub async fn pre_binned_bytes_for_http( - node_config: &NodeConfigCached, - query: &PreBinnedQuery, -) -> Result> + Send>>, Error> { - if query.channel().backend != node_config.node_config.cluster.backend { - let err = Error::with_msg(format!( - "backend mismatch node: {} requested: {}", - node_config.node_config.cluster.backend, - query.channel().backend - )); - return Err(err); - } - let patch_node_ix = node_ix_for_patch(query.patch(), query.channel(), &node_config.node_config.cluster); - if node_config.ix as u32 != patch_node_ix { - let err = Error::with_msg(format!( - "pre_binned_bytes_for_http node mismatch node_config.ix {} patch_node_ix {}", - node_config.ix, patch_node_ix - )); - return Err(err); - } - let ret = make_num_pipeline( - query.scalar_type().clone(), - // TODO actually, make_num_pipeline should not depend on endianness. - ByteOrder::Little, - query.shape().clone(), - query.agg_kind().clone(), - query.clone(), - node_config, - ) - .await? - .map(|item| match item.make_frame________() { - Ok(item) => Ok(item.freeze()), - Err(e) => Err(e), - }); - let ret = Box::pin(ret); - Ok(ret) -}