From 97516601183d932f91ef3b44cc7ac438c3d7df42 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 25 May 2021 23:41:22 +0200 Subject: [PATCH] WIP --- disk/src/agg/binnedt.rs | 79 +++++----- disk/src/agg/binnedt2.rs | 119 +++++++++----- disk/src/agg/binnedx.rs | 27 ++-- disk/src/agg/eventbatch.rs | 10 ++ disk/src/agg/scalarbinbatch.rs | 4 +- disk/src/agg/streams.rs | 2 +- disk/src/aggtest.rs | 9 +- disk/src/binned.rs | 277 +++++++++++++++++---------------- disk/src/binned/scalar.rs | 18 +-- disk/src/binnedstream.rs | 60 +++---- disk/src/cache.rs | 19 ++- disk/src/cache/pbv.rs | 91 ++++++++--- disk/src/cache/pbvfs.rs | 19 +-- disk/src/eventblobs.rs | 5 +- disk/src/frame/makeframe.rs | 19 +-- disk/src/merge.rs | 25 ++- disk/src/raw/conn.rs | 4 +- 17 files changed, 438 insertions(+), 349 deletions(-) diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 4d61b1a..ad4a833 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -1,5 +1,6 @@ use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; +use crate::binned::RangeCompletableItem; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -32,7 +33,7 @@ pub trait IntoBinnedT { impl IntoBinnedT for S where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableTdim + Unpin, I::Aggregator: Unpin, { @@ -45,7 +46,7 @@ where pub struct IntoBinnedTDefaultStream where - S: Stream, Error>>, + S: Stream>, Error>> + Unpin, I: AggregatableTdim, { inp: S, @@ -56,7 +57,7 @@ where all_bins_emitted: bool, range_complete_observed: bool, range_complete_emitted: bool, - left: Option, Error>>>>, + left: Option>, Error>>>>, errored: bool, completed: bool, tmp_agg_results: VecDeque<::OutputValue>, @@ -64,7 +65,7 @@ where impl IntoBinnedTDefaultStream where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableTdim, { pub fn new(inp: S, spec: BinnedRange) -> Self { @@ -85,7 +86,7 @@ where } } - fn cur(&mut self, cx: &mut Context) -> Poll, Error>>> { + fn cur(&mut self, cx: &mut Context) -> Poll>, Error>>> { if let Some(cur) = self.left.take() { cur } else if self.inp_completed { @@ -113,43 +114,50 @@ where fn handle( &mut self, - cur: Poll, Error>>>, - ) -> Option::OutputValue>, Error>>>> { + cur: Poll>, Error>>>, + ) -> Option< + Poll::OutputValue>>, Error>>>, + > { use Poll::*; match cur { Ready(Some(Ok(item))) => match item { StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))), StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))), - StreamItem::DataItem(item) => { - if item.is_range_complete() { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { self.range_complete_observed = true; None - } else if self.all_bins_emitted { - // Just drop the item because we will not emit anymore data. - // Could also at least gather some stats. - None - } else { - let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&item) { - None - } else if ag.starts_after(&item) { - self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item))))); - self.cycle_current_bin(); - // TODO cycle_current_bin enqueues the bin, can I return here instead? + } + RangeCompletableItem::Data(item) => { + if self.all_bins_emitted { + // Just drop the item because we will not emit anymore data. + // Could also at least gather some stats. None } else { - let mut item = item; - ag.ingest(&mut item); - let item = item; - if ag.ends_after(&item) { - self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item))))); + let ag = self.aggtor.as_mut().unwrap(); + if ag.ends_before(&item) { + None + } else if ag.starts_after(&item) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } else { + let mut item = item; + ag.ingest(&mut item); + let item = item; + if ag.ends_after(&item) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); + self.cycle_current_bin(); + } + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None } - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None } } - } + }, }, Ready(Some(Err(e))) => { self.errored = true; @@ -172,11 +180,11 @@ where impl Stream for IntoBinnedTDefaultStream where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableTdim + Unpin, I::Aggregator: Unpin, { - type Item = Result::OutputValue>, Error>; + type Item = Result::OutputValue>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -187,19 +195,14 @@ where self.completed = true; Ready(None) } else if let Some(item) = self.tmp_agg_results.pop_front() { - Ready(Some(Ok(StreamItem::DataItem(item)))) + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) } else if self.range_complete_emitted { self.completed = true; Ready(None) } else if self.inp_completed && self.all_bins_emitted { self.range_complete_emitted = true; if self.range_complete_observed { - if let Some(item) = ::OutputValue::make_range_complete_item() { - Ready(Some(Ok(StreamItem::DataItem(item)))) - } else { - warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one"); - continue 'outer; - } + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { continue 'outer; } diff --git a/disk/src/agg/binnedt2.rs b/disk/src/agg/binnedt2.rs index 2ed6fd3..7bf9d17 100644 --- a/disk/src/agg/binnedt2.rs +++ b/disk/src/agg/binnedt2.rs @@ -1,5 +1,7 @@ +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; +use crate::binned::RangeCompletableItem; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -9,7 +11,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; -pub trait AggregatorTdim2: Sized + Unpin { +pub trait AggregatorTdim2: Sized + Send + Unpin { type InputValue; fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool; @@ -21,8 +23,6 @@ pub trait AggregatorTdim2: Sized + Unpin { pub trait AggregatableTdim2: Sized { type Aggregator: AggregatorTdim2; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; - fn is_range_complete(&self) -> bool; - fn make_range_complete_item() -> Option; } pub trait IntoBinnedT { @@ -32,7 +32,7 @@ pub trait IntoBinnedT { impl IntoBinnedT for S where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableTdim2 + Unpin, I::Aggregator: Unpin, { @@ -45,7 +45,7 @@ where pub struct IntoBinnedTDefaultStream where - S: Stream, Error>>, + S: Stream>, Error>>, I: AggregatableTdim2, { inp: S, @@ -56,7 +56,7 @@ where all_bins_emitted: bool, range_complete_observed: bool, range_complete_emitted: bool, - left: Option, Error>>>>, + left: Option>, Error>>>>, errored: bool, completed: bool, tmp_agg_results: VecDeque, @@ -64,7 +64,7 @@ where impl IntoBinnedTDefaultStream where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableTdim2, { pub fn new(inp: S, spec: BinnedRange) -> Self { @@ -85,7 +85,7 @@ where } } - fn cur(&mut self, cx: &mut Context) -> Poll, Error>>> { + fn cur(&mut self, cx: &mut Context) -> Poll>, Error>>> { if let Some(cur) = self.left.take() { cur } else if self.inp_completed { @@ -113,43 +113,47 @@ where fn handle( &mut self, - cur: Poll, Error>>>, - ) -> Option, Error>>>> { + cur: Poll>, Error>>>, + ) -> Option>, Error>>>> { use Poll::*; match cur { Ready(Some(Ok(item))) => match item { StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))), StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))), - StreamItem::DataItem(item) => { - if item.is_range_complete() { - self.range_complete_observed = true; - None - } else if self.all_bins_emitted { - // Just drop the item because we will not emit anymore data. - // Could also at least gather some stats. - None - } else { - let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&item) { - None - } else if ag.starts_after(&item) { - self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item))))); - self.cycle_current_bin(); - // TODO cycle_current_bin enqueues the bin, can I return here instead? + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => Some(Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + ))))), + RangeCompletableItem::Data(item) => { + if self.all_bins_emitted { + // Just drop the item because we will not emit anymore data. + // Could also at least gather some stats. None } else { - let mut item = item; - ag.ingest(&mut item); - let item = item; - if ag.ends_after(&item) { - self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item))))); + let ag = self.aggtor.as_mut().unwrap(); + if ag.ends_before(&item) { + None + } else if ag.starts_after(&item) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } else { + let mut item = item; + ag.ingest(&mut item); + let item = item; + if ag.ends_after(&item) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); + self.cycle_current_bin(); + } + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None } - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None } } - } + }, }, Ready(Some(Err(e))) => { self.errored = true; @@ -172,11 +176,11 @@ where impl Stream for IntoBinnedTDefaultStream where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableTdim2 + Unpin, I::Aggregator: Unpin, { - type Item = Result, Error>; + type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -187,19 +191,14 @@ where self.completed = true; Ready(None) } else if let Some(item) = self.tmp_agg_results.pop_front() { - Ready(Some(Ok(StreamItem::DataItem(item)))) + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) } else if self.range_complete_emitted { self.completed = true; Ready(None) } else if self.inp_completed && self.all_bins_emitted { self.range_complete_emitted = true; if self.range_complete_observed { - if let Some(item) = I::make_range_complete_item() { - Ready(Some(Ok(StreamItem::DataItem(item)))) - } else { - warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one"); - continue 'outer; - } + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { continue 'outer; } @@ -213,3 +212,37 @@ where } } } + +pub struct MinMaxAvgScalarBinBatchAgg {} + +impl AggregatorTdim2 for MinMaxAvgScalarBinBatchAgg { + type InputValue = MinMaxAvgScalarBinBatch; + + fn ends_before(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ends_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ingest(&mut self, inp: &mut Self::InputValue) { + todo!() + } + + fn result(self) -> Vec { + todo!() + } +} + +impl AggregatableTdim2 for MinMaxAvgScalarBinBatch { + type Aggregator = MinMaxAvgScalarBinBatchAgg; + + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { + todo!() + } +} diff --git a/disk/src/agg/binnedx.rs b/disk/src/agg/binnedx.rs index 3f3225f..bf0fa0d 100644 --- a/disk/src/agg/binnedx.rs +++ b/disk/src/agg/binnedx.rs @@ -1,5 +1,6 @@ use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; +use crate::binned::RangeCompletableItem; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -8,20 +9,19 @@ use std::task::{Context, Poll}; pub trait IntoBinnedXBins1 where + Self: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { type StreamOut; - fn into_binned_x_bins_1(self) -> Self::StreamOut - where - Self: Stream, Error>>; + fn into_binned_x_bins_1(self) -> Self::StreamOut; } -impl IntoBinnedXBins1 for T +impl IntoBinnedXBins1 for S where - T: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { - type StreamOut = IntoBinnedXBins1DefaultStream; + type StreamOut = IntoBinnedXBins1DefaultStream; fn into_binned_x_bins_1(self) -> Self::StreamOut { IntoBinnedXBins1DefaultStream { inp: self } @@ -30,7 +30,7 @@ where pub struct IntoBinnedXBins1DefaultStream where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { inp: S, @@ -38,10 +38,10 @@ where impl Stream for IntoBinnedXBins1DefaultStream where - S: Stream, Error>> + Unpin, + S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { - type Item = Result, Error>; + type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -49,7 +49,14 @@ where Ready(Some(Ok(k))) => match k { StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => Ready(Some(Ok(StreamItem::DataItem(item.into_agg())))), + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } + RangeCompletableItem::Data(item) => Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::Data(item.into_agg()), + )))), + }, }, Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 291e33e..eab5a84 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,7 +1,11 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; +use crate::binned::{MakeBytesFrame, RangeCompletableItem}; +use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; +use err::Error; use netpod::log::*; use serde::{Deserialize, Serialize}; use std::mem::size_of; @@ -230,3 +234,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { vec![v] } } + +impl MakeBytesFrame for Result>, Error> { + fn make_bytes_frame(&self) -> Result { + Ok(make_frame(self)?.freeze()) + } +} diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 1b1b208..9779167 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; -use crate::agg::streams::Bins; +use crate::agg::streams::{Bins, StreamItem}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; use crate::binned::{MakeBytesFrame, RangeCompletableItem}; use crate::frame::makeframe::make_frame; @@ -302,7 +302,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { } } -impl MakeBytesFrame for Result, Error> { +impl MakeBytesFrame for Result>, Error> { fn make_bytes_frame(&self) -> Result { Ok(make_frame(self)?.freeze()) } diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index be259a7..4ea2f6b 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -28,7 +28,7 @@ pub trait Collected { pub trait Collectable { type Collected: Collected; - fn append_to(&mut self, collected: &mut Self::Collected); + fn append_to(&self, collected: &mut Self::Collected); } pub trait ToJsonResult { diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index d22ada8..5b8d395 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -67,14 +67,9 @@ async fn agg_x_dim_0_inner() { ); let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1); let fut1 = IntoBinnedXBins1::into_binned_x_bins_1(fut1); + let fut1 = IntoBinnedT::into_binned_t(fut1, BinnedRange::covering_range(range, bin_count).unwrap().unwrap()); let fut1 = fut1 - .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) - .map(|k| { - if false { - trace!("after T binning {:?}", k.as_ref().unwrap()); - } - k - }) + //.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) .for_each(|_k| ready(())); fut1.await; } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index eb617ca..430cd51 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,11 +1,11 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT}; +use crate::agg::binnedt2::AggregatableTdim2; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; use crate::binned::scalar::binned_stream; -use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream}; -use crate::cache::pbvfs::PreBinnedScalarItem; +use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; use crate::cache::{BinnedQuery, MergedFromRemotes}; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::frame::makeframe::make_frame; @@ -32,7 +32,7 @@ use tokio::io::{AsyncRead, ReadBuf}; pub mod scalar; pub struct BinnedStreamRes { - pub binned_stream: BinnedStream, + pub binned_stream: BoxedStream>, Error>>, pub range: BinnedRange, } @@ -117,18 +117,29 @@ impl ToJsonResult for MinMaxAvgScalarBinBatchCollected { } } -impl MakeBytesFrame for Result>, Error> { - fn make_bytes_frame(&self) -> Result { - Ok(make_frame(self)?.freeze()) +impl ToJsonResult for MinMaxAvgScalarBinBatch { + type Output = MinMaxAvgScalarBinBatch; + + fn to_json_result(&self) -> Result { + err::todo(); + let ret = MinMaxAvgScalarBinBatch { + ts1s: self.ts1s.clone(), + ts2s: self.ts2s.clone(), + counts: self.counts.clone(), + mins: self.mins.clone(), + maxs: self.maxs.clone(), + avgs: self.avgs.clone(), + }; + Ok(ret) } } -type BinnedStreamBox = Pin> + Send>>; +type BinnedBytesStreamBox = Pin> + Send>>; pub async fn binned_bytes_for_http( node_config: &NodeConfigCached, query: &BinnedQuery, -) -> Result { +) -> Result { let channel_config = read_local_config(&query.channel(), &node_config.node).await?; let entry = extract_matching_config_entry(query.range(), &channel_config)?; info!("binned_bytes_for_http found config entry {:?}", entry); @@ -139,6 +150,8 @@ pub async fn binned_bytes_for_http( Ok(Box::pin(ret)) } AggKind::DimXBinsN(_) => { + // TODO pass a different stream kind here: + err::todo(); let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; let ret = BinnedBytesForHttpStream::new(res.binned_stream); Ok(Box::pin(ret)) @@ -220,7 +233,7 @@ impl Serialize for IsoDateTime { } pub async fn collect_all( - stream: impl Stream, Error>> + Unpin, + stream: impl Stream>, Error>> + Unpin, bin_count_exp: u32, ) -> Result<::Collected, Error> where @@ -249,10 +262,13 @@ where Ok(item) => match item { StreamItem::Log(_) => {} StreamItem::Stats(_) => {} - StreamItem::DataItem(mut item) => { - item.append_to(&mut main_item); - i1 += 1; - } + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => {} + RangeCompletableItem::Data(mut item) => { + item.append_to(&mut main_item); + i1 += 1; + } + }, }, Err(e) => { // TODO Need to use some flags to get good enough error message for remote user. @@ -286,23 +302,24 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> // TODO create the matching stream based on AggKind and ConfigEntry. let t = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?; + // TODO need to collect also timeout, number of missing expected bins, ... let collected = collect_all(t.binned_stream, t.range.count as u32).await?; - let ret = collected.to_json_result()?; + let ret = ToJsonResult::to_json_result(&collected)?; Ok(serde_json::to_value(ret)?) } -pub struct ReadPbv +pub struct ReadPbv where - PBI: PreBinnedItem, + T: ReadableFromFile, { buf: Vec, file: Option, - _mark: std::marker::PhantomData, + _mark: std::marker::PhantomData, } -impl ReadPbv +impl ReadPbv where - PBI: PreBinnedItem, + T: ReadableFromFile, { fn new(file: File) -> Self { Self { @@ -313,11 +330,11 @@ where } } -impl Future for ReadPbv +impl Future for ReadPbv where - PBI: PreBinnedItem, + T: ReadableFromFile, { - type Output = Result, Error>; + type Output = Result>, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; @@ -332,8 +349,8 @@ where self.buf.extend_from_slice(&mut buf); Pending } else { - match PBI::from_buf(&mut self.buf) { - Ok(item) => Ready(Ok(StreamItem::DataItem(item))), + match T::from_buf(&mut self.buf) { + Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), Err(e) => Ready(Err(e)), } } @@ -345,93 +362,61 @@ where } } -pub trait PreBinnedItem: Send + Serialize + DeserializeOwned + Unpin { - type BinnedStreamItem: AggregatableTdim + Unpin + Send; - fn into_binned_stream_item(self, fit_range: NanoRange) -> Option; - fn make_range_complete() -> Self; - fn read_pbv(file: File) -> Result, Error> - where - Self: Sized; - fn from_buf(buf: &[u8]) -> Result - where - Self: Sized; +pub trait ReadableFromFile: Sized { + fn read_from_file(file: File) -> Result, Error>; + // TODO should not need this: + fn from_buf(buf: &[u8]) -> Result; } -impl PreBinnedItem for PreBinnedScalarItem { - type BinnedStreamItem = BinnedScalarStreamItem; - - fn into_binned_stream_item(self, fit_range: NanoRange) -> Option { - match self { - Self::RangeComplete => Some(Self::BinnedStreamItem::RangeComplete), - Self::Batch(item) => match item.fits_inside(fit_range) { - Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => { - Some(Self::BinnedStreamItem::Values(item)) - } - _ => None, - }, - } - } - - fn make_range_complete() -> Self { - Self::RangeComplete - } - - fn read_pbv(file: File) -> Result, Error> { +impl ReadableFromFile for MinMaxAvgScalarBinBatch { + fn read_from_file(file: File) -> Result, Error> { Ok(ReadPbv::new(file)) } - fn from_buf(buf: &[u8]) -> Result { let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?; - Ok(Self::Batch(dec)) + Ok(dec) } } -pub trait XBinnedEventsStreamItem: - Send + Serialize + DeserializeOwned + Unpin + Collectable + Collected + AggregatableTdim -{ - fn make_range_complete() -> Self; +pub trait FilterFittingInside: Sized { + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option; } -impl Collected for MinMaxAvgScalarEventBatchStreamItem { +impl FilterFittingInside for MinMaxAvgScalarBinBatch { + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + match self.fits_inside(fit_range) { + Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), + _ => None, + } + } +} + +impl Collected for MinMaxAvgScalarEventBatch { // TODO for this case we don't have an expected number of events. Factor out into another trait? fn new(bin_count_exp: u32) -> Self { // TODO factor out the concept of RangeComplete into another trait layer: - Self::Values(MinMaxAvgScalarEventBatch::empty()) + MinMaxAvgScalarEventBatch::empty() } + fn timed_out(&mut self, k: bool) {} } -impl Collectable for MinMaxAvgScalarEventBatchStreamItem { - type Collected = MinMaxAvgScalarEventBatchStreamItem; - fn append_to(&mut self, collected: &mut Self::Collected) { - match self { - Self::RangeComplete => { - // TODO would be more nice to insert another type layer for RangeComplete concept. - panic!() - } - Self::Values(this) => match collected { - Self::RangeComplete => {} - Self::Values(coll) => { - coll.tss.append(&mut coll.tss); - coll.mins.append(&mut coll.mins); - coll.maxs.append(&mut coll.maxs); - coll.avgs.append(&mut coll.avgs); - } - }, - } - } -} +impl Collectable for MinMaxAvgScalarEventBatch { + type Collected = MinMaxAvgScalarEventBatch; -impl XBinnedEventsStreamItem for MinMaxAvgScalarEventBatchStreamItem { - fn make_range_complete() -> Self { - Self::RangeComplete + fn append_to(&self, collected: &mut Self::Collected) { + // TODO create separate traits for different concerns: + // Some occasion I want to just append. + // In other case, I need to collect also timeout flag, missing bin count and such. + collected.tss.extend_from_slice(&self.tss); + collected.mins.extend_from_slice(&self.mins); + collected.maxs.extend_from_slice(&self.maxs); + collected.avgs.extend_from_slice(&self.avgs); } } pub trait TBinned: Send + Serialize + DeserializeOwned + Unpin + Collectable + AggregatableTdim {} -impl TBinned for MinMaxAvgScalarBinBatchStreamItem {} - impl Collected for MinMaxAvgScalarBinBatch { fn new(bin_count_exp: u32) -> Self { MinMaxAvgScalarBinBatch::empty() @@ -441,21 +426,45 @@ impl Collected for MinMaxAvgScalarBinBatch { impl Collectable for MinMaxAvgScalarBinBatch { type Collected = MinMaxAvgScalarBinBatch; - fn append_to(&mut self, collected: &mut Self::Collected) { - collected.ts1s.append(&mut self.ts1s); - collected.ts2s.append(&mut self.ts2s); - collected.counts.append(&mut self.counts); - collected.mins.append(&mut self.mins); - collected.maxs.append(&mut self.maxs); - collected.avgs.append(&mut self.avgs); + fn append_to(&self, collected: &mut Self::Collected) { + collected.ts1s.extend_from_slice(&self.ts1s); + collected.ts2s.extend_from_slice(&self.ts2s); + collected.counts.extend_from_slice(&self.counts); + collected.mins.extend_from_slice(&self.mins); + collected.maxs.extend_from_slice(&self.maxs); + collected.avgs.extend_from_slice(&self.avgs); } } +pub trait XBinnedEvents: + Sized + Unpin + Send + Serialize + DeserializeOwned + Collectable + Collected + AggregatableTdim +{ +} + +impl XBinnedEvents for MinMaxAvgScalarEventBatch {} + +impl TBinnedBins for MinMaxAvgScalarBinBatch {} + +pub trait TBinnedBins: + Sized + + Unpin + + Send + + Serialize + + DeserializeOwned + + Collectable + + Collected + + ReadableFromFile + + FilterFittingInside + + AggregatableTdim2 +{ +} + pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static { - type BinnedStreamItem: MakeBytesFrame; - type BinnedStreamType: Stream + Send + 'static; - type PreBinnedItem: PreBinnedItem; - type XBinnedEvents; + type TBinnedStreamType: Stream>, Error>> + + Send + + 'static; + type XBinnedEvents: XBinnedEvents; + type TBinnedBins: TBinnedBins; fn new_binned_from_prebinned( &self, @@ -463,7 +472,7 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static { range: BinnedRange, pre_range: PreBinnedPatchRange, node_config: &NodeConfigCached, - ) -> Result; + ) -> Result; fn new_binned_from_merged( &self, @@ -471,14 +480,13 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static { perf_opts: PerfOpts, range: BinnedRange, node_config: &NodeConfigCached, - ) -> Result; - - fn pbv_handle_fut2_item(item: StreamItem) -> Option>; + ) -> Result; } #[derive(Clone)] pub struct BinnedStreamKindScalar {} +#[derive(Clone)] pub struct BinnedStreamKindWave {} impl BinnedStreamKindScalar { @@ -493,16 +501,17 @@ impl BinnedStreamKindWave { } } +#[derive(Debug, Serialize, Deserialize)] pub enum RangeCompletableItem { RangeComplete, Data(T), } impl BinnedStreamKind for BinnedStreamKindScalar { - type BinnedStreamItem = Result, Error>; - type BinnedStreamType = BinnedStream; - type PreBinnedItem = PreBinnedScalarItem; + // TODO is this really needed? + type TBinnedStreamType = BoxedStream>, Error>>; type XBinnedEvents = MinMaxAvgScalarEventBatch; + type TBinnedBins = MinMaxAvgScalarBinBatch; fn new_binned_from_prebinned( &self, @@ -510,7 +519,7 @@ impl BinnedStreamKind for BinnedStreamKindScalar { range: BinnedRange, pre_range: PreBinnedPatchRange, node_config: &NodeConfigCached, - ) -> Result { + ) -> Result { let s = BinnedScalarStreamFromPreBinnedPatches::new( PreBinnedPatchIterator::from_range(pre_range), query.channel().clone(), @@ -521,7 +530,7 @@ impl BinnedStreamKind for BinnedStreamKindScalar { query.disk_stats_every().clone(), self.clone(), )?; - Ok(BinnedStream::new(Box::pin(s))?) + Ok(BoxedStream::new(Box::pin(s))?) } fn new_binned_from_merged( @@ -530,37 +539,33 @@ impl BinnedStreamKind for BinnedStreamKindScalar { perf_opts: PerfOpts, range: BinnedRange, node_config: &NodeConfigCached, - ) -> Result { + ) -> Result { let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); // TODO use the binned2 instead let s = crate::agg::binnedt::IntoBinnedT::into_binned_t(s, range); - let s = s.map(adapter_to_stream_item); - Ok(BinnedStream::new(Box::pin(s))?) - } - - fn pbv_handle_fut2_item(item: StreamItem) -> Option> { - // TODO make this code work in this context: - // Do I need more parameters here? - /*Ok(item) => match item { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => match item { - PreBinnedScalarItem::RangeComplete => { - self.range_complete_observed = true; - None - } - PreBinnedScalarItem::Batch(batch) => { - self.values.ts1s.extend(batch.ts1s.iter()); - self.values.ts2s.extend(batch.ts2s.iter()); - self.values.counts.extend(batch.counts.iter()); - self.values.mins.extend(batch.mins.iter()); - self.values.maxs.extend(batch.maxs.iter()); - self.values.avgs.extend(batch.avgs.iter()); - StreamItem::DataItem(PreBinnedScalarItem::Batch(batch)) - } - }, - },*/ - err::todo(); - None + Ok(BoxedStream::new(Box::pin(s))?) } } + +// TODO this code is needed somewhere: +fn pbv_handle_fut2_item( + item: StreamItem>, +) -> Option>> { + // TODO make this code work in this context: + // Do I need more parameters here? + /*Ok(item) => match item { + StreamItem::DataItem(item) => match item { + PreBinnedScalarItem::Batch(batch) => { + self.values.ts1s.extend(batch.ts1s.iter()); + self.values.ts2s.extend(batch.ts2s.iter()); + self.values.counts.extend(batch.counts.iter()); + self.values.mins.extend(batch.mins.iter()); + self.values.maxs.extend(batch.maxs.iter()); + self.values.avgs.extend(batch.avgs.iter()); + StreamItem::DataItem(PreBinnedScalarItem::Batch(batch)) + } + }, + },*/ + err::todo(); + None +} diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs index a66f90e..03241f3 100644 --- a/disk/src/binned/scalar.rs +++ b/disk/src/binned/scalar.rs @@ -1,7 +1,7 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::binned::{BinnedStreamKind, BinnedStreamRes, RangeCompletableItem}; -use crate::binnedstream::BinnedStream; +use crate::binnedstream::BoxedStream; use crate::cache::BinnedQuery; use crate::raw::EventsQuery; use err::Error; @@ -9,13 +9,13 @@ use futures_core::Stream; use netpod::log::*; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange}; -pub async fn binned_stream( +pub async fn binned_stream( node_config: &NodeConfigCached, query: &BinnedQuery, - stream_kind: BK, -) -> Result::Item>, Error> + stream_kind: SK, +) -> Result, Error> where - BK: BinnedStreamKind, + SK: BinnedStreamKind, { if query.channel().backend != node_config.node.backend { let err = Error::with_msg(format!( @@ -40,8 +40,8 @@ where ); return Err(Error::with_msg(msg)); } - let s = BK::new_binned_from_prebinned(&stream_kind, query, range.clone(), pre_range, node_config)?; - let s = BinnedStream::new(Box::pin(s))?; + let s = SK::new_binned_from_prebinned(&stream_kind, query, range.clone(), pre_range, node_config)?; + let s = BoxedStream::new(Box::pin(s))?; let ret = BinnedStreamRes { binned_stream: s, range, @@ -59,8 +59,8 @@ where agg_kind: query.agg_kind().clone(), }; // TODO do I need to set up more transformations or binning to deliver the requested data? - let s = BK::new_binned_from_merged(&stream_kind, evq, perf_opts, range.clone(), node_config)?; - let s = BinnedStream::new(Box::pin(s))?; + let s = SK::new_binned_from_merged(&stream_kind, evq, perf_opts, range.clone(), node_config)?; + let s = BoxedStream::new(Box::pin(s))?; let ret = BinnedStreamRes { binned_stream: s, range, diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index a7b940a..601650e 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,5 +1,5 @@ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, PreBinnedItem}; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{CacheUsage, PreBinnedQuery}; use crate::frame::makeframe::FrameType; @@ -16,15 +16,10 @@ pub struct BinnedScalarStreamFromPreBinnedPatches where BK: BinnedStreamKind, { - //inp: Pin, Error>> + Send>>, inp: Pin< Box< - dyn Stream< - Item = Result< - StreamItem<<::PreBinnedItem as PreBinnedItem>::BinnedStreamItem>, - Error, - >, - > + Send, + dyn Stream::TBinnedBins>>, Error>> + + Send, >, >, stream_kind: BK, @@ -33,7 +28,7 @@ where impl BinnedScalarStreamFromPreBinnedPatches where BK: BinnedStreamKind, - Result::PreBinnedItem>, err::Error>: FrameType, + Result::TBinnedBins>>, Error>: FrameType, { pub fn new( patch_it: PreBinnedPatchIterator, @@ -87,12 +82,17 @@ where Ok(item) => match item { StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), - StreamItem::DataItem(item) => { - match crate::binned::PreBinnedItem::into_binned_stream_item(item, fit_range) { - Some(item) => Some(Ok(StreamItem::DataItem(item))), - None => None, + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) } - } + RangeCompletableItem::Data(item) => { + match crate::binned::FilterFittingInside::filter_fitting_inside(item, fit_range) { + Some(item) => Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), + None => None, + } + } + }, }, Err(e) => Some(Err(e)), }; @@ -100,8 +100,8 @@ where } }); // TODO activate the T-binning via the bin-to-bin binning trait. - err::todo(); - //let inp = IntoBinnedT::into_binned_t(inp, range); + //err::todo(); + let inp = crate::agg::binnedt2::IntoBinnedT::into_binned_t(inp, range); Ok(Self { inp: Box::pin(inp), stream_kind, @@ -109,33 +109,33 @@ where } } -impl Stream for BinnedScalarStreamFromPreBinnedPatches +// TODO change name, type is generic now: +// Can I remove the whole type or keep for static check? +impl Stream for BinnedScalarStreamFromPreBinnedPatches where - BK: BinnedStreamKind, + SK: BinnedStreamKind, { - type Item = Result::PreBinnedItem as PreBinnedItem>::BinnedStreamItem>, Error>; + type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => Ready(Some(item)), - Ready(None) => Ready(None), - Pending => Pending, - } + self.inp.poll_next_unpin(cx) } } -pub struct BinnedStream { +pub struct BoxedStream { inp: Pin + Send>>, } -impl BinnedStream { - pub fn new(inp: Pin + Send>>) -> Result { - Ok(Self { inp }) +impl BoxedStream { + pub fn new(inp: T) -> Result + where + T: Stream + Send + 'static, + { + Ok(Self { inp: Box::pin(inp) }) } } -impl Stream for BinnedStream { +impl Stream for BoxedStream { type Item = I; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { diff --git a/disk/src/cache.rs b/disk/src/cache.rs index d3a7271..1ec494d 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -230,14 +230,14 @@ fn channel_from_params(params: &BTreeMap) -> Result( +pub fn pre_binned_bytes_for_http( node_config: &NodeConfigCached, query: &PreBinnedQuery, - stream_kind: BK, -) -> Result, Error> + stream_kind: SK, +) -> Result, Error> where - BK: BinnedStreamKind, - Result::PreBinnedItem>, err::Error>: FrameType, + SK: BinnedStreamKind, + Result>, err::Error>: FrameType, { if query.channel.backend != node_config.node.backend { let err = Error::with_msg(format!( @@ -508,13 +508,16 @@ impl CacheFileDesc { } } -pub async fn write_pb_cache_min_max_avg_scalar( - values: MinMaxAvgScalarBinBatch, +pub async fn write_pb_cache_min_max_avg_scalar( + values: T, patch: PreBinnedPatchCoord, agg_kind: AggKind, channel: Channel, node_config: NodeConfigCached, -) -> Result<(), Error> { +) -> Result<(), Error> +where + T: Serialize, +{ let cfd = CacheFileDesc { channel: channel.clone(), patch: patch.clone(), diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 2cf10e4..9079932 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,8 +1,9 @@ use crate::agg::binnedt::IntoBinnedT; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, PreBinnedItem, RangeCompletableItem}; -use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream}; +use crate::agg::streams::{Collectable, Collected, StreamItem}; +use crate::binned::RangeCompletableItem::RangeComplete; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery}; use crate::frame::makeframe::{make_frame, FrameType}; use crate::raw::EventsQuery; @@ -35,8 +36,8 @@ pub fn pre_binned_value_byte_stream_new( stream_kind: SK, ) -> PreBinnedValueByteStream where - SK: BinnedStreamKind + Unpin, - Result::PreBinnedItem>, err::Error>: FrameType, + SK: BinnedStreamKind, + Result::TBinnedBins>>, err::Error>: FrameType, { let s1 = PreBinnedValueStream::new(query.clone(), node_config, stream_kind); let s2 = PreBinnedValueByteStreamInner { inp: s1 }; @@ -45,9 +46,8 @@ where impl Stream for PreBinnedValueByteStreamInner where - SK: BinnedStreamKind + Unpin, - Result::PreBinnedItem>, err::Error>: FrameType, - PreBinnedValueStream: Unpin, + SK: BinnedStreamKind, + Result::TBinnedBins>>, err::Error>: FrameType, { type Item = Result; @@ -55,7 +55,10 @@ where use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => { - match make_frame::::PreBinnedItem>, Error>>(&item) { + match make_frame::< + Result::TBinnedBins>>, err::Error>, + >(&item) + { Ok(buf) => Ready(Some(Ok(buf.freeze()))), Err(e) => Ready(Some(Err(e.into()))), } @@ -73,8 +76,18 @@ where query: PreBinnedQuery, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, - fut2: - Option::PreBinnedItem>, Error>> + Send>>>, + fut2: Option< + Pin< + Box< + dyn Stream< + Item = Result< + StreamItem::TBinnedBins>>, + err::Error, + >, + > + Send, + >, + >, + >, read_from_cache: bool, cache_written: bool, data_complete: bool, @@ -83,10 +96,19 @@ where errored: bool, completed: bool, streamlog: Streamlog, - values: MinMaxAvgScalarBinBatch, + values: ::TBinnedBins, write_fut: Option> + Send>>>, read_cache_fut: Option< - Pin::PreBinnedItem>, Error>> + Send>>, + Pin< + Box< + dyn Future< + Output = Result< + StreamItem::TBinnedBins>>, + err::Error, + >, + > + Send, + >, + >, >, stream_kind: SK, } @@ -94,7 +116,7 @@ where impl PreBinnedValueStream where SK: BinnedStreamKind, - Result::PreBinnedItem>>, err::Error>: FrameType, + Result::TBinnedBins>>, err::Error>: FrameType, { pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: SK) -> Self { Self { @@ -110,7 +132,8 @@ where errored: false, completed: false, streamlog: Streamlog::new(node_config.ix as u32), - values: MinMaxAvgScalarBinBatch::empty(), + // TODO refactor usage of parameter + values: <::TBinnedBins as Collected>::new(0), write_fut: None, read_cache_fut: None, stream_kind, @@ -244,9 +267,9 @@ where impl Stream for PreBinnedValueStream where SK: BinnedStreamKind + Unpin, - Result::PreBinnedItem>>, err::Error>: FrameType, + Result::TBinnedBins>>, err::Error>: FrameType, { - type Item = Result::PreBinnedItem>>, Error>; + type Item = Result::TBinnedBins>>, err::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -301,7 +324,7 @@ where if self.cache_written { if self.range_complete_observed { self.range_complete_emitted = true; - let item = <::PreBinnedItem as PreBinnedItem>::make_range_complete(); + let item = RangeCompletableItem::RangeComplete; Ready(Some(Ok(StreamItem::DataItem(item)))) } else { self.completed = true; @@ -316,10 +339,16 @@ where let msg = format!( "write cache file query: {:?} bin count: {}", self.query.patch, - self.values.ts1s.len() + //self.values.ts1s.len() + // TODO create trait to extract number of bins from item: + 0 ); self.streamlog.append(Level::INFO, msg); - let values = std::mem::replace(&mut self.values, MinMaxAvgScalarBinBatch::empty()); + let values = std::mem::replace( + &mut self.values, + // Do not use expectation on the number of bins here: + <::TBinnedBins as Collected>::new(0), + ); let fut = super::write_pb_cache_min_max_avg_scalar( values, self.query.patch.clone(), @@ -339,9 +368,22 @@ where } else if let Some(fut) = self.fut2.as_mut() { match fut.poll_next_unpin(cx) { Ready(Some(k)) => match k { - Ok(item) => match SK::pbv_handle_fut2_item(item) { - None => continue 'outer, - Some(item) => Ready(Some(Ok(item))), + Ok(item) => match item { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete_observed = true; + continue 'outer; + } + RangeCompletableItem::Data(item) => { + // TODO need trait Appendable which simply appends to the same type, so that I can + // write later the whole batch of numbers in one go. + err::todo(); + //item.append_to(&mut self.values); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } + }, }, Err(e) => { self.errored = true; @@ -361,7 +403,8 @@ where match item { Ok(file) => { self.read_from_cache = true; - let fut = ::PreBinnedItem::read_pbv(file)?; + use crate::binned::ReadableFromFile; + let fut = <::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?; self.read_cache_fut = Some(Box::pin(fut)); continue 'outer; } diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 2ec9448..6521e1a 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -1,6 +1,6 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::BinnedStreamKind; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{decode_frame, FrameType}; @@ -52,18 +52,13 @@ where } } -#[derive(Debug, Serialize, Deserialize)] -pub enum PreBinnedScalarItem { - Batch(MinMaxAvgScalarBinBatch), - RangeComplete, -} - -impl Stream for PreBinnedScalarValueFetchedStream +// TODO change name, is now generic: +impl Stream for PreBinnedScalarValueFetchedStream where - BK: BinnedStreamKind, - Result::PreBinnedItem>, err::Error>: FrameType, + SK: BinnedStreamKind, + Result>, err::Error>: FrameType, { - type Item = Result, Error>; + type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -80,7 +75,7 @@ where StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(item) => { - match decode_frame::::PreBinnedItem>, Error>>( + match decode_frame::>, Error>>( &item, ) { Ok(Ok(item)) => Ready(Some(Ok(item))), diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 72d04a4..d4db38b 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,6 +1,7 @@ use crate::agg::streams::StreamItem; +use crate::binned::RangeCompletableItem; use crate::dataopen::{open_files, OpenedFile}; -use crate::eventchunker::{EventChunker, EventChunkerConf}; +use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}; use crate::file_content_stream; use crate::streamlog::LogItem; use err::Error; @@ -56,7 +57,7 @@ impl EventBlobsComplete { } impl Stream for EventBlobsComplete { - type Item = Result, Error>; + type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 3ffe1bf..6a82c4d 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -1,6 +1,7 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::cache::pbvfs::PreBinnedScalarItem; +use crate::binned::RangeCompletableItem; use crate::frame::inmem::InMemoryFrame; use crate::raw::EventQueryJsonStringFrame; use bytes::{BufMut, BytesMut}; @@ -19,20 +20,12 @@ impl FrameType for EventQueryJsonStringFrame { const FRAME_TYPE_ID: u32 = 0x03; } -impl FrameType for Result, Error> { - const FRAME_TYPE_ID: u32 = 0x06; +impl FrameType for Result>, Error> { + const FRAME_TYPE_ID: u32 = 0x10; } -impl FrameType for Result { - const FRAME_TYPE_ID: u32 = 0x07; -} - -impl FrameType for Result, Error> { - const FRAME_TYPE_ID: u32 = 0x08; -} - -impl FrameType for Result, Error> { - const FRAME_TYPE_ID: u32 = 0x09; +impl FrameType for Result>, Error> { + const FRAME_TYPE_ID: u32 = 0x11; } pub fn make_frame(item: &FT) -> Result diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 4f70927..215f6da 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt::AggregatableTdim; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem}; -use crate::binned::{BinnedStreamKind, XBinnedEventsStreamItem}; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; pub struct MergedMinMaxAvgScalarStream where - S: Stream::XBinnedEvents>, Error>> + Unpin, + S: Stream>, Error>> + Unpin, SK: BinnedStreamKind, { inps: Vec, @@ -35,7 +35,7 @@ where impl MergedMinMaxAvgScalarStream where - S: Stream::XBinnedEvents>, Error>> + Unpin, + S: Stream>, Error>> + Unpin, SK: BinnedStreamKind, { pub fn new(inps: Vec) -> Self { @@ -80,9 +80,8 @@ where } continue 'l1; } - StreamItem::DataItem(item) => { - // TODO factor out the concept of RangeComplete into another trait layer. - if item.is_range_complete() { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { self.range_complete_observed[i1] = true; let d = self.range_complete_observed.iter().filter(|&&k| k).count(); if d == self.range_complete_observed.len() { @@ -92,11 +91,12 @@ where trace!("MergedStream range_complete d {}", d); } continue 'l1; - } else { + } + RangeCompletableItem::Data(item) => { self.ixs[i1] = 0; self.current[i1] = MergedCurVal::Val(item); } - } + }, }, Ready(Some(Err(e))) => { // TODO emit this error, consider this stream as done, anything more to do here? @@ -124,12 +124,13 @@ where } } +// TODO change name, it is generic now: impl Stream for MergedMinMaxAvgScalarStream where - S: Stream::XBinnedEvents>, Error>> + Unpin, + S: Stream::XBinnedEvents>>, Error>> + Unpin, SK: BinnedStreamKind, { - type Item = Result::XBinnedEvents>, Error>; + type Item = Result::XBinnedEvents>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -150,9 +151,7 @@ where Ready(None) } else { self.range_complete_observed_all_emitted = true; - Ready(Some(Ok(StreamItem::DataItem( - <::XBinnedEvents as XBinnedEventsStreamItem>::make_range_complete(), - )))) + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } } else { self.completed = true; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 8da848f..c85cb10 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,5 +1,6 @@ use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem}; @@ -56,7 +57,8 @@ async fn events_conn_handler_inner( Ok(_) => (), Err(mut ce) => { // TODO is it guaranteed to be compatible to serialize this way? - let buf = make_frame::, Error>>(&Err(ce.err))?; + let buf = + make_frame::>, Error>>(&Err(ce.err))?; match ce.netout.write_all(&buf).await { Ok(_) => (), Err(e) => return Err(e)?,