diff --git a/disk/src/binned.rs b/disk/src/binned.rs index d4704e7..94a4a0d 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -246,9 +246,6 @@ pub async fn binned_bytes_for_http( } } -// TODO remove this when no longer used, gets replaced by Result, Error> -pub type BinnedBytesForHttpStreamFrame = ::Item; - pub struct BinnedBytesForHttpStream { inp: S, errored: bool, @@ -397,8 +394,10 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> pub trait BinnedStreamKind { type BinnedStreamItem: MakeBytesFrame; type BinnedStreamType: Stream + Send + 'static; + type Dummy: Default + Unpin; fn new_binned_from_prebinned( + &self, query: &BinnedQuery, range: BinnedRange, pre_range: PreBinnedPatchRange, @@ -406,6 +405,7 @@ pub trait BinnedStreamKind { ) -> Result; fn new_binned_from_merged( + &self, evq: EventsQuery, perf_opts: PerfOpts, range: BinnedRange, @@ -432,8 +432,10 @@ impl BinnedStreamKindWave { impl BinnedStreamKind for BinnedStreamKindScalar { type BinnedStreamItem = Result, Error>; type BinnedStreamType = BinnedStream; + type Dummy = u32; fn new_binned_from_prebinned( + &self, query: &BinnedQuery, range: BinnedRange, pre_range: PreBinnedPatchRange, @@ -447,11 +449,13 @@ impl BinnedStreamKind for BinnedStreamKindScalar { query.cache_usage().clone(), node_config, query.disk_stats_every().clone(), + self, )?; Ok(BinnedStream::new(Box::pin(s))?) } fn new_binned_from_merged( + &self, evq: EventsQuery, perf_opts: PerfOpts, range: BinnedRange, diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs index 29335d1..624bcb9 100644 --- a/disk/src/binned/scalar.rs +++ b/disk/src/binned/scalar.rs @@ -61,7 +61,7 @@ where ); return Err(Error::with_msg(msg)); } - let s = BK::new_binned_from_prebinned(query, range.clone(), pre_range, node_config)?; + let s = BK::new_binned_from_prebinned(&stream_kind, query, range.clone(), pre_range, node_config)?; let s = BinnedStream::new(Box::pin(s))?; let ret = BinnedStreamRes { binned_stream: s, @@ -80,7 +80,7 @@ where agg_kind: query.agg_kind().clone(), }; // TODO do I need to set up more transformations or binning to deliver the requested data? - let s = BK::new_binned_from_merged(evq, perf_opts, range.clone(), node_config)?; + let s = BK::new_binned_from_merged(&stream_kind, evq, perf_opts, range.clone(), node_config)?; let s = BinnedStream::new(Box::pin(s))?; let ret = BinnedStreamRes { binned_stream: s, diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index b4d905a..588325d 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt::IntoBinnedT; use crate::agg::streams::StreamItem; -use crate::binned::BinnedScalarStreamItem; -use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; +use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind}; +use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream}; use crate::cache::{CacheUsage, PreBinnedQuery}; use err::Error; use futures_core::Stream; @@ -12,11 +12,18 @@ use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct BinnedScalarStreamFromPreBinnedPatches { +pub struct BinnedScalarStreamFromPreBinnedPatches +where + BK: BinnedStreamKind, +{ inp: Pin, Error>> + Send>>, + _marker: BK::Dummy, } -impl BinnedScalarStreamFromPreBinnedPatches { +impl BinnedScalarStreamFromPreBinnedPatches +where + BK: BinnedStreamKind, +{ pub fn new( patch_it: PreBinnedPatchIterator, channel: Channel, @@ -25,6 +32,7 @@ impl BinnedScalarStreamFromPreBinnedPatches { cache_usage: CacheUsage, node_config: &NodeConfigCached, disk_stats_every: ByteSize, + stream_kind: &BK, ) -> Result { let patches: Vec<_> = patch_it.collect(); let mut sp = String::new(); @@ -34,7 +42,7 @@ impl BinnedScalarStreamFromPreBinnedPatches { use std::fmt::Write; write!(sp, " • patch {:2} {:?}\n", i, p)?; } - info!("BinnedStream::new\n{}", sp); + info!("Using these pre-binned patches:\n{}", sp); } let inp = futures_util::stream::iter(patches.into_iter()) .map({ @@ -48,7 +56,7 @@ impl BinnedScalarStreamFromPreBinnedPatches { disk_stats_every.clone(), ); let s: Pin + Send>> = - match PreBinnedValueFetchedStream::new(&query, &node_config) { + match PreBinnedScalarValueFetchedStream::new(&query, &node_config) { Ok(k) => Box::pin(k), Err(e) => { error!("error from PreBinnedValueFetchedStream::new {:?}", e); @@ -68,10 +76,10 @@ impl BinnedScalarStreamFromPreBinnedPatches { StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), StreamItem::DataItem(item) => match item { - PreBinnedItem::RangeComplete => { + PreBinnedScalarItem::RangeComplete => { Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))) } - PreBinnedItem::Batch(item) => { + PreBinnedScalarItem::Batch(item) => { use super::agg::{Fits, FitsInside}; match item.fits_inside(fit_range) { Fits::Inside @@ -91,11 +99,17 @@ impl BinnedScalarStreamFromPreBinnedPatches { } }) .into_binned_t(range); - Ok(Self { inp: Box::pin(inp) }) + Ok(Self { + inp: Box::pin(inp), + _marker: BK::Dummy::default(), + }) } } -impl Stream for BinnedScalarStreamFromPreBinnedPatches { +impl Stream for BinnedScalarStreamFromPreBinnedPatches +where + BK: BinnedStreamKind, +{ type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { diff --git a/disk/src/cache.rs b/disk/src/cache.rs index cd580b2..9063e4e 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -2,7 +2,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::cache::pbv::PreBinnedValueByteStream; -use crate::cache::pbvfs::PreBinnedItem; +use crate::cache::pbvfs::PreBinnedScalarItem; use crate::merge::MergedMinMaxAvgScalarStream; use crate::raw::EventsQuery; use bytes::Bytes; @@ -526,10 +526,10 @@ pub async fn write_pb_cache_min_max_avg_scalar( Ok(()) } -pub async fn read_pbv(mut file: File) -> Result, Error> { +pub async fn read_pbv(mut file: File) -> Result, Error> { let mut buf = vec![]; file.read_to_end(&mut buf).await?; trace!("Read cached file len {}", buf.len()); let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?; - Ok(StreamItem::DataItem(PreBinnedItem::Batch(dec))) + Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(dec))) } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 27a39ba..60ef315 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt::IntoBinnedT; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem}; use crate::agg::streams::StreamItem; -use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; +use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream}; use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery}; use crate::frame::makeframe::make_frame; use crate::raw::EventsQuery; @@ -39,7 +39,7 @@ impl Stream for PreBinnedValueByteStreamInner { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => match make_frame::, Error>>(&item) { + Ready(Some(item)) => match make_frame::, Error>>(&item) { Ok(buf) => Ready(Some(Ok(buf.freeze()))), Err(e) => Ready(Some(Err(e.into()))), }, @@ -53,7 +53,7 @@ pub struct PreBinnedValueStream { query: PreBinnedQuery, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, - fut2: Option, Error>> + Send>>>, + fut2: Option, Error>> + Send>>>, read_from_cache: bool, cache_written: bool, data_complete: bool, @@ -64,7 +64,7 @@ pub struct PreBinnedValueStream { streamlog: Streamlog, values: MinMaxAvgScalarBinBatch, write_fut: Option> + Send>>>, - read_cache_fut: Option, Error>> + Send>>>, + read_cache_fut: Option, Error>> + Send>>>, } impl PreBinnedValueStream { @@ -120,10 +120,10 @@ impl PreBinnedValueStream { StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), StreamItem::DataItem(item) => match item { MinMaxAvgScalarBinBatchStreamItem::RangeComplete => { - Ok(StreamItem::DataItem(PreBinnedItem::RangeComplete)) + Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete)) } MinMaxAvgScalarBinBatchStreamItem::Values(item) => { - Ok(StreamItem::DataItem(PreBinnedItem::Batch(item))) + Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(item))) } }, }, @@ -170,7 +170,7 @@ impl PreBinnedValueStream { cache_usage: q2.cache_usage.clone(), disk_stats_every: disk_stats_every.clone(), }; - PreBinnedValueFetchedStream::new(&query, &node_config) + PreBinnedScalarValueFetchedStream::new(&query, &node_config) } }) .map(|k| { @@ -201,7 +201,7 @@ impl PreBinnedValueStream { impl Stream for PreBinnedValueStream { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result, Error>; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -256,7 +256,7 @@ impl Stream for PreBinnedValueStream { if self.cache_written { if self.range_complete_observed { self.range_complete_emitted = true; - Ready(Some(Ok(StreamItem::DataItem(PreBinnedItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete)))) } else { self.completed = true; Ready(None) @@ -297,18 +297,18 @@ impl Stream for PreBinnedValueStream { StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(item) => match item { - PreBinnedItem::RangeComplete => { + PreBinnedScalarItem::RangeComplete => { self.range_complete_observed = true; continue 'outer; } - PreBinnedItem::Batch(batch) => { + PreBinnedScalarItem::Batch(batch) => { self.values.ts1s.extend(batch.ts1s.iter()); self.values.ts2s.extend(batch.ts2s.iter()); self.values.counts.extend(batch.counts.iter()); self.values.mins.extend(batch.mins.iter()); self.values.maxs.extend(batch.maxs.iter()); self.values.avgs.extend(batch.avgs.iter()); - Ready(Some(Ok(StreamItem::DataItem(PreBinnedItem::Batch(batch))))) + Ready(Some(Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(batch))))) } }, }, diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 50f8715..88c2b78 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct PreBinnedValueFetchedStream { +pub struct PreBinnedScalarValueFetchedStream { uri: http::Uri, resfut: Option, res: Option>, @@ -22,7 +22,7 @@ pub struct PreBinnedValueFetchedStream { completed: bool, } -impl PreBinnedValueFetchedStream { +impl PreBinnedScalarValueFetchedStream { pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result { let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster); let node = &node_config.node_config.cluster.nodes[nodeix as usize]; @@ -45,33 +45,30 @@ impl PreBinnedValueFetchedStream { } #[derive(Debug, Serialize, Deserialize)] -pub enum PreBinnedItem { +pub enum PreBinnedScalarItem { Batch(MinMaxAvgScalarBinBatch), RangeComplete, } -impl Stream for PreBinnedValueFetchedStream { - // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result, Error>; +impl Stream for PreBinnedScalarValueFetchedStream { + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if self.completed { - panic!("poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } 'outer: loop { - break if let Some(res) = self.res.as_mut() { + break if self.completed { + panic!("poll_next on completed"); + } else if self.errored { + self.completed = true; + return Ready(None); + } else if let Some(res) = self.res.as_mut() { pin_mut!(res); match res.poll_next(cx) { Ready(Some(Ok(item))) => match item { StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(item) => { - match decode_frame::, Error>>(&item) { + match decode_frame::, Error>>(&item) { Ok(Ok(item)) => Ready(Some(Ok(item))), Ok(Err(e)) => { self.errored = true; diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index ae681f8..d53e5d9 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -1,7 +1,7 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; use crate::agg::streams::StreamItem; use crate::binned::BinnedScalarStreamItem; -use crate::cache::pbvfs::PreBinnedItem; +use crate::cache::pbvfs::PreBinnedScalarItem; use crate::frame::inmem::InMemoryFrame; use crate::raw::conn::RawConnOut; use crate::raw::EventQueryJsonStringFrame; @@ -33,7 +33,7 @@ impl FrameType for Result { const FRAME_TYPE_ID: u32 = 0x07; } -impl FrameType for Result, Error> { +impl FrameType for Result, Error> { const FRAME_TYPE_ID: u32 = 0x08; } diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index d2f25ea..698c1a1 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -91,7 +91,6 @@ pub async fn get_binned( None } StreamItem::DataItem(frame) => { - type _ExpectedType2 = disk::binned::BinnedBytesForHttpStreamFrame; type ExpectedType = Result, Error>; let type_id_exp = ::FRAME_TYPE_ID; if frame.tyid() != type_id_exp { diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index e6ac596..99408c9 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -2,6 +2,7 @@ use crate::spawn_test_hosts; use bytes::BytesMut; use chrono::{DateTime, Utc}; use disk::agg::streams::StreamItem; +use disk::binned::BinnedScalarStreamItem; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; @@ -161,7 +162,7 @@ where None } StreamItem::DataItem(frame) => { - type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame; + type ExpectedType = Result, Error>; match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => match item {