From 30be7d1c4411122265ad0aaeaf11a53d2c945dbc Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 20 May 2021 20:04:21 +0200 Subject: [PATCH] It compiles --- disk/src/agg.rs | 50 ++++++--------- disk/src/agg/binnedt.rs | 114 ++++++++++++--------------------- disk/src/agg/binnedx.rs | 25 +++++--- disk/src/agg/eventbatch.rs | 70 -------------------- disk/src/agg/scalarbinbatch.rs | 71 +------------------- disk/src/agg/streams.rs | 95 +++++++++------------------ disk/src/aggtest.rs | 32 ++++----- disk/src/binned.rs | 78 +++++++++++----------- disk/src/binned/scalar.rs | 24 +++---- disk/src/binnedstream.rs | 51 +++++++-------- disk/src/cache.rs | 23 ++++--- disk/src/cache/pbv.rs | 74 +++++++++++---------- disk/src/cache/pbvfs.rs | 37 +++++------ disk/src/eventblobs.rs | 9 +-- disk/src/eventchunker.rs | 14 ++-- disk/src/frame/inmem.rs | 5 +- disk/src/frame/makeframe.rs | 8 +-- disk/src/lib.rs | 46 ------------- disk/src/merge.rs | 63 ++++++++++-------- disk/src/raw.rs | 3 +- disk/src/raw/bffr.rs | 45 ++++++------- disk/src/raw/conn.rs | 13 ++-- httpret/src/lib.rs | 24 +------ retrieval/src/client.rs | 66 +++++++++++-------- retrieval/src/test.rs | 47 ++++++++------ 25 files changed, 424 insertions(+), 663 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 7994518..cfc3a0e 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -5,20 +5,18 @@ Aggregation and binning support. use super::eventchunker::EventFull; use crate::agg::binnedt::AggregatableTdim; use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem}; +use crate::agg::streams::StreamItem; use crate::eventchunker::EventChunkerItem; -use crate::streamlog::LogItem; use bytes::BytesMut; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use netpod::NanoRange; use netpod::ScalarType; -use netpod::{EventDataReadStats, NanoRange}; use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -#[allow(unused_imports)] -use tracing::{debug, error, info, span, trace, warn, Level}; pub mod binnedt; pub mod binnedx; @@ -35,7 +33,6 @@ pub trait AggregatableXdim1Bin { pub struct ValuesDim0 { tss: Vec, values: Vec>, - // TODO add the stats and flags } impl std::fmt::Debug for ValuesDim0 { @@ -509,15 +506,13 @@ impl Dim1F32Stream { pub enum Dim1F32StreamItem { Values(ValuesDim1), RangeComplete, - EventDataReadStats(EventDataReadStats), - Log(LogItem), } impl Stream for Dim1F32Stream where - S: Stream> + Unpin, + S: Stream, Error>> + Unpin, { - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -532,22 +527,23 @@ where Ready(Some(Ok(k))) => { let inst1 = Instant::now(); let u = match k { - EventChunkerItem::Events(events) => match self.process_event_data(&events) { - Ok(k) => { - let ret = Dim1F32StreamItem::Values(k); - Ready(Some(Ok(ret))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) + StreamItem::DataItem(item) => match item { + EventChunkerItem::Events(events) => match self.process_event_data(&events) { + Ok(k) => { + let ret = Dim1F32StreamItem::Values(k); + Ready(Some(Ok(StreamItem::DataItem(ret)))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } + }, + EventChunkerItem::RangeComplete => { + Ready(Some(Ok(StreamItem::DataItem(Dim1F32StreamItem::RangeComplete)))) } }, - EventChunkerItem::RangeComplete => Ready(Some(Ok(Dim1F32StreamItem::RangeComplete))), - EventChunkerItem::Log(item) => Ready(Some(Ok(Dim1F32StreamItem::Log(item)))), - EventChunkerItem::EventDataReadStats(stats) => { - let ret = Dim1F32StreamItem::EventDataReadStats(stats); - Ready(Some(Ok(ret))) - } + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), }; let inst2 = Instant::now(); // TODO do something with the measured time. @@ -570,12 +566,12 @@ where pub trait IntoDim1F32Stream { fn into_dim_1_f32_stream(self) -> Dim1F32Stream where - Self: Stream> + Sized; + Self: Stream, Error>> + Sized; } impl IntoDim1F32Stream for T where - T: Stream>, + T: Stream, Error>>, { fn into_dim_1_f32_stream(self) -> Dim1F32Stream { Dim1F32Stream::new(self) @@ -588,10 +584,6 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem { fn into_agg(self) -> Self::Output { match self { Dim1F32StreamItem::Values(vals) => MinMaxAvgScalarEventBatchStreamItem::Values(vals.into_agg()), - Dim1F32StreamItem::EventDataReadStats(stats) => { - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) - } - Dim1F32StreamItem::Log(item) => MinMaxAvgScalarEventBatchStreamItem::Log(item), Dim1F32StreamItem::RangeComplete => MinMaxAvgScalarEventBatchStreamItem::RangeComplete, } } diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 9f37c37..302a42b 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -1,10 +1,10 @@ +use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; -use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::{BinnedRange, EventDataReadStats}; +use netpod::BinnedRange; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -25,12 +25,6 @@ pub trait AggregatableTdim: Sized { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; fn is_range_complete(&self) -> bool; fn make_range_complete_item() -> Option; - fn is_log_item(&self) -> bool; - fn log_item(self) -> Option; - fn make_log_item(item: LogItem) -> Option; - fn is_stats_item(&self) -> bool; - fn stats_item(self) -> Option; - fn make_stats_item(item: EventDataReadStats) -> Option; } pub trait IntoBinnedT { @@ -40,9 +34,8 @@ pub trait IntoBinnedT { impl IntoBinnedT for S where - S: Stream> + Unpin, + S: Stream, Error>> + Unpin, I: AggregatableTdim + Unpin, - //I: AggregatableTdim, I::Aggregator: Unpin, { type StreamOut = IntoBinnedTDefaultStream; @@ -54,7 +47,7 @@ where pub struct IntoBinnedTDefaultStream where - S: Stream>, + S: Stream, Error>>, I: AggregatableTdim, { inp: S, @@ -65,7 +58,7 @@ where all_bins_emitted: bool, range_complete_observed: bool, range_complete_emitted: bool, - left: Option>>>, + left: Option, Error>>>>, errored: bool, completed: bool, tmp_agg_results: VecDeque<::OutputValue>, @@ -73,7 +66,7 @@ where impl IntoBinnedTDefaultStream where - S: Stream> + Unpin, + S: Stream, Error>> + Unpin, I: AggregatableTdim, { pub fn new(inp: S, spec: BinnedRange) -> Self { @@ -94,7 +87,7 @@ where } } - fn cur(&mut self, cx: &mut Context) -> Poll>> { + fn cur(&mut self, cx: &mut Context) -> Poll, Error>>> { if let Some(cur) = self.left.take() { cur } else if self.inp_completed { @@ -122,64 +115,44 @@ where fn handle( &mut self, - cur: Poll>>, - ) -> Option::OutputValue, Error>>>> { + cur: Poll, Error>>>, + ) -> Option::OutputValue>, Error>>>> { use Poll::*; match cur { - Ready(Some(Ok(k))) => { - if k.is_range_complete() { - self.range_complete_observed = true; - None - } else if k.is_log_item() { - if let Some(item) = k.log_item() { - if let Some(item) = ::OutputValue::make_log_item(item) { - Some(Ready(Some(Ok(item)))) - } else { - error!("IntoBinnedTDefaultStream can not create log item"); + Ready(Some(Ok(item))) => match item { + StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))), + StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))), + StreamItem::DataItem(item) => { + if item.is_range_complete() { + self.range_complete_observed = true; + None + } else if self.all_bins_emitted { + // Just drop the item because we will not emit anymore data. + // Could also at least gather some stats. + None + } else { + let ag = self.aggtor.as_mut().unwrap(); + if ag.ends_before(&item) { None - } - } else { - error!("supposed to be log item but can't take it"); - None - } - } else if k.is_stats_item() { - if let Some(item) = k.stats_item() { - if let Some(item) = ::OutputValue::make_stats_item(item) { - Some(Ready(Some(Ok(item)))) - } else { - error!("IntoBinnedTDefaultStream can not create stats item"); - None - } - } else { - error!("supposed to be stats item but can't take it"); - None - } - } else if self.all_bins_emitted { - // Just drop the item because we will not emit anymore data. - // Could also at least gather some stats. - None - } else { - let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&k) { - None - } else if ag.starts_after(&k) { - self.left = Some(Ready(Some(Ok(k)))); - self.cycle_current_bin(); - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None - } else { - let mut k = k; - ag.ingest(&mut k); - let k = k; - if ag.ends_after(&k) { - self.left = Some(Ready(Some(Ok(k)))); + } else if ag.starts_after(&item) { + self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item))))); self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } else { + let mut item = item; + ag.ingest(&mut item); + let item = item; + if ag.ends_after(&item) { + self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item))))); + self.cycle_current_bin(); + } + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None } - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None } } - } + }, Ready(Some(Err(e))) => { self.errored = true; Some(Ready(Some(Err(e)))) @@ -201,12 +174,11 @@ where impl Stream for IntoBinnedTDefaultStream where - S: Stream> + Unpin, - //I: AggregatableTdim, + S: Stream, Error>> + Unpin, I: AggregatableTdim + Unpin, I::Aggregator: Unpin, { - type Item = Result<::OutputValue, Error>; + type Item = Result::OutputValue>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -217,17 +189,15 @@ where self.completed = true; Ready(None) } else if let Some(item) = self.tmp_agg_results.pop_front() { - Ready(Some(Ok(item))) + Ready(Some(Ok(StreamItem::DataItem(item)))) } else if self.range_complete_emitted { self.completed = true; Ready(None) } else if self.inp_completed && self.all_bins_emitted { self.range_complete_emitted = true; if self.range_complete_observed { - // TODO why can't I declare that type alias? - //type TT = I; if let Some(item) = ::OutputValue::make_range_complete_item() { - Ready(Some(Ok(item))) + Ready(Some(Ok(StreamItem::DataItem(item)))) } else { warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one"); continue 'outer; diff --git a/disk/src/agg/binnedx.rs b/disk/src/agg/binnedx.rs index f2d1eba..3f3225f 100644 --- a/disk/src/agg/binnedx.rs +++ b/disk/src/agg/binnedx.rs @@ -1,3 +1,4 @@ +use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; use err::Error; use futures_core::Stream; @@ -5,16 +6,20 @@ use futures_util::StreamExt; use std::pin::Pin; use std::task::{Context, Poll}; -pub trait IntoBinnedXBins1 { +pub trait IntoBinnedXBins1 +where + I: AggregatableXdim1Bin, +{ type StreamOut; fn into_binned_x_bins_1(self) -> Self::StreamOut where - Self: Stream>; + Self: Stream, Error>>; } -impl IntoBinnedXBins1 for T +impl IntoBinnedXBins1 for T where - T: Stream> + Unpin, + T: Stream, Error>> + Unpin, + I: AggregatableXdim1Bin, { type StreamOut = IntoBinnedXBins1DefaultStream; @@ -25,7 +30,7 @@ where pub struct IntoBinnedXBins1DefaultStream where - S: Stream> + Unpin, + S: Stream, Error>> + Unpin, I: AggregatableXdim1Bin, { inp: S, @@ -33,15 +38,19 @@ where impl Stream for IntoBinnedXBins1DefaultStream where - S: Stream> + Unpin, + S: Stream, Error>> + Unpin, I: AggregatableXdim1Bin, { - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))), + Ready(Some(Ok(k))) => match k { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(item) => Ready(Some(Ok(StreamItem::DataItem(item.into_agg())))), + }, Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 9e4bcaa..d6930b8 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,10 +1,8 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem}; use crate::agg::AggregatableXdim1Bin; -use crate::streamlog::LogItem; use bytes::{BufMut, Bytes, BytesMut}; use netpod::log::*; -use netpod::EventDataReadStats; use serde::{Deserialize, Serialize}; use std::mem::size_of; @@ -120,30 +118,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch { fn make_range_complete_item() -> Option { None } - - fn is_log_item(&self) -> bool { - false - } - - fn log_item(self) -> Option { - None - } - - fn make_log_item(_item: LogItem) -> Option { - None - } - - fn is_stats_item(&self) -> bool { - false - } - - fn stats_item(self) -> Option { - None - } - - fn make_stats_item(_item: EventDataReadStats) -> Option { - None - } } impl MinMaxAvgScalarEventBatch { @@ -269,8 +243,6 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { pub enum MinMaxAvgScalarEventBatchStreamItem { Values(MinMaxAvgScalarEventBatch), RangeComplete, - EventDataReadStats(EventDataReadStats), - Log(LogItem), } impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem { @@ -301,46 +273,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem { fn make_range_complete_item() -> Option { Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete) } - - fn is_log_item(&self) -> bool { - if let MinMaxAvgScalarEventBatchStreamItem::Log(_) = self { - true - } else { - false - } - } - - fn log_item(self) -> Option { - if let MinMaxAvgScalarEventBatchStreamItem::Log(item) = self { - Some(item) - } else { - None - } - } - - fn make_log_item(item: LogItem) -> Option { - Some(MinMaxAvgScalarEventBatchStreamItem::Log(item)) - } - - fn is_stats_item(&self) -> bool { - if let MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_) = self { - true - } else { - false - } - } - - fn stats_item(self) -> Option { - if let MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item) = self { - Some(item) - } else { - None - } - } - - fn make_stats_item(item: EventDataReadStats) -> Option { - Some(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item)) - } } pub struct MinMaxAvgScalarEventBatchStreamItemAggregator { @@ -382,9 +314,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator { fn ingest(&mut self, inp: &mut Self::InputValue) { match inp { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals), - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_) => panic!(), MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(), - MinMaxAvgScalarEventBatchStreamItem::Log(_) => panic!(), } } diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index b5dc530..976385a 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -3,12 +3,11 @@ use crate::agg::streams::Bins; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; use crate::binned::MakeBytesFrame; use crate::frame::makeframe::make_frame; -use crate::streamlog::LogItem; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{EventDataReadStats, NanoRange}; +use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::mem::size_of; @@ -208,30 +207,6 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch { fn make_range_complete_item() -> Option { None } - - fn is_log_item(&self) -> bool { - false - } - - fn log_item(self) -> Option { - None - } - - fn make_log_item(_item: LogItem) -> Option { - None - } - - fn is_stats_item(&self) -> bool { - false - } - - fn stats_item(self) -> Option { - None - } - - fn make_stats_item(_item: EventDataReadStats) -> Option { - None - } } impl Bins for MinMaxAvgScalarBinBatch { @@ -339,8 +314,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { pub enum MinMaxAvgScalarBinBatchStreamItem { Values(MinMaxAvgScalarBinBatch), RangeComplete, - EventDataReadStats(EventDataReadStats), - Log(LogItem), } impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem { @@ -362,46 +335,6 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem { fn make_range_complete_item() -> Option { Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete) } - - fn is_log_item(&self) -> bool { - if let MinMaxAvgScalarBinBatchStreamItem::Log(_) = self { - true - } else { - false - } - } - - fn log_item(self) -> Option { - if let MinMaxAvgScalarBinBatchStreamItem::Log(item) = self { - Some(item) - } else { - None - } - } - - fn make_log_item(item: LogItem) -> Option { - Some(MinMaxAvgScalarBinBatchStreamItem::Log(item)) - } - - fn is_stats_item(&self) -> bool { - if let MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(_) = self { - true - } else { - false - } - } - - fn stats_item(self) -> Option { - if let MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) = self { - Some(item) - } else { - None - } - } - - fn make_stats_item(item: EventDataReadStats) -> Option { - Some(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item)) - } } impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem { @@ -457,9 +390,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator { fn ingest(&mut self, inp: &mut Self::InputValue) { match inp { MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals), - MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(_) => panic!(), MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(), - MinMaxAvgScalarBinBatchStreamItem::Log(_) => panic!(), } } diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 2de26f8..06c7701 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -38,20 +38,16 @@ pub trait ToJsonResult { impl AggregatableXdim1Bin for StreamItem where - // TODO bound on the Output ??? - //T: AggregatableTdim + AggregatableXdim1Bin, T: AggregatableTdim + AggregatableXdim1Bin, { type Output = StreamItem<::Output>; fn into_agg(self) -> Self::Output { - // TODO how to handle the type mismatch? - /*match self { - Self::Log(item) => Self::Log(item), - Self::Stats(item) => Self::Stats(item), - Self::DataItem(item) => Self::DataItem(item.into_agg()), - }*/ - err::todoval() + match self { + Self::Log(item) => Self::Output::Log(item), + Self::Stats(item) => Self::Output::Stats(item), + Self::DataItem(item) => Self::Output::DataItem(item.into_agg()), + } } } @@ -60,7 +56,6 @@ where T: AggregatableTdim, { inner_agg: ::Aggregator, - _mark: std::marker::PhantomData, } impl StreamItemAggregator @@ -70,7 +65,6 @@ where pub fn new(ts1: u64, ts2: u64) -> Self { Self { inner_agg: ::aggregator_new_static(ts1, ts2), - _mark: std::marker::PhantomData::default(), } } } @@ -83,23 +77,45 @@ where type OutputValue = StreamItem<<::Aggregator as AggregatorTdim>::OutputValue>; fn ends_before(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp { + StreamItem::Log(_) => false, + StreamItem::Stats(_) => false, + StreamItem::DataItem(item) => self.inner_agg.ends_before(item), + } } fn ends_after(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp { + StreamItem::Log(_) => false, + StreamItem::Stats(_) => false, + StreamItem::DataItem(item) => self.inner_agg.ends_after(item), + } } fn starts_after(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp { + StreamItem::Log(_) => false, + StreamItem::Stats(_) => false, + StreamItem::DataItem(item) => self.inner_agg.starts_after(item), + } } fn ingest(&mut self, inp: &mut Self::InputValue) { - todo!() + match inp { + StreamItem::Log(_) => {} + StreamItem::Stats(_) => {} + StreamItem::DataItem(item) => { + self.inner_agg.ingest(item); + } + } } fn result(self) -> Vec { - todo!() + self.inner_agg + .result() + .into_iter() + .map(|k| StreamItem::DataItem(k)) + .collect() } } @@ -129,51 +145,4 @@ where None => None, } } - - // TODO refactor: the point of having the StreamItem is that this function is no longer necessary: - fn is_log_item(&self) -> bool { - if let Self::Log(_) = self { - true - } else { - false - } - } - - // TODO should be able to remove this from trait: - fn log_item(self) -> Option { - if let Self::Log(item) = self { - Some(item) - } else { - None - } - } - - // TODO should be able to remove this from trait: - fn make_log_item(item: LogItem) -> Option { - Some(Self::Log(item)) - } - - // TODO should be able to remove this from trait: - fn is_stats_item(&self) -> bool { - if let Self::Stats(_) = self { - true - } else { - false - } - } - - // TODO should be able to remove this from trait: - fn stats_item(self) -> Option { - if let Self::Stats(_item) = self { - // TODO this whole function should no longer be needed. - Some(err::todoval()) - } else { - None - } - } - - // TODO should be able to remove this from trait: - fn make_stats_item(item: EventDataReadStats) -> Option { - Some(Self::Stats(StatsItem::EventDataReadStats(item))) - } } diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 90de027..d22ada8 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,6 +1,7 @@ use super::agg::IntoDim1F32Stream; use crate::agg::binnedt::IntoBinnedT; use crate::agg::binnedx::IntoBinnedXBins1; +use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use futures_util::StreamExt; use netpod::timeunits::*; @@ -56,30 +57,25 @@ async fn agg_x_dim_0_inner() { let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let fut1 = super::eventblobs::EventBlobsComplete::new( + let fut1 = EventBlobsComplete::new( range.clone(), query.channel_config.clone(), node.clone(), 0, query.buffer_size as usize, event_chunker_conf, - ) - .into_dim_1_f32_stream() - .into_binned_x_bins_1() - .map(|k| { - if false { - trace!("after X binning {:?}", k.as_ref().unwrap()); - } - k - }) - .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) - .map(|k| { - if false { - trace!("after T binning {:?}", k.as_ref().unwrap()); - } - k - }) - .for_each(|_k| ready(())); + ); + let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1); + let fut1 = IntoBinnedXBins1::into_binned_x_bins_1(fut1); + let fut1 = fut1 + .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) + .map(|k| { + if false { + trace!("after T binning {:?}", k.as_ref().unwrap()); + } + k + }) + .for_each(|_k| ready(())); fut1.await; } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index bc1ab5c..3e7c5c1 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult}; use crate::agg::AggregatableXdim1Bin; use crate::binned::scalar::binned_scalar_stream; @@ -7,14 +7,13 @@ use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream}; use crate::cache::BinnedQuery; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::frame::makeframe::make_frame; -use crate::streamlog::LogItem; use bytes::Bytes; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::{AggKind, BinnedRange, EventDataReadStats, NodeConfigCached}; +use netpod::{AggKind, BinnedRange, NodeConfigCached}; use num_traits::Zero; use serde::{Deserialize, Serialize, Serializer}; use std::pin::Pin; @@ -146,31 +145,58 @@ impl AggregatableXdim1Bin for BinnedScalarStreamItem { } } -pub struct BinnedScalarStreamItemAggregator {} +pub struct BinnedScalarStreamItemAggregator { + inner_agg: MinMaxAvgScalarBinBatchAggregator, +} +impl BinnedScalarStreamItemAggregator { + pub fn new(ts1: u64, ts2: u64) -> Self { + Self { + inner_agg: MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2), + } + } +} + +// TODO this could be some generic impl for all wrapper that can carry some AggregatableTdim variant. impl AggregatorTdim for BinnedScalarStreamItemAggregator { type InputValue = BinnedScalarStreamItem; // TODO using the same type for the output, does this cover all cases? type OutputValue = BinnedScalarStreamItem; fn ends_before(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp { + Self::OutputValue::Values(item) => self.inner_agg.ends_before(item), + Self::OutputValue::RangeComplete => false, + } } fn ends_after(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp { + Self::OutputValue::Values(item) => self.inner_agg.ends_after(item), + Self::OutputValue::RangeComplete => false, + } } fn starts_after(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp { + Self::OutputValue::Values(item) => self.inner_agg.starts_after(item), + Self::OutputValue::RangeComplete => false, + } } fn ingest(&mut self, inp: &mut Self::InputValue) { - todo!() + match inp { + Self::OutputValue::Values(item) => self.inner_agg.ingest(item), + Self::OutputValue::RangeComplete => (), + } } fn result(self) -> Vec { - todo!() + self.inner_agg + .result() + .into_iter() + .map(|k| BinnedScalarStreamItem::Values(k)) + .collect() } } @@ -180,39 +206,19 @@ impl AggregatableTdim for BinnedScalarStreamItem { type Output = BinnedScalarStreamItem; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { - todo!() + BinnedScalarStreamItemAggregator::new(ts1, ts2) } fn is_range_complete(&self) -> bool { - todo!() + if let Self::RangeComplete = self { + true + } else { + false + } } fn make_range_complete_item() -> Option { - todo!() - } - - fn is_log_item(&self) -> bool { - todo!() - } - - fn log_item(self) -> Option { - todo!() - } - - fn make_log_item(item: LogItem) -> Option { - todo!() - } - - fn is_stats_item(&self) -> bool { - todo!() - } - - fn stats_item(self) -> Option { - todo!() - } - - fn make_stats_item(item: EventDataReadStats) -> Option { - todo!() + Some(Self::RangeComplete) } } diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs index afd2ec0..d03b828 100644 --- a/disk/src/binned/scalar.rs +++ b/disk/src/binned/scalar.rs @@ -1,6 +1,6 @@ use crate::agg::binnedt::IntoBinnedT; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; -use crate::agg::streams::{StatsItem, StreamItem}; +use crate::agg::streams::StreamItem; use crate::binned::{BinnedScalarStreamItem, BinnedStreamRes}; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream}; use crate::cache::{BinnedQuery, MergedFromRemotes}; @@ -11,20 +11,20 @@ use netpod::log::*; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; pub fn adapter_to_stream_item( - k: Result, + k: Result, Error>, ) -> Result, Error> { match k { Ok(k) => match k { - MinMaxAvgScalarBinBatchStreamItem::Log(item) => Ok(StreamItem::Log(item)), - MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) => { - Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item))) - } - MinMaxAvgScalarBinBatchStreamItem::RangeComplete => { - Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete)) - } - MinMaxAvgScalarBinBatchStreamItem::Values(item) => { - Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item))) - } + StreamItem::Log(item) => Ok(StreamItem::Log(item)), + StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), + StreamItem::DataItem(item) => match item { + MinMaxAvgScalarBinBatchStreamItem::RangeComplete => { + Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete)) + } + MinMaxAvgScalarBinBatchStreamItem::Values(item) => { + Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item))) + } + }, }, Err(e) => Err(e), } diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index eaf0c70..b4d905a 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,7 +1,5 @@ use crate::agg::binnedt::IntoBinnedT; -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; -use crate::agg::streams::{StatsItem, StreamItem}; -use crate::binned::scalar::adapter_to_stream_item; +use crate::agg::streams::StreamItem; use crate::binned::BinnedScalarStreamItem; use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; use crate::cache::{CacheUsage, PreBinnedQuery}; @@ -66,37 +64,34 @@ impl BinnedScalarStreamFromPreBinnedPatches { move |k| { let fit_range = range.full_range(); let g = match k { - Ok(PreBinnedItem::Batch(k)) => { - use super::agg::{Fits, FitsInside}; - match k.fits_inside(fit_range) { - Fits::Inside - | Fits::PartlyGreater - | Fits::PartlyLower - | Fits::PartlyLowerAndGreater => { - Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(k)))) + Ok(item) => match item { + StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), + StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), + StreamItem::DataItem(item) => match item { + PreBinnedItem::RangeComplete => { + Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))) } - _ => None, - } - } - Ok(PreBinnedItem::RangeComplete) => { - Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))) - } - Ok(PreBinnedItem::EventDataReadStats(item)) => { - Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))) - } - Ok(PreBinnedItem::Log(item)) => Some(Ok(StreamItem::Log(item))), + PreBinnedItem::Batch(item) => { + use super::agg::{Fits, FitsInside}; + match item.fits_inside(fit_range) { + Fits::Inside + | Fits::PartlyGreater + | Fits::PartlyLower + | Fits::PartlyLowerAndGreater => { + Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item)))) + } + _ => None, + } + } + }, + }, Err(e) => Some(Err(e)), }; ready(g) } - }); - //let inp: Box, Error>> + Send + Unpin> = - // Box::new(inp); - //let inp: &Stream, Error>> + Send + Unpin>> = &inp - //() == inp; - let inp = IntoBinnedT::into_binned_t(inp, range); + }) + .into_binned_t(range); Ok(Self { inp: Box::pin(inp) }) - //err::todoval() } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 02957e3..cd580b2 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,5 +1,6 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::streams::StreamItem; use crate::cache::pbv::PreBinnedValueByteStream; use crate::cache::pbvfs::PreBinnedItem; use crate::merge::MergedMinMaxAvgScalarStream; @@ -313,7 +314,7 @@ impl AsyncRead for HttpBodyAsAsyncRead { } } -type T001 = Pin> + Send>>; +type T001 = Pin, Error>> + Send>>; type T002 = Pin> + Send>>; pub struct MergedFromRemotes { tcp_establish_futs: Vec, @@ -344,19 +345,17 @@ impl MergedFromRemotes { impl Stream for MergedFromRemotes { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if self.completed { - panic!("MergedFromRemotes poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } 'outer: loop { - break if let Some(fut) = &mut self.merged { + break if self.completed { + panic!("MergedFromRemotes poll_next on completed"); + } else if self.errored { + self.completed = true; + return Ready(None); + } else if let Some(fut) = &mut self.merged { match fut.poll_next_unpin(cx) { Ready(Some(Ok(k))) => Ready(Some(Ok(k))), Ready(Some(Err(e))) => { @@ -527,10 +526,10 @@ pub async fn write_pb_cache_min_max_avg_scalar( Ok(()) } -pub async fn read_pbv(mut file: File) -> Result { +pub async fn read_pbv(mut file: File) -> Result, Error> { let mut buf = vec![]; file.read_to_end(&mut buf).await?; trace!("Read cached file len {}", buf.len()); let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?; - Ok(PreBinnedItem::Batch(dec)) + Ok(StreamItem::DataItem(PreBinnedItem::Batch(dec))) } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 7e152e8..27a39ba 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,5 +1,6 @@ use crate::agg::binnedt::IntoBinnedT; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem}; +use crate::agg::streams::StreamItem; use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery}; use crate::frame::makeframe::make_frame; @@ -38,7 +39,7 @@ impl Stream for PreBinnedValueByteStreamInner { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => match make_frame::>(&item) { + Ready(Some(item)) => match make_frame::, Error>>(&item) { Ok(buf) => Ready(Some(Ok(buf.freeze()))), Err(e) => Ready(Some(Err(e.into()))), }, @@ -52,7 +53,7 @@ pub struct PreBinnedValueStream { query: PreBinnedQuery, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, - fut2: Option> + Send>>>, + fut2: Option, Error>> + Send>>>, read_from_cache: bool, cache_written: bool, data_complete: bool, @@ -63,7 +64,7 @@ pub struct PreBinnedValueStream { streamlog: Streamlog, values: MinMaxAvgScalarBinBatch, write_fut: Option> + Send>>>, - read_cache_fut: Option> + Send>>>, + read_cache_fut: Option, Error>> + Send>>>, } impl PreBinnedValueStream { @@ -111,13 +112,21 @@ impl PreBinnedValueStream { let perf_opts = PerfOpts { inmem_bufcap: 512 }; let s1 = MergedFromRemotes::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); let s1 = s1.into_binned_t(range); - let s1 = s1.map(|k| { - use MinMaxAvgScalarBinBatchStreamItem::*; - match k { - Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)), - Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete), - Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)), - Ok(Log(item)) => Ok(PreBinnedItem::Log(item)), + let s1 = s1.map(|item| { + // TODO does this do anything? + match item { + Ok(item) => match item { + StreamItem::Log(item) => Ok(StreamItem::Log(item)), + StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), + StreamItem::DataItem(item) => match item { + MinMaxAvgScalarBinBatchStreamItem::RangeComplete => { + Ok(StreamItem::DataItem(PreBinnedItem::RangeComplete)) + } + MinMaxAvgScalarBinBatchStreamItem::Values(item) => { + Ok(StreamItem::DataItem(PreBinnedItem::Batch(item))) + } + }, + }, Err(e) => Err(e), } }); @@ -192,7 +201,7 @@ impl PreBinnedValueStream { impl Stream for PreBinnedValueStream { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -203,7 +212,7 @@ impl Stream for PreBinnedValueStream { self.completed = true; Ready(None) } else if let Some(item) = self.streamlog.pop() { - Ready(Some(Ok(PreBinnedItem::Log(item)))) + Ready(Some(Ok(StreamItem::Log(item)))) } else if let Some(fut) = &mut self.write_fut { match fut.poll_unpin(cx) { Ready(item) => { @@ -247,7 +256,7 @@ impl Stream for PreBinnedValueStream { if self.cache_written { if self.range_complete_observed { self.range_complete_emitted = true; - Ready(Some(Ok(PreBinnedItem::RangeComplete))) + Ready(Some(Ok(StreamItem::DataItem(PreBinnedItem::RangeComplete)))) } else { self.completed = true; Ready(None) @@ -284,26 +293,25 @@ impl Stream for PreBinnedValueStream { } else if let Some(fut) = self.fut2.as_mut() { match fut.poll_next_unpin(cx) { Ready(Some(k)) => match k { - Ok(PreBinnedItem::RangeComplete) => { - self.range_complete_observed = true; - continue 'outer; - } - Ok(PreBinnedItem::Batch(batch)) => { - self.values.ts1s.extend(batch.ts1s.iter()); - self.values.ts2s.extend(batch.ts2s.iter()); - self.values.counts.extend(batch.counts.iter()); - self.values.mins.extend(batch.mins.iter()); - self.values.maxs.extend(batch.maxs.iter()); - self.values.avgs.extend(batch.avgs.iter()); - Ready(Some(Ok(PreBinnedItem::Batch(batch)))) - } - Ok(PreBinnedItem::EventDataReadStats(stats)) => { - if false { - info!("PreBinnedValueStream ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ stats {:?}", stats); - } - Ready(Some(Ok(PreBinnedItem::EventDataReadStats(stats)))) - } - Ok(PreBinnedItem::Log(item)) => Ready(Some(Ok(PreBinnedItem::Log(item)))), + Ok(item) => match item { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(item) => match item { + PreBinnedItem::RangeComplete => { + self.range_complete_observed = true; + continue 'outer; + } + PreBinnedItem::Batch(batch) => { + self.values.ts1s.extend(batch.ts1s.iter()); + self.values.ts2s.extend(batch.ts2s.iter()); + self.values.counts.extend(batch.counts.iter()); + self.values.mins.extend(batch.mins.iter()); + self.values.maxs.extend(batch.maxs.iter()); + self.values.avgs.extend(batch.avgs.iter()); + Ready(Some(Ok(StreamItem::DataItem(PreBinnedItem::Batch(batch))))) + } + }, + }, Err(e) => { self.errored = true; Ready(Some(Err(e))) diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index f33d6a2..50f8715 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -1,15 +1,15 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::streams::StreamItem; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::decode_frame; -use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, FutureExt}; use http::StatusCode; #[allow(unused_imports)] use netpod::log::*; -use netpod::{EventDataReadStats, NodeConfigCached, PerfOpts}; +use netpod::{NodeConfigCached, PerfOpts}; use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -48,14 +48,11 @@ impl PreBinnedValueFetchedStream { pub enum PreBinnedItem { Batch(MinMaxAvgScalarBinBatch), RangeComplete, - EventDataReadStats(EventDataReadStats), - //ValuesExtractStats(ValuesExtractStats), - Log(LogItem), } impl Stream for PreBinnedValueFetchedStream { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -70,23 +67,21 @@ impl Stream for PreBinnedValueFetchedStream { break if let Some(res) = self.res.as_mut() { pin_mut!(res); match res.poll_next(cx) { - Ready(Some(Ok(frame))) => match decode_frame::>(&frame) { - Ok(Ok(item)) => { - match &item { - PreBinnedItem::EventDataReadStats(stats) if false => { - info!("PreBinnedValueFetchedStream ✕ ✕ ✕ ✕ ✕ ✕ ✕ ✕ ✕ stats {:?}", stats); + Ready(Some(Ok(item))) => match item { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(item) => { + match decode_frame::, Error>>(&item) { + Ok(Ok(item)) => Ready(Some(Ok(item))), + Ok(Err(e)) => { + self.errored = true; + Ready(Some(Err(e))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) } - _ => {} } - Ready(Some(Ok(item))) - } - Ok(Err(e)) => { - self.errored = true; - Ready(Some(Err(e))) - } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) } }, Ready(Some(Err(e))) => { diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 2007578..9902896 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,3 +1,4 @@ +use crate::agg::streams::StreamItem; use crate::dataopen::{open_files, OpenedFile}; use crate::eventchunker::{EventChunker, EventChunkerConf, EventChunkerItem}; use crate::file_content_stream; @@ -55,9 +56,9 @@ impl EventBlobsComplete { } impl Stream for EventBlobsComplete { - type Item = Result; + type Item = Result, Error>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; 'outer: loop { break if self.completed { @@ -99,7 +100,7 @@ impl Stream for EventBlobsComplete { } None => {} } - Ready(Some(Ok(EventChunkerItem::Log(item)))) + Ready(Some(Ok(StreamItem::Log(item)))) } Err(e) => { self.errored = true; @@ -118,7 +119,7 @@ impl Stream for EventBlobsComplete { self.node_ix ), ); - Ready(Some(Ok(EventChunkerItem::Log(item)))) + Ready(Some(Ok(StreamItem::Log(item)))) } Pending => Pending, }, diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 3d4a63b..2ba6f30 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -1,4 +1,4 @@ -use crate::streamlog::LogItem; +use crate::agg::streams::{StatsItem, StreamItem}; use crate::{FileChunkRead, NeedMinBuffer}; use bitshuffle::bitshuffle_decompress; use bytes::{Buf, BytesMut}; @@ -348,12 +348,10 @@ impl EventFull { pub enum EventChunkerItem { Events(EventFull), RangeComplete, - EventDataReadStats(EventDataReadStats), - Log(LogItem), } impl Stream for EventChunker { - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -368,7 +366,7 @@ impl Stream for EventChunker { parsed_bytes: self.parsed_bytes, }; self.parsed_bytes = 0; - let ret = EventChunkerItem::EventDataReadStats(item); + let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); Ready(Some(Ok(ret))) } else if self.sent_beyond_range { self.completed = true; @@ -376,7 +374,7 @@ impl Stream for EventChunker { } else if self.final_stats_sent { self.sent_beyond_range = true; if self.seen_beyond_range { - Ready(Some(Ok(EventChunkerItem::RangeComplete))) + Ready(Some(Ok(StreamItem::DataItem(EventChunkerItem::RangeComplete)))) } else { continue 'outer; } @@ -385,7 +383,7 @@ impl Stream for EventChunker { parsed_bytes: self.parsed_bytes, }; self.parsed_bytes = 0; - let ret = EventChunkerItem::EventDataReadStats(item); + let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item)); self.final_stats_sent = true; Ready(Some(Ok(ret))) } else { @@ -406,7 +404,7 @@ impl Stream for EventChunker { } else { let x = self.need_min; self.inp.set_need_min(x); - let ret = EventChunkerItem::Events(res.events); + let ret = StreamItem::DataItem(EventChunkerItem::Events(res.events)); Ready(Some(Ok(ret))) } } diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index 7a66c1f..211c7c3 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -1,3 +1,4 @@ +use crate::agg::streams::StreamItem; use crate::frame::makeframe::{INMEM_FRAME_FOOT, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -241,7 +242,7 @@ impl Stream for InMemoryFrameAsyncReadStream where T: AsyncRead + Unpin, { - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -270,7 +271,7 @@ where self.completed = true; Ready(None) } - Some(Some(Ok(k))) => Ready(Some(Ok(k))), + Some(Some(Ok(item))) => Ready(Some(Ok(StreamItem::DataItem(item)))), Some(Some(Err(e))) => { self.tryparse = false; self.errored = true; diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index e222bae..ae681f8 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -25,10 +25,6 @@ impl FrameType for RawConnOut { const FRAME_TYPE_ID: u32 = 0x04; } -impl FrameType for Result { - const FRAME_TYPE_ID: u32 = 0x05; -} - impl FrameType for Result, Error> { const FRAME_TYPE_ID: u32 = 0x06; } @@ -37,6 +33,10 @@ impl FrameType for Result { const FRAME_TYPE_ID: u32 = 0x07; } +impl FrameType for Result, Error> { + const FRAME_TYPE_ID: u32 = 0x08; +} + pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 1041908..cff5be3 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,6 +1,5 @@ use crate::dataopen::open_files; use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; -use crate::eventchunker::EventChunkerConf; use bytes::{Bytes, BytesMut}; use err::Error; use futures_core::Stream; @@ -334,51 +333,6 @@ pub fn file_content_stream( } } -pub fn parsed1( - query: &netpod::AggQuerySingleChannel, - node: &Node, - stats_conf: EventChunkerConf, -) -> impl Stream> + Send { - let query = query.clone(); - let node = node.clone(); - async_stream::stream! { - let filerx = open_files(err::todoval(), err::todoval(), node); - while let Ok(fileres) = filerx.recv().await { - match fileres { - Ok(file) => { - let inp = Box::pin(file_content_stream(file.file.unwrap(), query.buffer_size as usize)); - let range = err::todoval(); - let max_ts = err::todoval(); - let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path, max_ts); - while let Some(evres) = chunker.next().await { - use eventchunker::EventChunkerItem; - match evres { - Ok(EventChunkerItem::Events(evres)) => { - //let mut buf = BytesMut::with_capacity(16); - // TODO put some interesting information to test - //buf.put_u64_le(0xcafecafe); - //yield Ok(buf.freeze()) - for bufopt in evres.decomps { - if let Some(buf) = bufopt { - yield Ok(buf.freeze()); - } - } - } - Err(e) => { - yield Err(e) - } - _ => todo!(), - } - } - } - Err(e) => { - yield Err(e); - } - } - } - } -} - pub struct NeedMinBuffer { inp: Pin> + Send>>, need_min: u32, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index cca7251..8f57228 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,4 +1,5 @@ use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem}; +use crate::agg::streams::{StatsItem, StreamItem}; use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; @@ -11,7 +12,7 @@ use std::task::{Context, Poll}; pub struct MergedMinMaxAvgScalarStream where - S: Stream>, + S: Stream, Error>>, { inps: Vec, current: Vec, @@ -31,7 +32,7 @@ where impl MergedMinMaxAvgScalarStream where - S: Stream> + Unpin, + S: Stream, Error>> + Unpin, { pub fn new(inps: Vec) -> Self { let n = inps.len(); @@ -66,29 +67,35 @@ where 'l1: loop { break match self.inps[i1].poll_next_unpin(cx) { Ready(Some(Ok(k))) => match k { - MinMaxAvgScalarEventBatchStreamItem::Values(vals) => { - self.ixs[i1] = 0; - self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals); - } - MinMaxAvgScalarEventBatchStreamItem::RangeComplete => { - self.range_complete_observed[i1] = true; - let d = self.range_complete_observed.iter().filter(|&&k| k).count(); - if d == self.range_complete_observed.len() { - self.range_complete_observed_all = true; - debug!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d); - } else { - trace!("MergedMinMaxAvgScalarStream range_complete d {}", d); - } - continue 'l1; - } - MinMaxAvgScalarEventBatchStreamItem::Log(item) => { + StreamItem::Log(item) => { self.logitems.push_back(item); continue 'l1; } - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { - self.event_data_read_stats_items.push_back(stats); + StreamItem::Stats(item) => { + match item { + StatsItem::EventDataReadStats(item) => { + self.event_data_read_stats_items.push_back(item); + } + } continue 'l1; } + StreamItem::DataItem(item) => match item { + MinMaxAvgScalarEventBatchStreamItem::Values(vals) => { + self.ixs[i1] = 0; + self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals); + } + MinMaxAvgScalarEventBatchStreamItem::RangeComplete => { + self.range_complete_observed[i1] = true; + let d = self.range_complete_observed.iter().filter(|&&k| k).count(); + if d == self.range_complete_observed.len() { + self.range_complete_observed_all = true; + debug!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d); + } else { + trace!("MergedMinMaxAvgScalarStream range_complete d {}", d); + } + continue 'l1; + } + }, }, Ready(Some(Err(e))) => { // TODO emit this error, consider this stream as done, anything more to do here? @@ -118,9 +125,9 @@ where impl Stream for MergedMinMaxAvgScalarStream where - S: Stream> + Unpin, + S: Stream, Error>> + Unpin, { - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -131,9 +138,9 @@ where self.completed = true; Ready(None) } else if let Some(item) = self.logitems.pop_front() { - Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::Log(item)))) + Ready(Some(Ok(StreamItem::Log(item)))) } else if let Some(item) = self.event_data_read_stats_items.pop_front() { - Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item)))) + Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item))))) } else if self.data_emit_complete { if self.range_complete_observed_all { if self.range_complete_observed_all_emitted { @@ -141,7 +148,9 @@ where Ready(None) } else { self.range_complete_observed_all_emitted = true; - Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete))) + Ready(Some(Ok(StreamItem::DataItem( + MinMaxAvgScalarEventBatchStreamItem::RangeComplete, + )))) } } else { self.completed = true; @@ -174,7 +183,7 @@ where let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); self.data_emit_complete = true; - Ready(Some(Ok(ret))) + Ready(Some(Ok(StreamItem::DataItem(ret)))) } else { self.data_emit_complete = true; continue 'outer; @@ -201,7 +210,7 @@ where if self.batch.tss.len() >= self.batch_size { let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - Ready(Some(Ok(ret))) + Ready(Some(Ok(StreamItem::DataItem(ret)))) } else { continue 'outer; } diff --git a/disk/src/raw.rs b/disk/src/raw.rs index ad3ee4f..8b16790 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -6,6 +6,7 @@ to request such data from nodes. */ use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; +use crate::agg::streams::StreamItem; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{make_frame, make_term_frame}; use crate::raw::bffr::MinMaxAvgScalarEventBatchStreamFromFrames; @@ -39,7 +40,7 @@ pub async fn x_processed_stream_from_node( query: EventsQuery, perf_opts: PerfOpts, node: Node, -) -> Result> + Send>>, Error> { +) -> Result, Error>> + Send>>, Error> { let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index 38e9def..fa7c689 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -1,4 +1,5 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; +use crate::agg::streams::StreamItem; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::decode_frame; use crate::raw::conn::RawConnOut; @@ -36,7 +37,7 @@ impl Stream for MinMaxAvgScalarEventBatchStreamFromFrames where T: AsyncRead + Unpin, { - type Item = Result; + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -48,38 +49,30 @@ where Ready(None) } else { match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(frame))) => { - type ExpectedType = RawConnOut; - match decode_frame::(&frame) { - Ok(item) => match item { - Ok(item) => { - if false { - match &item { - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { - info!("✒✒ ✒✒ ✒✒ ✒✒ ✒✒ ✒✒ stats {:?}", stats); - } - _ => { - info!("✒ ✒ ✒ ✒ other kind") - } - } + Ready(Some(Ok(item))) => match item { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(frame) => { + type ExpectedType = RawConnOut; + match decode_frame::(&frame) { + Ok(item) => match item { + Ok(item) => Ready(Some(Ok(item))), + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) } - Ready(Some(Ok(item))) - } + }, Err(e) => { + error!( + "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}", + frame.buf().len(), + ); self.errored = true; Ready(Some(Err(e))) } - }, - Err(e) => { - error!( - "MinMaxAvgScalarEventBatchStreamFromFrames ~~~~~~~~ ERROR on frame payload {}", - frame.buf().len(), - ); - self.errored = true; - Ready(Some(Err(e))) } } - } + }, Ready(Some(Err(e))) => { self.errored = true; Ready(Some(Err(e))) diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 0117627..2462194 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,5 +1,6 @@ use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem; +use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::eventblobs::EventBlobsComplete; @@ -45,7 +46,7 @@ async fn raw_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: Node } } -pub type RawConnOut = Result; +pub type RawConnOut = Result, Error>; async fn raw_conn_handler_inner( stream: TcpStream, @@ -95,9 +96,10 @@ async fn raw_conn_handler_inner_try( .await { match k { - Ok(k) => { - frames.push(k); + Ok(StreamItem::DataItem(item)) => { + frames.push(item); } + Ok(_) => {} Err(e) => { return Err((e, netout))?; } @@ -162,10 +164,11 @@ async fn raw_conn_handler_inner_try( let mut e = 0; while let Some(item) = s1.next().await { match &item { - Ok(MinMaxAvgScalarEventBatchStreamItem::Values(_)) => { + Ok(StreamItem::DataItem(_)) => { e += 1; } - _ => (), + Ok(_) => {} + Err(_) => {} } match make_frame::(&item) { Ok(buf) => match netout.write_all(&buf).await { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index d87f208..a494b6a 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -1,6 +1,5 @@ use bytes::Bytes; use disk::cache::{BinnedQuery, PreBinnedQuery}; -use disk::eventchunker::EventChunkerConf; use disk::raw::conn::raw_service; use err::Error; use future::Future; @@ -11,7 +10,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; -use netpod::{ByteSize, Node, NodeConfigCached}; +use netpod::NodeConfigCached; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use serde::{Deserialize, Serialize}; @@ -124,12 +123,6 @@ async fn data_api_proxy_try(req: Request, node_config: &NodeConfigCached) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/parsed_raw" { - if req.method() == Method::POST { - Ok(parsed_raw(req, &node_config.node).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } } else if path == "/api/4/binned" { if req.method() == Method::GET { Ok(binned(req, node_config).await?) @@ -173,28 +166,17 @@ where .header("access-control-allow-headers", "*") } -async fn parsed_raw(req: Request, node: &Node) -> Result, Error> { - use netpod::AggQuerySingleChannel; - let reqbody = req.into_body(); - let bodyslice = hyper::body::to_bytes(reqbody).await?; - let query: AggQuerySingleChannel = serde_json::from_slice(&bodyslice)?; - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let s = disk::parsed1(&query, node, event_chunker_conf); - let res = response(StatusCode::OK).body(Body::wrap_stream(s))?; - Ok(res) -} - struct BodyStreamWrap(netpod::BodyStream); impl hyper::body::HttpBody for BodyStreamWrap { type Data = bytes::Bytes; type Error = Error; - fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll>> { + fn poll_data(self: Pin<&mut Self>, _cx: &mut Context) -> Poll>> { todo!() } - fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll, Self::Error>> { + fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll, Self::Error>> { Poll::Ready(Ok(None)) } } diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index 067094a..d2f25ea 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -81,41 +81,51 @@ pub async fn get_binned( .map_err(|e| error!("get_binned {:?}", e)) .filter_map(|item| { let g = match item { - Ok(frame) => { - type _ExpectedType2 = disk::binned::BinnedBytesForHttpStreamFrame; - type ExpectedType = Result, Error>; - let type_id_exp = ::FRAME_TYPE_ID; - if frame.tyid() != type_id_exp { - error!("unexpected type id got {} exp {}", frame.tyid(), type_id_exp); + Ok(item) => match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + None } - let n1 = frame.buf().len(); - match bincode::deserialize::(frame.buf()) { - Ok(item) => match item { - Ok(item) => { - match item { - StreamItem::Log(item) => { - Streamlog::emit(&item); - } - StreamItem::Stats(item) => { - info!("Stats: {:?}", item); - } - StreamItem::DataItem(item) => { - info!("DataItem: {:?}", item); + StreamItem::Stats(item) => { + info!("Stats: {:?}", item); + None + } + StreamItem::DataItem(frame) => { + type _ExpectedType2 = disk::binned::BinnedBytesForHttpStreamFrame; + type ExpectedType = Result, Error>; + let type_id_exp = ::FRAME_TYPE_ID; + if frame.tyid() != type_id_exp { + error!("unexpected type id got {} exp {}", frame.tyid(), type_id_exp); + } + let n1 = frame.buf().len(); + match bincode::deserialize::(frame.buf()) { + Ok(item) => match item { + Ok(item) => { + match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + } + StreamItem::Stats(item) => { + info!("Stats: {:?}", item); + } + StreamItem::DataItem(item) => { + info!("DataItem: {:?}", item); + } } + Some(Ok(())) } - Some(Ok(())) - } + Err(e) => { + error!("len {} error frame {:?}", n1, e); + Some(Err(e)) + } + }, Err(e) => { - error!("len {} error frame {:?}", n1, e); - Some(Err(e)) + error!("len {} bincode error {:?}", n1, e); + Some(Err(e.into())) } - }, - Err(e) => { - error!("len {} bincode error {:?}", n1, e); - Some(Err(e.into())) } } - } + }, Err(e) => Some(Err(Error::with_msg(format!("{:?}", e)))), }; ready(g) diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 460ceb8..e6ac596 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -151,32 +151,41 @@ where .map_err(|e| error!("TEST GOT ERROR {:?}", e)) .filter_map(|item| { let g = match item { - Ok(frame) => { - type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame; - //info!("TEST GOT FRAME len {}", frame.buf().len()); - match bincode::deserialize::(frame.buf()) { - Ok(item) => match item { + Ok(item) => match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + None + } + StreamItem::Stats(item) => { + info!("Stats: {:?}", item); + None + } + StreamItem::DataItem(frame) => { + type ExpectedType = disk::binned::BinnedBytesForHttpStreamFrame; + match bincode::deserialize::(frame.buf()) { Ok(item) => match item { - StreamItem::Log(item) => { - Streamlog::emit(&item); - Some(Ok(StreamItem::Log(item))) - } - item => { - info!("TEST GOT ITEM {:?}", item); - Some(Ok(item)) + Ok(item) => match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + Some(Ok(StreamItem::Log(item))) + } + item => { + info!("TEST GOT ITEM {:?}", item); + Some(Ok(item)) + } + }, + Err(e) => { + error!("TEST GOT ERROR FRAME: {:?}", e); + Some(Err(e)) } }, Err(e) => { - error!("TEST GOT ERROR FRAME: {:?}", e); - Some(Err(e)) + error!("bincode error: {:?}", e); + Some(Err(e.into())) } - }, - Err(e) => { - error!("bincode error: {:?}", e); - Some(Err(e.into())) } } - } + }, Err(e) => Some(Err(Error::with_msg(format!("WEIRD EMPTY ERROR {:?}", e)))), }; ready(g)