diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index cfc3a0e..95889ee 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -4,9 +4,9 @@ Aggregation and binning support.
 use super::eventchunker::EventFull;
 use crate::agg::binnedt::AggregatableTdim;
-use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
+use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::streams::StreamItem;
-use crate::eventchunker::EventChunkerItem;
+use crate::binned::RangeCompletableItem;
 use bytes::BytesMut;
 use err::Error;
 use futures_core::Stream;
@@ -19,6 +19,7 @@ use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 
 pub mod binnedt;
+pub mod binnedt2;
 pub mod binnedx;
 pub mod eventbatch;
 pub mod scalarbinbatch;
@@ -502,17 +503,11 @@ impl<S> Dim1F32Stream<S> {
     }
 }
 
-#[derive(Debug)]
-pub enum Dim1F32StreamItem {
-    Values(ValuesDim1),
-    RangeComplete,
-}
-
 impl<S> Stream for Dim1F32Stream<S>
 where
-    S: Stream<Item = Result<StreamItem<EventChunkerItem>, Error>> + Unpin,
+    S: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>> + Unpin,
 {
-    type Item = Result<StreamItem<Dim1F32StreamItem>, Error>;
+    type Item = Result<StreamItem<RangeCompletableItem<ValuesDim1>>, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
@@ -528,19 +523,20 @@
                     let inst1 = Instant::now();
                     let u = match k {
                         StreamItem::DataItem(item) => match item {
-                            EventChunkerItem::Events(events) => match self.process_event_data(&events) {
-                                Ok(k) => {
-                                    let ret = Dim1F32StreamItem::Values(k);
-                                    Ready(Some(Ok(StreamItem::DataItem(ret))))
+                            RangeCompletableItem::RangeComplete => {
+                                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                            }
+                            RangeCompletableItem::Data(item) => match self.process_event_data(&item) {
+                                Ok(item) => {
+                                    let ret = RangeCompletableItem::Data(item);
+                                    let ret = StreamItem::DataItem(ret);
+                                    Ready(Some(Ok(ret)))
                                 }
                                 Err(e) => {
                                     self.errored = true;
                                     Ready(Some(Err(e)))
                                 }
                             },
-                            EventChunkerItem::RangeComplete => {
-                                Ready(Some(Ok(StreamItem::DataItem(Dim1F32StreamItem::RangeComplete))))
-                            }
                         },
                         StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                         StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
@@ -566,25 +562,14 @@
 pub trait IntoDim1F32Stream {
     fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
     where
-        Self: Stream<Item = Result<StreamItem<EventChunkerItem>, Error>> + Sized;
+        Self: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>> + Sized;
 }
 
 impl<T> IntoDim1F32Stream for T
 where
-    T: Stream<Item = Result<StreamItem<EventChunkerItem>, Error>>,
+    T: Stream<Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>>,
 {
     fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
         Dim1F32Stream::new(self)
     }
 }
-
-impl AggregatableXdim1Bin for Dim1F32StreamItem {
-    type Output = MinMaxAvgScalarEventBatchStreamItem;
-
-    fn into_agg(self) -> Self::Output {
-        match self {
-            Dim1F32StreamItem::Values(vals) => MinMaxAvgScalarEventBatchStreamItem::Values(vals.into_agg()),
-            Dim1F32StreamItem::RangeComplete => MinMaxAvgScalarEventBatchStreamItem::RangeComplete,
-        }
-    }
-}
diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 34be794..4d61b1a 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -23,8 +23,6 @@ pub trait AggregatableTdim: Sized {
     type Output: AggregatableXdim1Bin + AggregatableTdim;
     type Aggregator: AggregatorTdim<InputValue = Self, OutputValue = Self::Output>;
     fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
-    fn is_range_complete(&self) -> bool;
-    fn make_range_complete_item() -> Option<Self>;
 }
 
 pub trait IntoBinnedT {
diff --git a/disk/src/agg/binnedt2.rs b/disk/src/agg/binnedt2.rs
new file mode 100644
index 0000000..2ed6fd3
--- /dev/null
+++ b/disk/src/agg/binnedt2.rs
@@ -0,0 +1,215 @@
+use crate::agg::streams::StreamItem;
+use crate::agg::AggregatableXdim1Bin;
+use err::Error;
+use futures_core::Stream;
+use futures_util::StreamExt;
+use netpod::log::*;
+use netpod::BinnedRange;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub trait AggregatorTdim2: Sized + Unpin {
+    type InputValue;
+    fn ends_before(&self, inp: &Self::InputValue) -> bool;
+    fn ends_after(&self, inp: &Self::InputValue) -> bool;
+    fn starts_after(&self, inp: &Self::InputValue) -> bool;
+    fn ingest(&mut self, inp: &mut Self::InputValue);
+    fn result(self) -> Vec<Self::InputValue>;
+}
+
+pub trait AggregatableTdim2: Sized {
+    type Aggregator: AggregatorTdim2<InputValue = Self>;
+    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
+    fn is_range_complete(&self) -> bool;
+    fn make_range_complete_item() -> Option<Self>;
+}
+
+pub trait IntoBinnedT {
+    type StreamOut: Stream;
+    fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut;
+}
+
+impl<S, I> IntoBinnedT for S
+where
+    S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
+    I: AggregatableTdim2 + Unpin,
+    I::Aggregator: Unpin,
+{
+    type StreamOut = IntoBinnedTDefaultStream<S, I>;
+
+    fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut {
+        IntoBinnedTDefaultStream::new(self, spec)
+    }
+}
+
+pub struct IntoBinnedTDefaultStream<S, I>
+where
+    S: Stream<Item = Result<StreamItem<I>, Error>>,
+    I: AggregatableTdim2,
+{
+    inp: S,
+    aggtor: Option<I::Aggregator>,
+    spec: BinnedRange,
+    curbin: u32,
+    inp_completed: bool,
+    all_bins_emitted: bool,
+    range_complete_observed: bool,
+    range_complete_emitted: bool,
+    left: Option<Poll<Option<Result<StreamItem<I>, Error>>>>,
+    errored: bool,
+    completed: bool,
+    tmp_agg_results: VecDeque<I>,
+}
+
+impl<S, I> IntoBinnedTDefaultStream<S, I>
+where
+    S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
+    I: AggregatableTdim2,
+{
+    pub fn new(inp: S, spec: BinnedRange) -> Self {
+        let range = spec.get_range(0);
+        Self {
+            inp,
+            aggtor: Some(I::aggregator_new_static(range.beg, range.end)),
+            spec,
+            curbin: 0,
+            inp_completed: false,
+            all_bins_emitted: false,
+            range_complete_observed: false,
+            range_complete_emitted: false,
+            left: None,
+            errored: false,
+            completed: false,
+            tmp_agg_results: VecDeque::new(),
+        }
+    }
+
+    fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<I>, Error>>> {
+        if let Some(cur) = self.left.take() {
+            cur
+        } else if self.inp_completed {
+            Poll::Ready(None)
+        } else {
+            let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
+            inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
+        }
+    }
+
+    fn cycle_current_bin(&mut self) {
+        self.curbin += 1;
+        let range = self.spec.get_range(self.curbin);
+        let ret = self
+            .aggtor
+            .replace(I::aggregator_new_static(range.beg, range.end))
+            // TODO handle None case, or remove Option if Agg is always present
+            .unwrap()
+            .result();
+        self.tmp_agg_results = ret.into();
+        if self.curbin >= self.spec.count as u32 {
+            self.all_bins_emitted = true;
+        }
+    }
+
+    fn handle(
+        &mut self,
+        cur: Poll<Option<Result<StreamItem<I>, Error>>>,
+    ) -> Option<Poll<Option<Result<StreamItem<I>, Error>>>> {
+        use Poll::*;
+        match cur {
+            Ready(Some(Ok(item))) => match item {
+                StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))),
+                StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))),
+                StreamItem::DataItem(item) => {
+                    if item.is_range_complete() {
+                        self.range_complete_observed = true;
+                        None
+                    } else if self.all_bins_emitted {
+                        // Just drop the item because we will not emit anymore data.
+                        // Could also at least gather some stats.
+                        None
+                    } else {
+                        let ag = self.aggtor.as_mut().unwrap();
+                        if ag.ends_before(&item) {
+                            None
+                        } else if ag.starts_after(&item) {
+                            self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
+                            self.cycle_current_bin();
+                            // TODO cycle_current_bin enqueues the bin, can I return here instead?
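+                            // handle() could pop the first queued bin from tmp_agg_results and
+                            // return it directly, saving one pass through the outer poll loop;
+                            // but result() may yield more than one item, so the VecDeque would
+                            // still be needed for the general case.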
+                            None
+                        } else {
+                            let mut item = item;
+                            ag.ingest(&mut item);
+                            let item = item;
+                            if ag.ends_after(&item) {
+                                self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
+                                self.cycle_current_bin();
+                            }
+                            // TODO cycle_current_bin enqueues the bin, can I return here instead?
+                            None
+                        }
+                    }
+                }
+            },
+            Ready(Some(Err(e))) => {
+                self.errored = true;
+                Some(Ready(Some(Err(e))))
+            }
+            Ready(None) => {
+                self.inp_completed = true;
+                if self.all_bins_emitted {
+                    None
+                } else {
+                    self.cycle_current_bin();
+                    // TODO cycle_current_bin enqueues the bin, can I return here instead?
+                    None
+                }
+            }
+            Pending => Some(Pending),
+        }
+    }
+}
+
+impl<S, I> Stream for IntoBinnedTDefaultStream<S, I>
+where
+    S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
+    I: AggregatableTdim2 + Unpin,
+    I::Aggregator: Unpin,
+{
+    type Item = Result<StreamItem<I>, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        'outer: loop {
+            break if self.completed {
+                panic!("IntoBinnedTDefaultStream poll_next on completed");
+            } else if self.errored {
+                self.completed = true;
+                Ready(None)
+            } else if let Some(item) = self.tmp_agg_results.pop_front() {
+                Ready(Some(Ok(StreamItem::DataItem(item))))
+            } else if self.range_complete_emitted {
+                self.completed = true;
+                Ready(None)
+            } else if self.inp_completed && self.all_bins_emitted {
+                self.range_complete_emitted = true;
+                if self.range_complete_observed {
+                    if let Some(item) = I::make_range_complete_item() {
+                        Ready(Some(Ok(StreamItem::DataItem(item))))
+                    } else {
+                        warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one");
+                        continue 'outer;
+                    }
+                } else {
+                    continue 'outer;
+                }
+            } else {
+                let cur = self.cur(cx);
+                match self.handle(cur) {
+                    Some(item) => item,
+                    None => continue 'outer,
+                }
+            };
+        }
+    }
+}
diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs
index d6930b8..291e33e 100644
--- a/disk/src/agg/eventbatch.rs
+++ b/disk/src/agg/eventbatch.rs
@@ -1,5 +1,5 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
-use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
+use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::AggregatableXdim1Bin;
 use bytes::{BufMut, Bytes, BytesMut};
 use netpod::log::*;
@@ -110,14 +110,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
     fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
         MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
     }
-
-    fn is_range_complete(&self) -> bool {
-        false
-    }
-
-    fn make_range_complete_item() -> Option<Self> {
-        None
-    }
 }
 
 impl MinMaxAvgScalarEventBatch {
@@ -238,93 +230,3 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
         vec![v]
     }
 }
-
-#[derive(Debug, Serialize, Deserialize)]
-pub enum MinMaxAvgScalarEventBatchStreamItem {
-    Values(MinMaxAvgScalarEventBatch),
-    RangeComplete,
-}
-
-impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
-    type Output = MinMaxAvgScalarEventBatchStreamItem;
-
-    fn into_agg(self) -> Self::Output {
-        self
-    }
-}
-
-impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
-    type Output = MinMaxAvgScalarBinBatchStreamItem;
-    type Aggregator = MinMaxAvgScalarEventBatchStreamItemAggregator;
-
-    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
-        //<MinMaxAvgScalarEventBatch as AggregatableTdim>::Aggregator::new(ts1, ts2)
-        Self::Aggregator::new(ts1, ts2)
-    }
-
-    fn is_range_complete(&self) -> bool {
-        if let MinMaxAvgScalarEventBatchStreamItem::RangeComplete = self {
-            true
-        } else {
-            false
-        }
-    }
-
-    fn make_range_complete_item() -> Option<Self> {
-        Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)
-    }
-}
-
-pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
-    agg: MinMaxAvgScalarEventBatchAggregator,
-}
-
-impl MinMaxAvgScalarEventBatchStreamItemAggregator {
-    pub fn new(ts1: u64, ts2: u64) -> Self {
-        let agg = <MinMaxAvgScalarEventBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
-        Self { agg }
-    }
-}
-
-impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
-    type InputValue = MinMaxAvgScalarEventBatchStreamItem;
-    type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
-
-    fn ends_before(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
-            _ => false,
-        }
-    }
-
-    fn ends_after(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
-            _ => false,
-        }
-    }
-
-    fn starts_after(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
-            _ => false,
-        }
-    }
-
-    fn ingest(&mut self, inp: &mut Self::InputValue) {
-        match inp {
-            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
-            MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(),
-        }
-    }
-
-    fn result(self) -> Vec<Self::OutputValue> {
-        let ret: Vec<_> = self
-            .agg
-            .result()
-            .into_iter()
-            .map(MinMaxAvgScalarBinBatchStreamItem::Values)
-            .collect();
-        ret
-    }
-}
diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs
index 976385a..1b1b208 100644
--- a/disk/src/agg/scalarbinbatch.rs
+++ b/disk/src/agg/scalarbinbatch.rs
@@ -1,7 +1,7 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
 use crate::agg::streams::Bins;
 use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
-use crate::binned::MakeBytesFrame;
+use crate::binned::{MakeBytesFrame, RangeCompletableItem};
 use crate::frame::makeframe::make_frame;
 use bytes::{BufMut, Bytes, BytesMut};
 use err::Error;
@@ -199,14 +199,6 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
     fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
         MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2)
     }
-
-    fn is_range_complete(&self) -> bool {
-        false
-    }
-
-    fn make_range_complete_item() -> Option<Self> {
-        None
-    }
 }
 
 impl Bins for MinMaxAvgScalarBinBatch {
@@ -310,97 +302,8 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
-pub enum MinMaxAvgScalarBinBatchStreamItem {
-    Values(MinMaxAvgScalarBinBatch),
-    RangeComplete,
-}
-
-impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
-    type Output = MinMaxAvgScalarBinBatchStreamItem;
-    type Aggregator = MinMaxAvgScalarBinBatchStreamItemAggregator;
-
-    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
-        Self::Aggregator::new(ts1, ts2)
-    }
-
-    fn is_range_complete(&self) -> bool {
-        if let MinMaxAvgScalarBinBatchStreamItem::RangeComplete = self {
-            true
-        } else {
-            false
-        }
-    }
-
-    fn make_range_complete_item() -> Option<Self> {
-        Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)
-    }
-}
-
-impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
-    type Output = MinMaxAvgScalarBinBatchStreamItem;
-
-    fn into_agg(self) -> Self::Output {
-        self
-    }
-}
-
-impl MakeBytesFrame for Result<StreamItem<MinMaxAvgScalarBinBatchStreamItem>, Error> {
+impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error> {
     fn make_bytes_frame(&self) -> Result<Bytes, Error> {
         Ok(make_frame(self)?.freeze())
     }
 }
-
-pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
-    agg: MinMaxAvgScalarBinBatchAggregator,
-}
-
-impl MinMaxAvgScalarBinBatchStreamItemAggregator {
-    pub fn new(ts1: u64, ts2: u64) -> Self {
-        let agg = <MinMaxAvgScalarBinBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
-        Self { agg }
-    }
-}
-
-impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
-    type InputValue = MinMaxAvgScalarBinBatchStreamItem;
-    type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
-
-    fn ends_before(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
-            _ => false,
-        }
-    }
-
-    fn ends_after(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
-            _ => false,
-        }
-    }
-
-    fn starts_after(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
-            _ => false,
-        }
-    }
-
-    fn ingest(&mut self, inp: &mut Self::InputValue) {
-        match inp {
-            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
-            MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(),
-        }
-    }
-
-    fn result(self) -> Vec<Self::OutputValue> {
-        let ret: Vec<_> = self
-            .agg
-            .result()
-            .into_iter()
-            .map(MinMaxAvgScalarBinBatchStreamItem::Values)
-            .collect();
-        ret
-    }
-}
diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs
index 06c7701..be259a7 100644
--- a/disk/src/agg/streams.rs
+++ b/disk/src/agg/streams.rs
@@ -129,20 +129,4 @@ where
     fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
         Self::Aggregator::new(ts1, ts2)
     }
-
-    fn is_range_complete(&self) -> bool {
-        match self {
-            Self::DataItem(item) => item.is_range_complete(),
-            Self::Log(_) => false,
-            Self::Stats(_) => false,
-        }
-    }
-
-    // TODO refactor: is this necessary to have on the trait?
-    fn make_range_complete_item() -> Option<Self> {
-        match <T as AggregatableTdim>::make_range_complete_item() {
-            Some(k) => Some(Self::DataItem(k)),
-            None => None,
-        }
-    }
 }
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 60271df..eb617ca 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -1,8 +1,9 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT};
+use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator};
 use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult};
-use crate::agg::{AggregatableXdim1Bin, FitsInside};
-use crate::binned::scalar::{adapter_to_stream_item, binned_stream};
+use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
+use crate::binned::scalar::binned_stream;
 use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
 use crate::cache::pbvfs::PreBinnedScalarItem;
 use crate::cache::{BinnedQuery, MergedFromRemotes};
@@ -19,10 +20,14 @@ use netpod::{
     AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange,
 };
 use num_traits::Zero;
+use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize, Serializer};
+use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::time::Duration;
+use tokio::fs::File;
+use tokio::io::{AsyncRead, ReadBuf};
 
 pub mod scalar;
 
@@ -31,12 +36,6 @@ pub struct BinnedStreamRes<I> {
     pub range: BinnedRange,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
-pub enum BinnedScalarStreamItem {
-    Values(MinMaxAvgScalarBinBatch),
-    RangeComplete,
-}
-
 pub struct MinMaxAvgScalarBinBatchCollected {
     batch: MinMaxAvgScalarBinBatch,
     timed_out: bool,
@@ -55,22 +54,6 @@ impl MinMaxAvgScalarBinBatchCollected {
     }
 }
 
-impl Collectable for BinnedScalarStreamItem {
-    type Collected = MinMaxAvgScalarBinBatchCollected;
-
-    fn append_to(&mut self, collected: &mut Self::Collected) {
-        use BinnedScalarStreamItem::*;
-        match self {
-            Values(item) => {
-                append_to_min_max_avg_scalar_bin_batch(&mut collected.batch, item);
-            }
-            RangeComplete => {
-                // TODO use some other batch type in order to raise the range complete flag.
-            }
-        }
-    }
-}
-
 fn append_to_min_max_avg_scalar_bin_batch(batch: &mut MinMaxAvgScalarBinBatch, item: &mut MinMaxAvgScalarBinBatch) {
     batch.ts1s.append(&mut item.ts1s);
     batch.ts2s.append(&mut item.ts2s);
@@ -134,98 +117,12 @@ impl ToJsonResult for MinMaxAvgScalarBinBatchCollected {
     }
 }
 
-impl MakeBytesFrame for Result<StreamItem<BinnedScalarStreamItem>, Error> {
+impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
     fn make_bytes_frame(&self) -> Result<Bytes, Error> {
         Ok(make_frame(self)?.freeze())
     }
 }
 
-impl AggregatableXdim1Bin for BinnedScalarStreamItem {
-    // TODO does this already include all cases?
-    type Output = BinnedScalarStreamItem;
-
-    fn into_agg(self) -> Self::Output {
-        todo!()
-    }
-}
-
-pub struct BinnedScalarStreamItemAggregator {
-    inner_agg: MinMaxAvgScalarBinBatchAggregator,
-}
-
-impl BinnedScalarStreamItemAggregator {
-    pub fn new(ts1: u64, ts2: u64) -> Self {
-        Self {
-            inner_agg: MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2),
-        }
-    }
-}
-
-// TODO this could be some generic impl for all wrappers that can carry some AggregatableTdim variant.
-impl AggregatorTdim for BinnedScalarStreamItemAggregator {
-    type InputValue = BinnedScalarStreamItem;
-    // TODO using the same type for the output, does this cover all cases?
-    type OutputValue = BinnedScalarStreamItem;
-
-    fn ends_before(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            Self::OutputValue::Values(item) => self.inner_agg.ends_before(item),
-            Self::OutputValue::RangeComplete => false,
-        }
-    }
-
-    fn ends_after(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            Self::OutputValue::Values(item) => self.inner_agg.ends_after(item),
-            Self::OutputValue::RangeComplete => false,
-        }
-    }
-
-    fn starts_after(&self, inp: &Self::InputValue) -> bool {
-        match inp {
-            Self::OutputValue::Values(item) => self.inner_agg.starts_after(item),
-            Self::OutputValue::RangeComplete => false,
-        }
-    }
-
-    fn ingest(&mut self, inp: &mut Self::InputValue) {
-        match inp {
-            Self::OutputValue::Values(item) => self.inner_agg.ingest(item),
-            Self::OutputValue::RangeComplete => (),
-        }
-    }
-
-    fn result(self) -> Vec<Self::OutputValue> {
-        self.inner_agg
-            .result()
-            .into_iter()
-            .map(|k| BinnedScalarStreamItem::Values(k))
-            .collect()
-    }
-}
-
-impl AggregatableTdim for BinnedScalarStreamItem {
-    type Aggregator = BinnedScalarStreamItemAggregator;
-    // TODO isn't this already defined in terms of the Aggregator?
-    type Output = BinnedScalarStreamItem;
-
-    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
-        BinnedScalarStreamItemAggregator::new(ts1, ts2)
-    }
-
-    fn is_range_complete(&self) -> bool {
-        if let Self::RangeComplete = self {
-            true
-        } else {
-            false
-        }
-    }
-
-    fn make_range_complete_item() -> Option<Self> {
-        Some(Self::RangeComplete)
-    }
-}
-
 type BinnedStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
 
 pub async fn binned_bytes_for_http(
@@ -394,9 +291,70 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
     Ok(serde_json::to_value(ret)?)
 }
 
-pub trait PreBinnedItem: Unpin {
+pub struct ReadPbv<PBI>
+where
+    PBI: PreBinnedItem,
+{
+    buf: Vec<u8>,
+    file: Option<File>,
+    _mark: std::marker::PhantomData<PBI>,
+}
+
+impl<PBI> ReadPbv<PBI>
+where
+    PBI: PreBinnedItem,
+{
+    fn new(file: File) -> Self {
+        Self {
+            buf: vec![],
+            file: Some(file),
+            _mark: std::marker::PhantomData::default(),
+        }
+    }
+}
+
+impl<PBI> Future for ReadPbv<PBI>
+where
+    PBI: PreBinnedItem,
+{
+    type Output = Result<StreamItem<PBI>, Error>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        use Poll::*;
+        // Read into a zeroed scratch buffer: a fresh empty Vec would give ReadBuf
+        // a zero-length target and poll_read could never make progress.
+        let mut buf = vec![0; 1024 * 16];
+        let mut dst = ReadBuf::new(&mut buf);
+        let fp = self.file.as_mut().unwrap();
+        let f = Pin::new(fp);
+        match File::poll_read(f, cx, &mut dst) {
+            Ready(res) => match res {
+                Ok(_) => {
+                    if dst.filled().len() > 0 {
+                        self.buf.extend_from_slice(dst.filled());
+                        // More data may follow; request another poll before returning Pending.
+                        cx.waker().wake_by_ref();
+                        Pending
+                    } else {
+                        match PBI::from_buf(&mut self.buf) {
+                            Ok(item) => Ready(Ok(StreamItem::DataItem(item))),
+                            Err(e) => Ready(Err(e)),
+                        }
+                    }
+                }
+                Err(e) => Ready(Err(e.into())),
+            },
+            Pending => Pending,
+        }
+    }
+}
+
+pub trait PreBinnedItem: Send + Serialize + DeserializeOwned + Unpin {
     type BinnedStreamItem: AggregatableTdim + Unpin + Send;
     fn into_binned_stream_item(self, fit_range: NanoRange) -> Option<Self::BinnedStreamItem>;
+    fn make_range_complete() -> Self;
+    fn read_pbv(file: File) -> Result<ReadPbv<Self>, Error>
+    where
+        Self: Sized;
+    fn from_buf(buf: &[u8]) -> Result<Self, Error>
+    where
+        Self: Sized;
 }
 
 impl PreBinnedItem for PreBinnedScalarItem {
@@ -405,24 +363,99 @@
     fn into_binned_stream_item(self, fit_range: NanoRange) -> Option<Self::BinnedStreamItem> {
         match self {
             Self::RangeComplete => Some(Self::BinnedStreamItem::RangeComplete),
-            Self::Batch(item) => {
-                use super::agg::{Fits, FitsInside};
-                match item.fits_inside(fit_range) {
-                    Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => {
-                        Some(Self::BinnedStreamItem::Values(item))
-                    }
-                    _ => None,
+            Self::Batch(item) => match item.fits_inside(fit_range) {
+                Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => {
+                    Some(Self::BinnedStreamItem::Values(item))
                 }
-            }
+                _ => None,
+            },
         }
     }
+
+    fn make_range_complete() -> Self {
+        Self::RangeComplete
+    }
+
+    fn read_pbv(file: File) -> Result<ReadPbv<Self>, Error> {
+        Ok(ReadPbv::new(file))
+    }
+
+    fn from_buf(buf: &[u8]) -> Result<Self, Error> {
+        let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
+        Ok(Self::Batch(dec))
+    }
+}
+
+pub trait XBinnedEventsStreamItem:
+    Send + Serialize + DeserializeOwned + Unpin + Collectable + Collected + AggregatableTdim
+{
+    fn make_range_complete() -> Self;
+}
+
+impl Collected for MinMaxAvgScalarEventBatchStreamItem {
+    // TODO for this case we don't have an expected number of events. Factor out into another trait?
+    fn new(bin_count_exp: u32) -> Self {
+        // TODO factor out the concept of RangeComplete into another trait layer:
+        Self::Values(MinMaxAvgScalarEventBatch::empty())
+    }
+    fn timed_out(&mut self, k: bool) {}
+}
+
+impl Collectable for MinMaxAvgScalarEventBatchStreamItem {
+    type Collected = MinMaxAvgScalarEventBatchStreamItem;
+    fn append_to(&mut self, collected: &mut Self::Collected) {
+        match self {
+            Self::RangeComplete => {
+                // TODO it would be nicer to insert another type layer for the RangeComplete concept.
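+                // Until such a layer exists, reaching this arm is a logic error: a
+                // RangeComplete marker carries no event values to collect, so the
+                // caller is expected to filter it out before calling append_to,
+                // hence the panic below.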
+                panic!()
+            }
+            Self::Values(this) => match collected {
+                Self::RangeComplete => {}
+                Self::Values(coll) => {
+                    // NOTE: append from the source batch (`this`) into the collector;
+                    // appending `coll` to itself would be a no-op that loses the data.
+                    coll.tss.append(&mut this.tss);
+                    coll.mins.append(&mut this.mins);
+                    coll.maxs.append(&mut this.maxs);
+                    coll.avgs.append(&mut this.avgs);
+                }
+            },
+        }
+    }
+}
+
+impl XBinnedEventsStreamItem for MinMaxAvgScalarEventBatchStreamItem {
+    fn make_range_complete() -> Self {
+        Self::RangeComplete
+    }
+}
+
+pub trait TBinned: Send + Serialize + DeserializeOwned + Unpin + Collectable + AggregatableTdim {}
+
+impl TBinned for MinMaxAvgScalarBinBatchStreamItem {}
+
+impl Collected for MinMaxAvgScalarBinBatch {
+    fn new(bin_count_exp: u32) -> Self {
+        MinMaxAvgScalarBinBatch::empty()
+    }
+    fn timed_out(&mut self, k: bool) {}
+}
+
+impl Collectable for MinMaxAvgScalarBinBatch {
+    type Collected = MinMaxAvgScalarBinBatch;
+    fn append_to(&mut self, collected: &mut Self::Collected) {
+        collected.ts1s.append(&mut self.ts1s);
+        collected.ts2s.append(&mut self.ts2s);
+        collected.counts.append(&mut self.counts);
+        collected.mins.append(&mut self.mins);
+        collected.maxs.append(&mut self.maxs);
+        collected.avgs.append(&mut self.avgs);
+    }
 }
 
 pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static {
     type BinnedStreamItem: MakeBytesFrame;
     type BinnedStreamType: Stream<Item = Self::BinnedStreamItem> + Send + 'static;
-    type Dummy: Default + Unpin + Send;
-    type PreBinnedItem: PreBinnedItem + Send;
+    type PreBinnedItem: PreBinnedItem;
+    type XBinnedEvents;
 
     fn new_binned_from_prebinned(
         &self,
@@ -439,6 +472,8 @@
         range: BinnedRange,
         node_config: &NodeConfigCached,
    ) -> Result<Self::BinnedStreamType, Error>;
+
+    fn pbv_handle_fut2_item(item: StreamItem<Self::PreBinnedItem>) -> Option<StreamItem<Self::PreBinnedItem>>;
 }
 
 #[derive(Clone)]
@@ -458,11 +493,16 @@ impl BinnedStreamKindWave {
     }
 }
 
+pub enum RangeCompletableItem<T> {
+    RangeComplete,
+    Data(T),
+}
+
 impl BinnedStreamKind for BinnedStreamKindScalar {
     type BinnedStreamItem = Result<StreamItem<BinnedScalarStreamItem>, Error>;
     type BinnedStreamType = BinnedStream<Self::BinnedStreamItem>;
-    type Dummy = u32;
     type PreBinnedItem = PreBinnedScalarItem;
+    type XBinnedEvents = MinMaxAvgScalarEventBatch;
 
     fn new_binned_from_prebinned(
         &self,
@@ -491,9 +531,36 @@
         range: BinnedRange,
         node_config: &NodeConfigCached,
    ) -> Result<Self::BinnedStreamType, Error> {
-        let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone())
-            .into_binned_t(range.clone())
-            .map(adapter_to_stream_item);
+        let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone());
+        // TODO use the binned2 instead
+        let s = crate::agg::binnedt::IntoBinnedT::into_binned_t(s, range);
+        let s = s.map(adapter_to_stream_item);
         Ok(BinnedStream::new(Box::pin(s))?)
     }
+
+    fn pbv_handle_fut2_item(item: StreamItem<Self::PreBinnedItem>) -> Option<StreamItem<Self::PreBinnedItem>> {
+        // TODO make this code work in this context:
+        // Do I need more parameters here?
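+        // One possible shape, as a sketch only; `values` and `range_complete_observed`
+        // would be hypothetical parameters replacing the old `self` state of the
+        // stream, they are not part of this change:
+        //
+        // fn pbv_handle_fut2_item(
+        //     item: StreamItem<Self::PreBinnedItem>,
+        //     values: &mut MinMaxAvgScalarBinBatch,
+        //     range_complete_observed: &mut bool,
+        // ) -> Option<StreamItem<Self::PreBinnedItem>>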
+        /*Ok(item) => match item {
+            StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
+            StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
+            StreamItem::DataItem(item) => match item {
+                PreBinnedScalarItem::RangeComplete => {
+                    self.range_complete_observed = true;
+                    None
+                }
+                PreBinnedScalarItem::Batch(batch) => {
+                    self.values.ts1s.extend(batch.ts1s.iter());
+                    self.values.ts2s.extend(batch.ts2s.iter());
+                    self.values.counts.extend(batch.counts.iter());
+                    self.values.mins.extend(batch.mins.iter());
+                    self.values.maxs.extend(batch.maxs.iter());
+                    self.values.avgs.extend(batch.avgs.iter());
+                    StreamItem::DataItem(PreBinnedScalarItem::Batch(batch))
+                }
+            },
+        },*/
+        err::todo();
+        None
+    }
 }
diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs
index 624bcb9..a66f90e 100644
--- a/disk/src/binned/scalar.rs
+++ b/disk/src/binned/scalar.rs
@@ -1,35 +1,14 @@
-use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
+use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::StreamItem;
-use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind, BinnedStreamRes};
+use crate::binned::{BinnedStreamKind, BinnedStreamRes, RangeCompletableItem};
 use crate::binnedstream::BinnedStream;
 use crate::cache::BinnedQuery;
 use crate::raw::EventsQuery;
 use err::Error;
 use futures_core::Stream;
-use futures_util::StreamExt;
 use netpod::log::*;
 use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange};
 
-pub fn adapter_to_stream_item(
-    k: Result<StreamItem<MinMaxAvgScalarBinBatchStreamItem>, Error>,
-) -> Result<StreamItem<BinnedScalarStreamItem>, Error> {
-    match k {
-        Ok(k) => match k {
-            StreamItem::Log(item) => Ok(StreamItem::Log(item)),
-            StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
-            StreamItem::DataItem(item) => match item {
-                MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
-                    Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))
-                }
-                MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
-                    Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item)))
-                }
-            },
-        },
-        Err(e) => Err(e),
-    }
-}
-
 pub async fn binned_stream(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 05bf407..a7b940a 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -1,8 +1,8 @@
-use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::streams::StreamItem;
-use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind, PreBinnedItem};
-use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream};
+use crate::binned::{BinnedStreamKind, PreBinnedItem};
+use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream;
 use crate::cache::{CacheUsage, PreBinnedQuery};
+use crate::frame::makeframe::FrameType;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
@@ -33,6 +33,7 @@ where
 impl<BK> BinnedScalarStreamFromPreBinnedPatches<BK>
 where
     BK: BinnedStreamKind,
+    Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
 {
     pub fn new(
         patch_it: PreBinnedPatchIterator,
@@ -57,6 +58,7 @@ where
         let inp = futures_util::stream::iter(patches.into_iter())
             .map({
                 let node_config = node_config.clone();
+                let stream_kind = stream_kind.clone();
                 move |patch| {
                     let query = PreBinnedQuery::new(
                         patch,
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index a1ed4fa..d3a7271 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,9 +1,8 @@
-use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::StreamItem;
-use crate::binned::BinnedStreamKind;
+use crate::binned::{BinnedStreamKind, RangeCompletableItem};
 use crate::cache::pbv::PreBinnedValueByteStream;
-use crate::cache::pbvfs::PreBinnedScalarItem;
+use crate::frame::makeframe::FrameType;
 use crate::merge::MergedMinMaxAvgScalarStream;
 use crate::raw::EventsQuery;
 use bytes::Bytes;
@@ -24,8 +23,7 @@ use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use tiny_keccak::Hasher;
-use tokio::fs::File;
-use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
+use tokio::io::{AsyncRead, ReadBuf};
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
 
@@ -232,13 +230,14 @@ fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
 
-pub fn pre_binned_bytes_for_http<BK>(
+pub fn pre_binned_bytes_for_http<'a, BK>(
     node_config: &NodeConfigCached,
     query: &PreBinnedQuery,
     stream_kind: BK,
 ) -> Result<PreBinnedValueByteStream<BK>, Error>
 where
     BK: BinnedStreamKind,
+    Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
 {
     if query.channel.backend != node_config.node.backend {
         let err = Error::with_msg(format!(
@@ -319,22 +318,34 @@ impl AsyncRead for HttpBodyAsAsyncRead {
     }
 }
 
-type T001 = Pin<Box<dyn Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Send>>;
-type T002 = Pin<Box<dyn Future<Output = Result<T001, Error>> + Send>>;
-pub struct MergedFromRemotes {
-    tcp_establish_futs: Vec<T002>,
-    nodein: Vec<Option<T001>>,
-    merged: Option<T001>,
+type T001<T> = Pin<Box<dyn Stream<Item = Result<StreamItem<T>, Error>> + Send>>;
+type T002<T> = Pin<Box<dyn Future<Output = Result<T001<T>, Error>> + Send>>;
+
+pub struct MergedFromRemotes<SK>
+where
+    SK: BinnedStreamKind,
+{
+    tcp_establish_futs: Vec<T002<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>>,
+    nodein: Vec<Option<T001<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>>>,
+    merged: Option<T001<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>>,
     completed: bool,
     errored: bool,
 }
 
-impl MergedFromRemotes {
-    pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
+impl<SK> MergedFromRemotes<SK>
+where
+    SK: BinnedStreamKind,
+{
+    pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: SK) -> Self {
         let mut tcp_establish_futs = vec![];
         for node in &cluster.nodes {
-            let f = super::raw::x_processed_stream_from_node(evq.clone(), perf_opts.clone(), node.clone());
-            let f: T002 = Box::pin(f);
+            let f = super::raw::x_processed_stream_from_node(
+                evq.clone(),
+                perf_opts.clone(),
+                node.clone(),
+                stream_kind.clone(),
+            );
+            let f: T002<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>> = Box::pin(f);
             tcp_establish_futs.push(f);
         }
         let n = tcp_establish_futs.len();
@@ -348,9 +359,11 @@
     }
 }
 
-impl Stream for MergedFromRemotes {
-    // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
-    type Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;
+impl<SK> Stream for MergedFromRemotes<SK>
+where
+    SK: BinnedStreamKind,
+{
+    type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
@@ -402,7 +415,7 @@
                 if c1 == self.tcp_establish_futs.len() {
                     debug!("MergedFromRemotes setting up merged stream");
                     let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
-                    let s1 = MergedMinMaxAvgScalarStream::new(inps);
+                    let s1 = MergedMinMaxAvgScalarStream::<_, SK>::new(inps);
                     self.merged = Some(Box::pin(s1));
                 } else {
                     debug!(
@@ -530,11 +543,3 @@ pub async fn write_pb_cache_min_max_avg_scalar(
         .await??;
     Ok(())
 }
-
-pub async fn read_pbv(mut file: File) -> Result<StreamItem<PreBinnedScalarItem>, Error> {
-    let mut buf = vec![];
-    file.read_to_end(&mut buf).await?;
-    trace!("Read cached file len {}", buf.len());
-    let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
-    Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(dec)))
-}
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index ece10ad..2cf10e4 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -1,10 +1,10 @@
 use crate::agg::binnedt::IntoBinnedT;
-use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
+use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::StreamItem;
-use crate::binned::{BinnedStreamKind, PreBinnedItem};
+use crate::binned::{BinnedStreamKind, PreBinnedItem, RangeCompletableItem};
 use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream};
 use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery};
-use crate::frame::makeframe::make_frame;
+use crate::frame::makeframe::{make_frame, FrameType};
 use crate::raw::EventsQuery;
 use crate::streamlog::Streamlog;
 use bytes::Bytes;
@@ -18,57 +18,63 @@ use std::future::Future;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
+use tokio::fs::File;
 
 pub type PreBinnedValueByteStream<SK> = SCC<PreBinnedValueByteStreamInner<SK>>;
 
-pub struct PreBinnedValueByteStreamInner<BK>
+pub struct PreBinnedValueByteStreamInner<SK>
 where
-    BK: BinnedStreamKind,
+    SK: BinnedStreamKind,
 {
-    inp: PreBinnedValueStream<BK>,
+    inp: PreBinnedValueStream<SK>,
 }
 
-pub fn pre_binned_value_byte_stream_new<BK>(
+pub fn pre_binned_value_byte_stream_new<SK>(
     query: &PreBinnedQuery,
     node_config: &NodeConfigCached,
-    stream_kind: BK,
-) -> PreBinnedValueByteStream<BK>
+    stream_kind: SK,
+) -> PreBinnedValueByteStream<SK>
 where
-    BK: BinnedStreamKind + Unpin,
+    SK: BinnedStreamKind + Unpin,
+    Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
 {
     let s1 = PreBinnedValueStream::new(query.clone(), node_config, stream_kind);
     let s2 = PreBinnedValueByteStreamInner { inp: s1 };
     SCC::new(s2)
 }
 
-impl<BK> Stream for PreBinnedValueByteStreamInner<BK>
+impl<SK> Stream for PreBinnedValueByteStreamInner<SK>
 where
-    BK: BinnedStreamKind + Unpin,
+    SK: BinnedStreamKind + Unpin,
+    Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
+    PreBinnedValueStream<SK>: Unpin,
 {
     type Item = Result<Bytes, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
         match self.inp.poll_next_unpin(cx) {
-            Ready(Some(item)) => match make_frame::<Result<StreamItem<PreBinnedScalarItem>, Error>>(&item) {
-                Ok(buf) => Ready(Some(Ok(buf.freeze()))),
-                Err(e) => Ready(Some(Err(e.into()))),
-            },
+            Ready(Some(item)) => {
+                match make_frame::<Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, Error>>(&item) {
+                    Ok(buf) => Ready(Some(Ok(buf.freeze()))),
+                    Err(e) => Ready(Some(Err(e.into()))),
+                }
+            }
             Ready(None) => Ready(None),
             Pending => Pending,
         }
    }
 }
 
-pub struct PreBinnedValueStream<BK>
+pub struct PreBinnedValueStream<SK>
 where
-    BK: BinnedStreamKind,
+    SK: BinnedStreamKind,
 {
     query: PreBinnedQuery,
     node_config: NodeConfigCached,
     open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<File, std::io::Error>> + Send>>>,
     fut2:
-        Option<Pin<Box<dyn Stream<Item = Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, Error>> + Send>>>,
+        Option<Pin<Box<dyn Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, Error>> + Send>>>,
     read_from_cache: bool,
     cache_written: bool,
     data_complete: bool,
@@ -79,15 +85,18 @@ where
     streamlog: Streamlog,
     values: MinMaxAvgScalarBinBatch,
     write_fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
-    read_cache_fut: Option<Pin<Box<dyn Future<Output = Result<StreamItem<PreBinnedScalarItem>, Error>> + Send>>>,
-    stream_kind: BK,
+    read_cache_fut: Option<
+        Pin<Box<dyn Future<Output = Result<StreamItem<<SK as BinnedStreamKind>::PreBinnedItem>, Error>> + Send>>,
+    >,
+    stream_kind: SK,
 }
 
-impl<BK> PreBinnedValueStream<BK>
+impl<SK> PreBinnedValueStream<SK>
 where
-    BK: BinnedStreamKind,
+    SK: BinnedStreamKind,
+    Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::PreBinnedItem>>, err::Error>: FrameType,
 {
-    pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: BK) -> Self {
+    pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: SK) -> Self {
         Self {
             query,
             node_config: node_config.clone(),
@@ -130,22 +139,28 @@ where
             .ok_or(Error::with_msg("covering_range returns None"))
             .unwrap();
         let perf_opts = PerfOpts { inmem_bufcap: 512 };
-        let s1 = MergedFromRemotes::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
-        let s1 = s1.into_binned_t(range);
+        let s1 = MergedFromRemotes::new(
+            evq,
+            perf_opts,
+            self.node_config.node_config.cluster.clone(),
+            self.stream_kind.clone(),
+        );
+        let s1 = IntoBinnedT::into_binned_t(s1, range);
         let s1 = s1.map(|item| {
             // TODO does this do anything?
             match item {
                 Ok(item) => match item {
                     StreamItem::Log(item) => Ok(StreamItem::Log(item)),
                     StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
-                    StreamItem::DataItem(item) => match item {
+                    StreamItem::DataItem(item) => Ok(StreamItem::DataItem(item)),
+                    /*StreamItem::DataItem(item) => match item {
                         MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
                             Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete))
                         }
                         MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
                             Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(item)))
                         }
-                    },
+                    },*/
                 },
                 Err(e) => Err(e),
             }
@@ -153,7 +168,10 @@ where
 
         // TODO
         // In the above must introduce a trait to convert to the generic item type:
-        self.fut2 = Some(Box::pin(s1));
+
+        // TODO!!
+        self.fut2 = Some(err::todoval());
+        //self.fut2 = Some(Box::pin(s1));
     }
 
     fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
@@ -223,12 +241,12 @@ where
     }
 }
 
-impl<BK> Stream for PreBinnedValueStream<BK>
+impl<SK> Stream for PreBinnedValueStream<SK>
 where
-    BK: BinnedStreamKind,
+    SK: BinnedStreamKind + Unpin,
+    Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::PreBinnedItem>>, err::Error>: FrameType,
 {
-    // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
-    type Item = Result<StreamItem<PreBinnedScalarItem>, Error>;
+    type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::PreBinnedItem>>, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
@@ -283,7 +301,8 @@ where
                     if self.cache_written {
                         if self.range_complete_observed {
                             self.range_complete_emitted = true;
-                            Ready(Some(Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete))))
+                            let item = <<SK as BinnedStreamKind>::PreBinnedItem as PreBinnedItem>::make_range_complete();
+                            Ready(Some(Ok(StreamItem::DataItem(item))))
                         } else {
                             self.completed = true;
                             Ready(None)
@@ -320,24 +339,9 @@ where
                 } else if let Some(fut) = self.fut2.as_mut() {
                     match fut.poll_next_unpin(cx) {
                         Ready(Some(k)) => match k {
-                            Ok(item) => match item {
-                                StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
-                                StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
-                                StreamItem::DataItem(item) => match item {
-                                    PreBinnedScalarItem::RangeComplete => {
-                                        self.range_complete_observed = true;
-                                        continue 'outer;
-                                    }
-                                    PreBinnedScalarItem::Batch(batch) => {
-                                        self.values.ts1s.extend(batch.ts1s.iter());
-                                        self.values.ts2s.extend(batch.ts2s.iter());
-                                        self.values.counts.extend(batch.counts.iter());
-                                        self.values.mins.extend(batch.mins.iter());
-                                        self.values.maxs.extend(batch.maxs.iter());
-                                        self.values.avgs.extend(batch.avgs.iter());
-                                        Ready(Some(Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(batch)))))
-                                    }
-                                },
+                            Ok(item) => match SK::pbv_handle_fut2_item(item) {
+                                None => continue 'outer,
+                                Some(item) => Ready(Some(Ok(item))),
                             },
                             Err(e) => {
                                 self.errored = true;
@@ -357,7 +361,7 @@ where
                 match item {
                     Ok(file) => {
                         self.read_from_cache = true;
-                        let fut = super::read_pbv(file);
+                        let fut = <SK as BinnedStreamKind>::PreBinnedItem::read_pbv(file)?;
                        self.read_cache_fut = Some(Box::pin(fut));
                         continue 'outer;
                     }
diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs
index 23e873b..2ec9448 100644
--- a/disk/src/cache/pbvfs.rs
+++ b/disk/src/cache/pbvfs.rs
@@ -3,7 +3,7 @@ use crate::agg::streams::StreamItem;
 use crate::binned::BinnedStreamKind;
 use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery};
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
-use crate::frame::makeframe::decode_frame;
+use crate::frame::makeframe::{decode_frame, FrameType};
 use err::Error;
 use futures_core::Stream;
 use futures_util::{pin_mut, FutureExt};
@@ -61,6 +61,7 @@ pub enum PreBinnedScalarItem {
 impl<BK> Stream for PreBinnedScalarValueFetchedStream<BK>
 where
     BK: BinnedStreamKind,
+    Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, err::Error>: FrameType,
 {
     type Item = Result<StreamItem<PreBinnedScalarItem>, Error>;
 
@@ -79,7 +80,9 @@ where
                         StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                         StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
                         StreamItem::DataItem(item) => {
-                            match decode_frame::<Result<StreamItem<PreBinnedScalarItem>, Error>>(&item) {
+                            match decode_frame::<Result<StreamItem<<BK as BinnedStreamKind>::PreBinnedItem>, Error>>(
+                                &item,
+                            ) {
                                 Ok(Ok(item)) => Ready(Some(Ok(item))),
                                 Ok(Err(e)) => {
                                     self.errored = true;
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index 9902896..72d04a4 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -1,6 +1,6 @@
 use crate::agg::streams::StreamItem;
 use crate::dataopen::{open_files, OpenedFile};
-use crate::eventchunker::{EventChunker, EventChunkerConf, EventChunkerItem};
+use crate::eventchunker::{EventChunker, EventChunkerConf};
 use crate::file_content_stream;
 use crate::streamlog::LogItem;
 use err::Error;
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 2ba6f30..055bd13 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -1,4 +1,5 @@
 use crate::agg::streams::{StatsItem, StreamItem};
+use crate::binned::RangeCompletableItem;
 use crate::{FileChunkRead, NeedMinBuffer};
 use bitshuffle::bitshuffle_decompress;
 use bytes::{Buf, BytesMut};
@@ -345,13 +346,8 @@ impl EventFull {
     }
 }
 
-pub enum EventChunkerItem {
-    Events(EventFull),
-    RangeComplete,
-}
-
 impl Stream for EventChunker {
-    type Item = Result<StreamItem<EventChunkerItem>, Error>;
+    type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
@@ -374,7 +370,7 @@
                 } else if self.final_stats_sent {
                     self.sent_beyond_range = true;
                     if self.seen_beyond_range {
-                        Ready(Some(Ok(StreamItem::DataItem(EventChunkerItem::RangeComplete))))
+                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                     } else {
                         continue 'outer;
                     }
@@ -404,7 +400,7 @@
                         } else {
                             let x = self.need_min;
                             self.inp.set_need_min(x);
-                            let ret = StreamItem::DataItem(EventChunkerItem::Events(res.events));
+                            let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
                             Ready(Some(Ok(ret)))
                         }
                     }
diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs
index d53e5d9..3ffe1bf 100644
--- a/disk/src/frame/makeframe.rs
+++ b/disk/src/frame/makeframe.rs
@@ -1,13 +1,11 @@
-use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
+use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::streams::StreamItem;
-use crate::binned::BinnedScalarStreamItem;
+use crate::binned::RangeCompletableItem;
 use crate::cache::pbvfs::PreBinnedScalarItem;
 use crate::frame::inmem::InMemoryFrame;
-use crate::raw::conn::RawConnOut;
 use crate::raw::EventQueryJsonStringFrame;
 use bytes::{BufMut, BytesMut};
 use err::Error;
-use serde::{Deserialize, Serialize};
+use serde::{de::DeserializeOwned, Serialize};
 
 pub const INMEM_FRAME_HEAD: usize = 20;
 pub const INMEM_FRAME_FOOT: usize = 4;
@@ -21,10 +19,6 @@ impl FrameType for EventQueryJsonStringFrame {
     const FRAME_TYPE_ID: u32 = 0x03;
 }
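 
-impl FrameType for RawConnOut {
-    const FRAME_TYPE_ID: u32 = 0x04;
-}
-
 impl FrameType for Result<StreamItem<MinMaxAvgScalarBinBatchStreamItem>, Error> {
     const FRAME_TYPE_ID: u32 = 0x06;
 }
@@ -37,6 +31,10 @@ impl FrameType for Result<StreamItem<PreBinnedScalarItem>, Error> {
     const FRAME_TYPE_ID: u32 = 0x08;
 }
 
+impl FrameType for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
+    const FRAME_TYPE_ID: u32 = 0x09;
+}
+
 pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
 where
     FT: FrameType + Serialize,
@@ -85,9 +83,9 @@ pub fn make_term_frame() -> BytesMut {
     buf
 }
 
-pub fn decode_frame<'a, FT>(frame: &'a InMemoryFrame) -> Result<FT, Error>
+pub fn decode_frame<FT>(frame: &InMemoryFrame) -> Result<FT, Error>
 where
-    FT: FrameType + Deserialize<'a>,
+    FT: FrameType + DeserializeOwned,
 {
     if frame.encid() != 0x12121212 {
         return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index 8f57228..4f70927 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -1,5 +1,7 @@
-use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
-use crate::agg::streams::{StatsItem, StreamItem};
+use crate::agg::binnedt::AggregatableTdim;
+use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
+use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem};
+use crate::binned::{BinnedStreamKind, XBinnedEventsStreamItem};
 use crate::streamlog::LogItem;
 use err::Error;
 use futures_core::Stream;
@@ -10,16 +12,17 @@ use std::collections::VecDeque;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-pub struct MergedMinMaxAvgScalarStream<S>
+pub struct MergedMinMaxAvgScalarStream<S, SK>
 where
-    S: Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>>,
+    S: Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>> + Unpin,
+    SK: BinnedStreamKind,
 {
     inps: Vec<S>,
-    current: Vec<MergedMinMaxAvgScalarStreamCurVal>,
+    current: Vec<MergedCurVal<<SK as BinnedStreamKind>::XBinnedEvents>>,
     ixs: Vec<usize>,
     errored: bool,
     completed: bool,
-    batch: MinMaxAvgScalarEventBatch,
+    batch: <SK as BinnedStreamKind>::XBinnedEvents,
     ts_last_emit: u64,
     range_complete_observed: Vec<bool>,
     range_complete_observed_all: bool,
@@ -30,23 +33,21 @@ where
     event_data_read_stats_items: VecDeque<EventDataReadStats>,
 }
 
-impl<S> MergedMinMaxAvgScalarStream<S>
+impl<S, SK> MergedMinMaxAvgScalarStream<S, SK>
 where
-    S: Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Unpin,
+    S: Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>> + Unpin,
+    SK: BinnedStreamKind,
 {
     pub fn new(inps: Vec<S>) -> Self {
         let n = inps.len();
-        let current = (0..n)
-            .into_iter()
-            .map(|_| MergedMinMaxAvgScalarStreamCurVal::None)
-            .collect();
+        let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
         Self {
             inps,
             current: current,
             ixs: vec![0; n],
             errored: false,
             completed: false,
-            batch: MinMaxAvgScalarEventBatch::empty(),
+            batch: <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0),
             ts_last_emit: 0,
             range_complete_observed: vec![false; n],
             range_complete_observed_all: false,
@@ -63,7 +64,7 @@ where
         let mut pending = 0;
         for i1 in 0..self.inps.len() {
             match self.current[i1] {
-                MergedMinMaxAvgScalarStreamCurVal::None => {
+                MergedCurVal::None => {
                     'l1: loop {
                         break match self.inps[i1].poll_next_unpin(cx) {
                             Ready(Some(Ok(k))) => match k {
@@ -79,23 +80,23 @@ where
                                     }
                                     continue 'l1;
                                 }
-                                StreamItem::DataItem(item) => match item {
-                                    MinMaxAvgScalarEventBatchStreamItem::Values(vals) => {
-                                        self.ixs[i1] = 0;
-                                        self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals);
-                                    }
-                                    MinMaxAvgScalarEventBatchStreamItem::RangeComplete => {
+                                StreamItem::DataItem(item) => {
+                                    // TODO factor out the concept of RangeComplete into another trait layer.
+                                    // The RangeCompletableItem wrapper added in disk/src/binned.rs is one
+                                    // candidate for that layer; the merge loop would then match on
+                                    // StreamItem::DataItem(RangeCompletableItem::RangeComplete) instead of
+                                    // asking the payload via is_range_complete().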
+                                    if item.is_range_complete() {
                                         self.range_complete_observed[i1] = true;
                                         let d = self.range_complete_observed.iter().filter(|&&k| k).count();
                                         if d == self.range_complete_observed.len() {
                                             self.range_complete_observed_all = true;
-                                            debug!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d);
+                                            debug!("MergedStream range_complete d {} COMPLETE", d);
                                         } else {
-                                            trace!("MergedMinMaxAvgScalarStream range_complete d {}", d);
+                                            trace!("MergedStream range_complete d {}", d);
                                         }
                                         continue 'l1;
+                                    } else {
+                                        self.ixs[i1] = 0;
+                                        self.current[i1] = MergedCurVal::Val(item);
                                     }
-                                },
+                                }
                             },
                             Ready(Some(Err(e))) => {
                                 // TODO emit this error, consider this stream as done, anything more to do here?
@@ -104,7 +105,7 @@ where
                                 return Ready(Err(e));
                             }
                             Ready(None) => {
-                                self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish;
+                                self.current[i1] = MergedCurVal::Finish;
                             }
                             Pending => {
                                 pending += 1;
@@ -123,17 +124,18 @@ where
     }
 }
 
-impl<S> Stream for MergedMinMaxAvgScalarStream<S>
+impl<S, SK> Stream for MergedMinMaxAvgScalarStream<S, SK>
 where
-    S: Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Unpin,
+    S: Stream<Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>> + Unpin,
+    SK: BinnedStreamKind,
 {
-    type Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;
+    type Item = Result<StreamItem<<SK as BinnedStreamKind>::XBinnedEvents>, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
         'outer: loop {
             break if self.completed {
-                panic!("MergedMinMaxAvgScalarStream poll_next on completed");
+                panic!("MergedStream poll_next on completed");
             } else if self.errored {
                 self.completed = true;
                 Ready(None)
@@ -149,7 +151,7 @@ where
                 } else {
                     self.range_complete_observed_all_emitted = true;
                     Ready(Some(Ok(StreamItem::DataItem(
-                        MinMaxAvgScalarEventBatchStreamItem::RangeComplete,
+                        <<SK as BinnedStreamKind>::XBinnedEvents as XBinnedEventsStreamItem>::make_range_complete(),
                    ))))
                 }
             } else {
@@ -163,11 +165,11 @@ where
                 let mut lowest_ix = usize::MAX;
                 let mut lowest_ts = u64::MAX;
                 for i1 in 0..self.inps.len() {
-                    if let MergedMinMaxAvgScalarStreamCurVal::Val(val) = &self.current[i1] {
+                    if let MergedCurVal::Val(val) = &self.current[i1] {
                         let u = self.ixs[i1];
                         if u >= val.tss.len() {
                             self.ixs[i1] = 0;
-                            self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
+                            self.current[i1] = MergedCurVal::None;
                             continue 'outer;
                         } else {
                             let ts = val.tss[u];
@@ -180,8 +182,10 @@ where
                 }
                 if lowest_ix == usize::MAX {
                     if self.batch.tss.len() != 0 {
-                        let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
-                        let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
+                        //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
+                        //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
+                        let emp = <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0);
+                        let ret = std::mem::replace(&mut self.batch, emp);
                         self.data_emit_complete = true;
                         Ready(Some(Ok(StreamItem::DataItem(ret))))
                     } else {
@@ -194,9 +198,7 @@ where
                     self.batch.tss.push(lowest_ts);
                     let rix = self.ixs[lowest_ix];
                     let z = match &self.current[lowest_ix] {
-                        MergedMinMaxAvgScalarStreamCurVal::Val(k) => {
-                            (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len())
-                        }
+                        MergedCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()),
                         _ => panic!(),
                     };
                     self.batch.mins.push(z.0);
@@ -205,11 +207,13 @@ where
                     self.ixs[lowest_ix] += 1;
                     if self.ixs[lowest_ix] >= z.3 {
                         self.ixs[lowest_ix] = 0;
-                        self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None;
+                        self.current[lowest_ix] = MergedCurVal::None;
                     }
                     if self.batch.tss.len() >= self.batch_size {
-                        let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
-                        let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
+                        //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
+                        //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
+                        let emp = <<SK as BinnedStreamKind>::XBinnedEvents as Collected>::new(0);
+                        let ret = std::mem::replace(&mut self.batch, emp);
                         Ready(Some(Ok(StreamItem::DataItem(ret))))
                     } else {
                         continue 'outer;
@@ -227,8 +231,8 @@ where
     }
 }
 
-enum MergedMinMaxAvgScalarStreamCurVal {
+enum MergedCurVal<T> {
     None,
     Finish,
-    Val(MinMaxAvgScalarEventBatch),
+    Val(T),
 }
diff --git a/disk/src/raw.rs b/disk/src/raw.rs
index 8b16790..0e57f5d 100644
--- a/disk/src/raw.rs
+++ b/disk/src/raw.rs
@@ -5,11 +5,11 @@ Delivers event data (not yet time-binned) from local storage
 and provides client functions to request such data from nodes.
 */
 
-use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
 use crate::agg::streams::StreamItem;
+use crate::binned::{BinnedStreamKind, RangeCompletableItem};
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::{make_frame, make_term_frame};
-use crate::raw::bffr::MinMaxAvgScalarEventBatchStreamFromFrames;
+use crate::raw::bffr::EventsFromFrames;
 use err::Error;
 use futures_core::Stream;
 use netpod::{AggKind, Channel, NanoRange, Node, PerfOpts};
@@ -36,11 +36,23 @@ pub struct EventsQuery {
 #[derive(Serialize, Deserialize)]
 pub struct EventQueryJsonStringFrame(String);
 
-pub async fn x_processed_stream_from_node(
+pub async fn x_processed_stream_from_node<SK>(
     query: EventsQuery,
     perf_opts: PerfOpts,
     node: Node,
-) -> Result<Pin<Box<dyn Stream<Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>> + Send>>, Error> {
+    stream_kind: SK,
+) -> Result<
+    Pin<
+        Box<
+            dyn Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>>
+                + Send,
+        >,
+    >,
+    Error,
+>
+where
+    SK: BinnedStreamKind,
+{
     let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
     let qjs = serde_json::to_string(&query)?;
     let (netin, mut netout) = net.into_split();
@@ -51,7 +63,7 @@
     netout.flush().await?;
     netout.forget();
     let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
-    let items = MinMaxAvgScalarEventBatchStreamFromFrames::new(frames);
+    let items = EventsFromFrames::new(frames, stream_kind);
     Ok(Box::pin(items))
 }
diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs
index fa7c689..845d457 100644
--- a/disk/src/raw/bffr.rs
+++ b/disk/src/raw/bffr.rs
@@ -1,8 +1,7 @@
-use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
 use crate::agg::streams::StreamItem;
+use crate::binned::{BinnedStreamKind, RangeCompletableItem};
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::decode_frame;
-use crate::raw::conn::RawConnOut;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
@@ -11,39 +10,44 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use tokio::io::AsyncRead;
 
-pub struct MinMaxAvgScalarEventBatchStreamFromFrames<T>
+pub struct EventsFromFrames<T, SK>
 where
     T: AsyncRead + Unpin,
+    SK: BinnedStreamKind,
 {
     inp: InMemoryFrameAsyncReadStream<T>,
     errored: bool,
     completed: bool,
+    stream_kind: SK,
 }
 
-impl<T> MinMaxAvgScalarEventBatchStreamFromFrames<T>
+impl<T, SK> EventsFromFrames<T, SK>
 where
     T: AsyncRead + Unpin,
+    SK: BinnedStreamKind,
 {
-    pub fn new(inp: InMemoryFrameAsyncReadStream<T>) -> Self {
+    pub fn new(inp: InMemoryFrameAsyncReadStream<T>, stream_kind: SK) -> Self {
         Self {
             inp,
             errored: false,
             completed: false,
+            stream_kind,
         }
    }
 }
 
-impl<T> Stream for MinMaxAvgScalarEventBatchStreamFromFrames<T>
+impl<T, SK> Stream for EventsFromFrames<T, SK>
 where
     T: AsyncRead + Unpin,
+    SK: BinnedStreamKind,
 {
-    type Item = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;
+    type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
         loop {
             break if self.completed {
-                panic!("MinMaxAvgScalarEventBatchStreamFromFrames poll_next on completed");
+                panic!("EventsFromFrames poll_next on completed");
             } else if self.errored {
                 self.completed = true;
                 Ready(None)
@@ -53,8 +57,7 @@ where
                         StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                         StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
                         StreamItem::DataItem(frame) => {
-                            type ExpectedType = RawConnOut;
-                            match decode_frame::<ExpectedType>(&frame) {
+                            match decode_frame::<Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>>(&frame) {
                                 Ok(item) => match item {
                                     Ok(item) => Ready(Some(Ok(item))),
                                     Err(e) => {
@@ -64,7 +67,7 @@ where
                                 },
                                 Err(e) => {
                                     error!(
-                                        "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}",
+                                        "EventsFromFrames ~~~~~~~~ ERROR on frame payload {}",
                                         frame.buf().len(),
                                     );
                                     self.errored = true;
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 2462194..8da848f 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -1,7 +1,8 @@
 use crate::agg::binnedx::IntoBinnedXBins1;
-use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
+use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::streams::StreamItem;
 use crate::agg::IntoDim1F32Stream;
+use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem};
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
 use crate::eventblobs::EventBlobsComplete;
 use crate::eventchunker::EventChunkerConf;
@@ -11,30 +12,30 @@ use crate::raw::{EventQueryJsonStringFrame, EventsQuery};
 use err::Error;
 use futures_util::StreamExt;
 use netpod::log::*;
-use netpod::{ByteSize, NodeConfigCached, PerfOpts};
+use netpod::{AggKind, ByteSize, NodeConfigCached, PerfOpts};
 use std::net::SocketAddr;
 use tokio::io::AsyncWriteExt;
 use tokio::net::tcp::OwnedWriteHalf;
 use tokio::net::TcpStream;
 use tracing::Instrument;
 
-pub async fn raw_service(node_config: NodeConfigCached) -> Result<(), Error> {
+pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> {
     let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
     let lis = tokio::net::TcpListener::bind(addr).await?;
     loop {
         match lis.accept().await {
             Ok((stream, addr)) => {
-                taskrun::spawn(raw_conn_handler(stream, addr, node_config.clone()));
+                taskrun::spawn(events_conn_handler(stream, addr, node_config.clone()));
             }
             Err(e) => Err(e)?,
        }
    }
 }
 
-async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
+async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
     //use tracing_futures::Instrument;
     let span1 = span!(Level::INFO, "raw::raw_conn_handler");
-    let r = raw_conn_handler_inner(stream, addr, &node_config)
+    let r = events_conn_handler_inner(stream, addr, &node_config)
         .instrument(span1)
         .await;
     match r {
@@ -46,17 +47,16 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: Node
     }
 }
 
-pub type RawConnOut = Result<StreamItem<MinMaxAvgScalarEventBatchStreamItem>, Error>;
-
-async fn raw_conn_handler_inner(
+async fn events_conn_handler_inner(
     stream: TcpStream,
     addr: SocketAddr,
     node_config: &NodeConfigCached,
 ) -> Result<(), Error> {
-    match raw_conn_handler_inner_try(stream, addr, node_config).await {
+    match events_conn_handler_inner_try(stream, addr, node_config).await {
         Ok(_) => (),
         Err(mut ce) => {
-            let buf = make_frame::<RawConnOut>(&Err(ce.err))?;
+            // TODO is it guaranteed to be compatible to serialize this way?
+            let buf = make_frame::<Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error>>(&Err(ce.err))?;
             match ce.netout.write_all(&buf).await {
                 Ok(_) => (),
                 Err(e) => return Err(e)?,
@@ -80,7 +80,7 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
     }
 }
 
-async fn raw_conn_handler_inner_try(
+async fn events_conn_handler_inner_try(
     stream: TcpStream,
     addr: SocketAddr,
     node_config: &NodeConfigCached,
@@ -170,14 +170,40 @@
             Ok(_) => {}
             Err(_) => {}
         }
-        match make_frame::<RawConnOut>(&item) {
-            Ok(buf) => match netout.write_all(&buf).await {
-                Ok(_) => {}
-                Err(e) => return Err((e, netout))?,
-            },
-            Err(e) => {
-                return Err((e, netout))?;
+        match evq.agg_kind {
+            AggKind::DimXBins1 => {
+                match make_frame::<
+                    Result<
+                        StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as BinnedStreamKind>::XBinnedEvents>>,
+                        Error,
+                    >,
+                >(&item)
+                {
+                    Ok(buf) => match netout.write_all(&buf).await {
+                        Ok(_) => {}
+                        Err(e) => return Err((e, netout))?,
+                    },
+                    Err(e) => {
+                        return Err((e, netout))?;
+                    }
+                }
             }
+            // TODO define this case:
+            AggKind::DimXBinsN(n1) => match make_frame::<
+                Result<
+                    StreamItem<RangeCompletableItem<<BinnedStreamKindScalar as BinnedStreamKind>::XBinnedEvents>>,
+                    Error,
+                >,
+            >(err::todoval())
+            {
+                Ok(buf) => match netout.write_all(&buf).await {
+                    Ok(_) => {}
+                    Err(e) => return Err((e, netout))?,
+                },
+                Err(e) => {
+                    return Err((e, netout))?;
+                }
+            },
         }
     }
     let buf = make_term_frame();
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index a3da1f3..937f7c7 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -1,7 +1,7 @@
 use bytes::Bytes;
 use disk::binned::BinnedStreamKindScalar;
 use disk::cache::{BinnedQuery, PreBinnedQuery};
-use disk::raw::conn::raw_service;
+use disk::raw::conn::events_service;
 use err::Error;
 use future::Future;
 use futures_core::Stream;
@@ -23,7 +23,7 @@ pub mod gather;
 
 pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
     let node_config = node_config.clone();
-    let rawjh = taskrun::spawn(raw_service(node_config.clone()));
+    let rawjh = taskrun::spawn(events_service(node_config.clone()));
     use std::str::FromStr;
     let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
     let make_service = make_service_fn({