From 49af7ce56190d76c8cfab1074aeb10e8810bd68a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 20 May 2021 15:39:24 +0200 Subject: [PATCH] WIP on StreamItem unified top-level type, at least it compiles --- disk/src/agg/binnedt.rs | 20 ++-- disk/src/agg/streams.rs | 144 +++++++++++++++++++++++++++++ disk/src/binned.rs | 180 ++++++++++++++++++------------------ disk/src/binned/scalar.rs | 99 ++++++++++++++++++++ disk/src/binnedstream.rs | 60 +++++++----- disk/src/frame/makeframe.rs | 12 +-- retrieval/src/test.rs | 18 ++-- 7 files changed, 394 insertions(+), 139 deletions(-) create mode 100644 disk/src/binned/scalar.rs diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 8501528..9f37c37 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -38,13 +38,14 @@ pub trait IntoBinnedT { fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; } -impl IntoBinnedT for T +impl IntoBinnedT for S where + S: Stream> + Unpin, I: AggregatableTdim + Unpin, - T: Stream> + Unpin, + //I: AggregatableTdim, I::Aggregator: Unpin, { - type StreamOut = IntoBinnedTDefaultStream; + type StreamOut = IntoBinnedTDefaultStream; fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut { IntoBinnedTDefaultStream::new(self, spec) @@ -53,8 +54,8 @@ where pub struct IntoBinnedTDefaultStream where - I: AggregatableTdim, S: Stream>, + I: AggregatableTdim, { inp: S, aggtor: Option, @@ -72,8 +73,8 @@ where impl IntoBinnedTDefaultStream where - I: AggregatableTdim, S: Stream> + Unpin, + I: AggregatableTdim, { pub fn new(inp: S, spec: BinnedRange) -> Self { let range = spec.get_range(0); @@ -200,20 +201,15 @@ where impl Stream for IntoBinnedTDefaultStream where - I: AggregatableTdim + Unpin, S: Stream> + Unpin, + //I: AggregatableTdim, + I: AggregatableTdim + Unpin, I::Aggregator: Unpin, { type Item = Result<::OutputValue, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - /* - Reconsider structure here: - I want to exhaust the input stream until it gives Ready(None) because there can be more Status or other new events. - The first time that I recognize that the requested data range is complete, I can set a flag. - After that, I can dismiss incoming data events. - */ 'outer: loop { break if self.completed { panic!("IntoBinnedTDefaultStream poll_next on completed"); diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 24f2da0..2de26f8 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -1,3 +1,5 @@ +use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; +use crate::agg::AggregatableXdim1Bin; use crate::streamlog::LogItem; use err::Error; use netpod::EventDataReadStats; @@ -33,3 +35,145 @@ pub trait ToJsonResult { type Output; fn to_json_result(&self) -> Result; } + +impl AggregatableXdim1Bin for StreamItem +where + // TODO bound on the Output ??? + //T: AggregatableTdim + AggregatableXdim1Bin, + T: AggregatableTdim + AggregatableXdim1Bin, +{ + type Output = StreamItem<::Output>; + + fn into_agg(self) -> Self::Output { + // TODO how to handle the type mismatch? + /*match self { + Self::Log(item) => Self::Log(item), + Self::Stats(item) => Self::Stats(item), + Self::DataItem(item) => Self::DataItem(item.into_agg()), + }*/ + err::todoval() + } +} + +pub struct StreamItemAggregator +where + T: AggregatableTdim, +{ + inner_agg: ::Aggregator, + _mark: std::marker::PhantomData, +} + +impl StreamItemAggregator +where + T: AggregatableTdim, +{ + pub fn new(ts1: u64, ts2: u64) -> Self { + Self { + inner_agg: ::aggregator_new_static(ts1, ts2), + _mark: std::marker::PhantomData::default(), + } + } +} + +impl AggregatorTdim for StreamItemAggregator +where + T: AggregatableTdim, +{ + type InputValue = StreamItem; + type OutputValue = StreamItem<<::Aggregator as AggregatorTdim>::OutputValue>; + + fn ends_before(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ends_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ingest(&mut self, inp: &mut Self::InputValue) { + todo!() + } + + fn result(self) -> Vec { + todo!() + } +} + +impl AggregatableTdim for StreamItem +where + T: AggregatableTdim, +{ + type Output = StreamItem< as AggregatorTdim>::OutputValue>; + type Aggregator = StreamItemAggregator; + + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { + Self::Aggregator::new(ts1, ts2) + } + + fn is_range_complete(&self) -> bool { + match self { + Self::DataItem(item) => item.is_range_complete(), + Self::Log(_) => false, + Self::Stats(_) => false, + } + } + + // TODO refactor: is this necessary to have on the trait? + fn make_range_complete_item() -> Option { + match ::make_range_complete_item() { + Some(k) => Some(Self::DataItem(k)), + None => None, + } + } + + // TODO refactor: the point of having the StreamItem is that this function is no longer necessary: + fn is_log_item(&self) -> bool { + if let Self::Log(_) = self { + true + } else { + false + } + } + + // TODO should be able to remove this from trait: + fn log_item(self) -> Option { + if let Self::Log(item) = self { + Some(item) + } else { + None + } + } + + // TODO should be able to remove this from trait: + fn make_log_item(item: LogItem) -> Option { + Some(Self::Log(item)) + } + + // TODO should be able to remove this from trait: + fn is_stats_item(&self) -> bool { + if let Self::Stats(_) = self { + true + } else { + false + } + } + + // TODO should be able to remove this from trait: + fn stats_item(self) -> Option { + if let Self::Stats(_item) = self { + // TODO this whole function should no longer be needed. + Some(err::todoval()) + } else { + None + } + } + + // TODO should be able to remove this from trait: + fn make_stats_item(item: EventDataReadStats) -> Option { + Some(Self::Stats(StatsItem::EventDataReadStats(item))) + } +} diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 085e556..bc1ab5c 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,24 +1,28 @@ -use crate::agg::binnedt::IntoBinnedT; -use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem}; -use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem, ToJsonResult}; -use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches}; -use crate::cache::{BinnedQuery, MergedFromRemotes}; +use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult}; +use crate::agg::AggregatableXdim1Bin; +use crate::binned::scalar::binned_scalar_stream; +use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream}; +use crate::cache::BinnedQuery; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::frame::makeframe::make_frame; -use crate::raw::EventsQuery; +use crate::streamlog::LogItem; use bytes::Bytes; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::{AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; +use netpod::{AggKind, BinnedRange, EventDataReadStats, NodeConfigCached}; use num_traits::Zero; use serde::{Deserialize, Serialize, Serializer}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +pub mod scalar; + pub struct BinnedStreamRes { pub binned_stream: BinnedStream, pub range: BinnedRange, @@ -133,92 +137,82 @@ impl MakeBytesFrame for Result, Error> { } } -fn adapter_to_stream_item( - k: Result, -) -> Result, Error> { - match k { - Ok(k) => match k { - MinMaxAvgScalarBinBatchStreamItem::Log(item) => Ok(StreamItem::Log(item)), - MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) => { - Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item))) - } - MinMaxAvgScalarBinBatchStreamItem::RangeComplete => { - Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete)) - } - MinMaxAvgScalarBinBatchStreamItem::Values(item) => { - Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item))) - } - }, - Err(e) => Err(e), +impl AggregatableXdim1Bin for BinnedScalarStreamItem { + // TODO does this already include all cases? + type Output = BinnedScalarStreamItem; + + fn into_agg(self) -> Self::Output { + todo!() } } -pub async fn binned_scalar_stream( - node_config: &NodeConfigCached, - query: &BinnedQuery, -) -> Result, Error>>, Error> { - if query.channel().backend != node_config.node.backend { - let err = Error::with_msg(format!( - "backend mismatch node: {} requested: {}", - node_config.node.backend, - query.channel().backend - )); - return Err(err); +pub struct BinnedScalarStreamItemAggregator {} + +impl AggregatorTdim for BinnedScalarStreamItemAggregator { + type InputValue = BinnedScalarStreamItem; + // TODO using the same type for the output, does this cover all cases? + type OutputValue = BinnedScalarStreamItem; + + fn ends_before(&self, inp: &Self::InputValue) -> bool { + todo!() } - let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg( - format!("binned_bytes_for_http BinnedRange::covering_range returned None"), - ))?; - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - //let _shape = entry.to_shape()?; - match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) { - Ok(Some(pre_range)) => { - info!("binned_bytes_for_http found pre_range: {:?}", pre_range); - if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { - let msg = format!( - "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", - pre_range, range - ); - return Err(Error::with_msg(msg)); - } - let s1 = BinnedStreamFromPreBinnedPatches::new( - PreBinnedPatchIterator::from_range(pre_range), - query.channel().clone(), - range.clone(), - query.agg_kind().clone(), - query.cache_usage().clone(), - node_config, - query.disk_stats_every().clone(), - )? - .map(adapter_to_stream_item); - let s = BinnedStream::new(Box::pin(s1))?; - let ret = BinnedStreamRes { - binned_stream: s, - range, - }; - Ok(ret) - } - Ok(None) => { - info!( - "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}", - range - ); - let evq = EventsQuery { - channel: query.channel().clone(), - range: query.range().clone(), - agg_kind: query.agg_kind().clone(), - }; - // TODO do I need to set up more transformations or binning to deliver the requested data? - let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone()); - let s = s.into_binned_t(range.clone()); - let s = s.map(adapter_to_stream_item); - let s = BinnedStream::new(Box::pin(s))?; - let ret = BinnedStreamRes { - binned_stream: s, - range, - }; - Ok(ret) - } - Err(e) => Err(e), + + fn ends_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ingest(&mut self, inp: &mut Self::InputValue) { + todo!() + } + + fn result(self) -> Vec { + todo!() + } +} + +impl AggregatableTdim for BinnedScalarStreamItem { + type Aggregator = BinnedScalarStreamItemAggregator; + // TODO isn't this already defined in terms of the Aggregator? + type Output = BinnedScalarStreamItem; + + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { + todo!() + } + + fn is_range_complete(&self) -> bool { + todo!() + } + + fn make_range_complete_item() -> Option { + todo!() + } + + fn is_log_item(&self) -> bool { + todo!() + } + + fn log_item(self) -> Option { + todo!() + } + + fn make_log_item(item: LogItem) -> Option { + todo!() + } + + fn is_stats_item(&self) -> bool { + todo!() + } + + fn stats_item(self) -> Option { + todo!() + } + + fn make_stats_item(item: EventDataReadStats) -> Option { + todo!() } } @@ -237,12 +231,16 @@ pub async fn binned_bytes_for_http( let ret = BinnedBytesForHttpStream::new(res.binned_stream); Ok(Box::pin(ret)) } - AggKind::DimXBinsN(_) => err::todoval(), + AggKind::DimXBinsN(_) => { + let res = binned_scalar_stream(node_config, query).await?; + let ret = BinnedBytesForHttpStream::new(res.binned_stream); + Ok(Box::pin(ret)) + } } } // TODO remove this when no longer used, gets replaced by Result, Error> -pub type BinnedBytesForHttpStreamFrame = ::Item; +pub type BinnedBytesForHttpStreamFrame = ::Item; pub struct BinnedBytesForHttpStream { inp: S, diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs new file mode 100644 index 0000000..afd2ec0 --- /dev/null +++ b/disk/src/binned/scalar.rs @@ -0,0 +1,99 @@ +use crate::agg::binnedt::IntoBinnedT; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; +use crate::agg::streams::{StatsItem, StreamItem}; +use crate::binned::{BinnedScalarStreamItem, BinnedStreamRes}; +use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream}; +use crate::cache::{BinnedQuery, MergedFromRemotes}; +use crate::raw::EventsQuery; +use err::Error; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; + +pub fn adapter_to_stream_item( + k: Result, +) -> Result, Error> { + match k { + Ok(k) => match k { + MinMaxAvgScalarBinBatchStreamItem::Log(item) => Ok(StreamItem::Log(item)), + MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) => { + Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item))) + } + MinMaxAvgScalarBinBatchStreamItem::RangeComplete => { + Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete)) + } + MinMaxAvgScalarBinBatchStreamItem::Values(item) => { + Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item))) + } + }, + Err(e) => Err(e), + } +} + +pub async fn binned_scalar_stream( + node_config: &NodeConfigCached, + query: &BinnedQuery, +) -> Result, Error>>, Error> { + if query.channel().backend != node_config.node.backend { + let err = Error::with_msg(format!( + "backend mismatch node: {} requested: {}", + node_config.node.backend, + query.channel().backend + )); + return Err(err); + } + let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg( + format!("binned_bytes_for_http BinnedRange::covering_range returned None"), + ))?; + let perf_opts = PerfOpts { inmem_bufcap: 512 }; + //let _shape = entry.to_shape()?; + match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) { + Ok(Some(pre_range)) => { + info!("binned_bytes_for_http found pre_range: {:?}", pre_range); + if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { + let msg = format!( + "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", + pre_range, range + ); + return Err(Error::with_msg(msg)); + } + let s1 = BinnedScalarStreamFromPreBinnedPatches::new( + PreBinnedPatchIterator::from_range(pre_range), + query.channel().clone(), + range.clone(), + query.agg_kind().clone(), + query.cache_usage().clone(), + node_config, + query.disk_stats_every().clone(), + )?; + let s = BinnedStream::new(Box::pin(s1))?; + let ret = BinnedStreamRes { + binned_stream: s, + range, + }; + Ok(ret) + } + Ok(None) => { + info!( + "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}", + range + ); + let evq = EventsQuery { + channel: query.channel().clone(), + range: query.range().clone(), + agg_kind: query.agg_kind().clone(), + }; + // TODO do I need to set up more transformations or binning to deliver the requested data? + let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone()); + let s = s.into_binned_t(range.clone()); + let s = s.map(adapter_to_stream_item); + let s = BinnedStream::new(Box::pin(s))?; + let ret = BinnedStreamRes { + binned_stream: s, + range, + }; + Ok(ret) + } + Err(e) => Err(e), + } +} diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index cfecc43..eaf0c70 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,21 +1,24 @@ +use crate::agg::binnedt::IntoBinnedT; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; +use crate::agg::streams::{StatsItem, StreamItem}; +use crate::binned::scalar::adapter_to_stream_item; +use crate::binned::BinnedScalarStreamItem; use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; use crate::cache::{CacheUsage, PreBinnedQuery}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -#[allow(unused_imports)] use netpod::log::*; use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PreBinnedPatchIterator}; use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct BinnedStreamFromPreBinnedPatches { - inp: Pin> + Send>>, +pub struct BinnedScalarStreamFromPreBinnedPatches { + inp: Pin, Error>> + Send>>, } -impl BinnedStreamFromPreBinnedPatches { +impl BinnedScalarStreamFromPreBinnedPatches { pub fn new( patch_it: PreBinnedPatchIterator, channel: Channel, @@ -27,12 +30,14 @@ impl BinnedStreamFromPreBinnedPatches { ) -> Result { let patches: Vec<_> = patch_it.collect(); let mut sp = String::new(); - for (i, p) in patches.iter().enumerate() { - use std::fmt::Write; - write!(sp, " • patch {:2} {:?}\n", i, p)?; + if false { + // Convert this to a StreamLog message: + for (i, p) in patches.iter().enumerate() { + use std::fmt::Write; + write!(sp, " • patch {:2} {:?}\n", i, p)?; + } + info!("BinnedStream::new\n{}", sp); } - info!("BinnedStream::new\n{}", sp); - use super::agg::binnedt::IntoBinnedT; let inp = futures_util::stream::iter(patches.into_iter()) .map({ let node_config = node_config.clone(); @@ -67,31 +72,44 @@ impl BinnedStreamFromPreBinnedPatches { Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower - | Fits::PartlyLowerAndGreater => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k))), + | Fits::PartlyLowerAndGreater => { + Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(k)))) + } _ => None, } } - Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)), - Ok(PreBinnedItem::EventDataReadStats(stats)) => { - Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats))) + Ok(PreBinnedItem::RangeComplete) => { + Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))) } - Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))), + Ok(PreBinnedItem::EventDataReadStats(item)) => { + Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))) + } + Ok(PreBinnedItem::Log(item)) => Some(Ok(StreamItem::Log(item))), Err(e) => Some(Err(e)), }; ready(g) } - }) - .into_binned_t(range); + }); + //let inp: Box, Error>> + Send + Unpin> = + // Box::new(inp); + //let inp: &Stream, Error>> + Send + Unpin>> = &inp + //() == inp; + let inp = IntoBinnedT::into_binned_t(inp, range); Ok(Self { inp: Box::pin(inp) }) + //err::todoval() } } -impl Stream for BinnedStreamFromPreBinnedPatches { - // TODO make this generic over all possible things - type Item = Result; +impl Stream for BinnedScalarStreamFromPreBinnedPatches { + type Item = Result, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.inp.poll_next_unpin(cx) + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => Ready(Some(item)), + Ready(None) => Ready(None), + Pending => Pending, + } } } @@ -100,14 +118,12 @@ pub struct BinnedStream { } impl BinnedStream { - // Item was: Result pub fn new(inp: Pin + Send>>) -> Result { Ok(Self { inp }) } } impl Stream for BinnedStream { - //type Item = Result; type Item = I; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 318b4c2..e222bae 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -1,5 +1,6 @@ +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; use crate::agg::streams::StreamItem; -use crate::binned::{BinnedBytesForHttpStreamFrame, BinnedScalarStreamItem}; +use crate::binned::BinnedScalarStreamItem; use crate::cache::pbvfs::PreBinnedItem; use crate::frame::inmem::InMemoryFrame; use crate::raw::conn::RawConnOut; @@ -16,11 +17,6 @@ pub trait FrameType { const FRAME_TYPE_ID: u32; } -// TODO replaced by Result, Error> -impl FrameType for BinnedBytesForHttpStreamFrame { - const FRAME_TYPE_ID: u32 = 0x02; -} - impl FrameType for EventQueryJsonStringFrame { const FRAME_TYPE_ID: u32 = 0x03; } @@ -37,6 +33,10 @@ impl FrameType for Result, Error> { const FRAME_TYPE_ID: u32 = 0x06; } +impl FrameType for Result { + const FRAME_TYPE_ID: u32 = 0x07; +} + pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index e2dddd8..460ceb8 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -1,7 +1,7 @@ use crate::spawn_test_hosts; use bytes::BytesMut; use chrono::{DateTime, Utc}; -use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; +use disk::agg::streams::StreamItem; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; @@ -157,9 +157,9 @@ where match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => match item { - MinMaxAvgScalarBinBatchStreamItem::Log(item) => { + StreamItem::Log(item) => { Streamlog::emit(&item); - Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))) + Some(Ok(StreamItem::Log(item))) } item => { info!("TEST GOT ITEM {:?}", item); @@ -183,13 +183,15 @@ where }) .fold(Ok(BinnedResponse::new()), |a, k| { let g = match a { - Ok(mut a) => match k { - Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => { - a.bin_count += k.ts1s.len(); + Ok(a) => match k { + Ok(StreamItem::DataItem(_item)) => { + // TODO extract bin count from item + //a.bin_count += k.ts1s.len(); Ok(a) } - Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)) => { - a.bytes_read += stats.parsed_bytes; + Ok(StreamItem::Stats(_item)) => { + // TODO adapt to new Stats type: + //a.bytes_read += stats.parsed_bytes; Ok(a) } Ok(_) => Ok(a),