WIP on StreamItem unified top-level type, at least it compiles

This commit is contained in:
Dominik Werder
2021-05-20 15:39:24 +02:00
parent 72006fbf17
commit 49af7ce561
7 changed files with 394 additions and 139 deletions
+8 -12
View File
@@ -38,13 +38,14 @@ pub trait IntoBinnedT {
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut;
} }
impl<T, I> IntoBinnedT for T impl<S, I> IntoBinnedT for S
where where
S: Stream<Item = Result<I, Error>> + Unpin,
I: AggregatableTdim + Unpin, I: AggregatableTdim + Unpin,
T: Stream<Item = Result<I, Error>> + Unpin, //I: AggregatableTdim,
I::Aggregator: Unpin, I::Aggregator: Unpin,
{ {
type StreamOut = IntoBinnedTDefaultStream<T, I>; type StreamOut = IntoBinnedTDefaultStream<S, I>;
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut { fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut {
IntoBinnedTDefaultStream::new(self, spec) IntoBinnedTDefaultStream::new(self, spec)
@@ -53,8 +54,8 @@ where
pub struct IntoBinnedTDefaultStream<S, I> pub struct IntoBinnedTDefaultStream<S, I>
where where
I: AggregatableTdim,
S: Stream<Item = Result<I, Error>>, S: Stream<Item = Result<I, Error>>,
I: AggregatableTdim,
{ {
inp: S, inp: S,
aggtor: Option<I::Aggregator>, aggtor: Option<I::Aggregator>,
@@ -72,8 +73,8 @@ where
impl<S, I> IntoBinnedTDefaultStream<S, I> impl<S, I> IntoBinnedTDefaultStream<S, I>
where where
I: AggregatableTdim,
S: Stream<Item = Result<I, Error>> + Unpin, S: Stream<Item = Result<I, Error>> + Unpin,
I: AggregatableTdim,
{ {
pub fn new(inp: S, spec: BinnedRange) -> Self { pub fn new(inp: S, spec: BinnedRange) -> Self {
let range = spec.get_range(0); let range = spec.get_range(0);
@@ -200,20 +201,15 @@ where
impl<S, I> Stream for IntoBinnedTDefaultStream<S, I> impl<S, I> Stream for IntoBinnedTDefaultStream<S, I>
where where
I: AggregatableTdim + Unpin,
S: Stream<Item = Result<I, Error>> + Unpin, S: Stream<Item = Result<I, Error>> + Unpin,
//I: AggregatableTdim,
I: AggregatableTdim + Unpin,
I::Aggregator: Unpin, I::Aggregator: Unpin,
{ {
type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>; type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
/*
Reconsider structure here:
I want to exhaust the input stream until it gives Ready(None) because there can be more Status or other new events.
The first time that I recognize that the requested data range is complete, I can set a flag.
After that, I can dismiss incoming data events.
*/
'outer: loop { 'outer: loop {
break if self.completed { break if self.completed {
panic!("IntoBinnedTDefaultStream poll_next on completed"); panic!("IntoBinnedTDefaultStream poll_next on completed");
+144
View File
@@ -1,3 +1,5 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::AggregatableXdim1Bin;
use crate::streamlog::LogItem; use crate::streamlog::LogItem;
use err::Error; use err::Error;
use netpod::EventDataReadStats; use netpod::EventDataReadStats;
@@ -33,3 +35,145 @@ pub trait ToJsonResult {
type Output; type Output;
fn to_json_result(&self) -> Result<Self::Output, Error>; fn to_json_result(&self) -> Result<Self::Output, Error>;
} }
impl<T> AggregatableXdim1Bin for StreamItem<T>
where
// TODO bound on the Output ???
//T: AggregatableTdim + AggregatableXdim1Bin<Output = T>,
T: AggregatableTdim + AggregatableXdim1Bin,
{
type Output = StreamItem<<T as AggregatableXdim1Bin>::Output>;
fn into_agg(self) -> Self::Output {
// TODO how to handle the type mismatch?
/*match self {
Self::Log(item) => Self::Log(item),
Self::Stats(item) => Self::Stats(item),
Self::DataItem(item) => Self::DataItem(item.into_agg()),
}*/
err::todoval()
}
}
pub struct StreamItemAggregator<T>
where
T: AggregatableTdim,
{
inner_agg: <T as AggregatableTdim>::Aggregator,
_mark: std::marker::PhantomData<T>,
}
impl<T> StreamItemAggregator<T>
where
T: AggregatableTdim,
{
pub fn new(ts1: u64, ts2: u64) -> Self {
Self {
inner_agg: <T as AggregatableTdim>::aggregator_new_static(ts1, ts2),
_mark: std::marker::PhantomData::default(),
}
}
}
impl<T> AggregatorTdim for StreamItemAggregator<T>
where
T: AggregatableTdim,
{
type InputValue = StreamItem<T>;
type OutputValue = StreamItem<<<T as AggregatableTdim>::Aggregator as AggregatorTdim>::OutputValue>;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
todo!()
}
fn result(self) -> Vec<Self::OutputValue> {
todo!()
}
}
impl<T> AggregatableTdim for StreamItem<T>
where
T: AggregatableTdim,
{
type Output = StreamItem<<StreamItemAggregator<T> as AggregatorTdim>::OutputValue>;
type Aggregator = StreamItemAggregator<T>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
Self::Aggregator::new(ts1, ts2)
}
fn is_range_complete(&self) -> bool {
match self {
Self::DataItem(item) => item.is_range_complete(),
Self::Log(_) => false,
Self::Stats(_) => false,
}
}
// TODO refactor: is this necessary to have on the trait?
fn make_range_complete_item() -> Option<Self> {
match <T as AggregatableTdim>::make_range_complete_item() {
Some(k) => Some(Self::DataItem(k)),
None => None,
}
}
// TODO refactor: the point of having the StreamItem is that this function is no longer necessary:
fn is_log_item(&self) -> bool {
if let Self::Log(_) = self {
true
} else {
false
}
}
// TODO should be able to remove this from trait:
fn log_item(self) -> Option<LogItem> {
if let Self::Log(item) = self {
Some(item)
} else {
None
}
}
// TODO should be able to remove this from trait:
fn make_log_item(item: LogItem) -> Option<Self> {
Some(Self::Log(item))
}
// TODO should be able to remove this from trait:
fn is_stats_item(&self) -> bool {
if let Self::Stats(_) = self {
true
} else {
false
}
}
// TODO should be able to remove this from trait:
fn stats_item(self) -> Option<EventDataReadStats> {
if let Self::Stats(_item) = self {
// TODO this whole function should no longer be needed.
Some(err::todoval())
} else {
None
}
}
// TODO should be able to remove this from trait:
fn make_stats_item(item: EventDataReadStats) -> Option<Self> {
Some(Self::Stats(StatsItem::EventDataReadStats(item)))
}
}
+89 -91
View File
@@ -1,24 +1,28 @@
use crate::agg::binnedt::IntoBinnedT; use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Collectable, Collected, StatsItem, StreamItem, ToJsonResult}; use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult};
use crate::binnedstream::{BinnedStream, BinnedStreamFromPreBinnedPatches}; use crate::agg::AggregatableXdim1Bin;
use crate::cache::{BinnedQuery, MergedFromRemotes}; use crate::binned::scalar::binned_scalar_stream;
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
use crate::cache::BinnedQuery;
use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::frame::makeframe::make_frame; use crate::frame::makeframe::make_frame;
use crate::raw::EventsQuery; use crate::streamlog::LogItem;
use bytes::Bytes; use bytes::Bytes;
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::log::*; use netpod::log::*;
use netpod::{AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; use netpod::{AggKind, BinnedRange, EventDataReadStats, NodeConfigCached};
use num_traits::Zero; use num_traits::Zero;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
pub mod scalar;
pub struct BinnedStreamRes<I> { pub struct BinnedStreamRes<I> {
pub binned_stream: BinnedStream<I>, pub binned_stream: BinnedStream<I>,
pub range: BinnedRange, pub range: BinnedRange,
@@ -133,92 +137,82 @@ impl MakeBytesFrame for Result<StreamItem<BinnedScalarStreamItem>, Error> {
} }
} }
fn adapter_to_stream_item( impl AggregatableXdim1Bin for BinnedScalarStreamItem {
k: Result<MinMaxAvgScalarBinBatchStreamItem, Error>, // TODO does this already include all cases?
) -> Result<StreamItem<BinnedScalarStreamItem>, Error> { type Output = BinnedScalarStreamItem;
match k {
Ok(k) => match k { fn into_agg(self) -> Self::Output {
MinMaxAvgScalarBinBatchStreamItem::Log(item) => Ok(StreamItem::Log(item)), todo!()
MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) => {
Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))
}
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))
}
MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item)))
}
},
Err(e) => Err(e),
} }
} }
pub async fn binned_scalar_stream( pub struct BinnedScalarStreamItemAggregator {}
node_config: &NodeConfigCached,
query: &BinnedQuery, impl AggregatorTdim for BinnedScalarStreamItemAggregator {
) -> Result<BinnedStreamRes<Result<StreamItem<BinnedScalarStreamItem>, Error>>, Error> { type InputValue = BinnedScalarStreamItem;
if query.channel().backend != node_config.node.backend { // TODO using the same type for the output, does this cover all cases?
let err = Error::with_msg(format!( type OutputValue = BinnedScalarStreamItem;
"backend mismatch node: {} requested: {}",
node_config.node.backend, fn ends_before(&self, inp: &Self::InputValue) -> bool {
query.channel().backend todo!()
));
return Err(err);
} }
let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg(
format!("binned_bytes_for_http BinnedRange::covering_range returned None"), fn ends_after(&self, inp: &Self::InputValue) -> bool {
))?; todo!()
let perf_opts = PerfOpts { inmem_bufcap: 512 }; }
//let _shape = entry.to_shape()?;
match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) { fn starts_after(&self, inp: &Self::InputValue) -> bool {
Ok(Some(pre_range)) => { todo!()
info!("binned_bytes_for_http found pre_range: {:?}", pre_range); }
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!( fn ingest(&mut self, inp: &mut Self::InputValue) {
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", todo!()
pre_range, range }
);
return Err(Error::with_msg(msg)); fn result(self) -> Vec<Self::OutputValue> {
} todo!()
let s1 = BinnedStreamFromPreBinnedPatches::new( }
PreBinnedPatchIterator::from_range(pre_range), }
query.channel().clone(),
range.clone(), impl AggregatableTdim for BinnedScalarStreamItem {
query.agg_kind().clone(), type Aggregator = BinnedScalarStreamItemAggregator;
query.cache_usage().clone(), // TODO isn't this already defined in terms of the Aggregator?
node_config, type Output = BinnedScalarStreamItem;
query.disk_stats_every().clone(),
)? fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
.map(adapter_to_stream_item); todo!()
let s = BinnedStream::new(Box::pin(s1))?; }
let ret = BinnedStreamRes {
binned_stream: s, fn is_range_complete(&self) -> bool {
range, todo!()
}; }
Ok(ret)
} fn make_range_complete_item() -> Option<Self> {
Ok(None) => { todo!()
info!( }
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range fn is_log_item(&self) -> bool {
); todo!()
let evq = EventsQuery { }
channel: query.channel().clone(),
range: query.range().clone(), fn log_item(self) -> Option<LogItem> {
agg_kind: query.agg_kind().clone(), todo!()
}; }
// TODO do I need to set up more transformations or binning to deliver the requested data?
let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone()); fn make_log_item(item: LogItem) -> Option<Self> {
let s = s.into_binned_t(range.clone()); todo!()
let s = s.map(adapter_to_stream_item); }
let s = BinnedStream::new(Box::pin(s))?;
let ret = BinnedStreamRes { fn is_stats_item(&self) -> bool {
binned_stream: s, todo!()
range, }
};
Ok(ret) fn stats_item(self) -> Option<EventDataReadStats> {
} todo!()
Err(e) => Err(e), }
fn make_stats_item(item: EventDataReadStats) -> Option<Self> {
todo!()
} }
} }
@@ -237,12 +231,16 @@ pub async fn binned_bytes_for_http(
let ret = BinnedBytesForHttpStream::new(res.binned_stream); let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret)) Ok(Box::pin(ret))
} }
AggKind::DimXBinsN(_) => err::todoval(), AggKind::DimXBinsN(_) => {
let res = binned_scalar_stream(node_config, query).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
}
} }
} }
// TODO remove this when no longer used, gets replaced by Result<StreamItem<BinnedStreamItem>, Error> // TODO remove this when no longer used, gets replaced by Result<StreamItem<BinnedStreamItem>, Error>
pub type BinnedBytesForHttpStreamFrame = <BinnedStreamFromPreBinnedPatches as Stream>::Item; pub type BinnedBytesForHttpStreamFrame = <BinnedScalarStreamFromPreBinnedPatches as Stream>::Item;
pub struct BinnedBytesForHttpStream<S> { pub struct BinnedBytesForHttpStream<S> {
inp: S, inp: S,
+99
View File
@@ -0,0 +1,99 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::agg::streams::{StatsItem, StreamItem};
use crate::binned::{BinnedScalarStreamItem, BinnedStreamRes};
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::raw::EventsQuery;
use err::Error;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
pub fn adapter_to_stream_item(
k: Result<MinMaxAvgScalarBinBatchStreamItem, Error>,
) -> Result<StreamItem<BinnedScalarStreamItem>, Error> {
match k {
Ok(k) => match k {
MinMaxAvgScalarBinBatchStreamItem::Log(item) => Ok(StreamItem::Log(item)),
MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) => {
Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))
}
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete))
}
MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(item)))
}
},
Err(e) => Err(e),
}
}
pub async fn binned_scalar_stream(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<BinnedStreamRes<Result<StreamItem<BinnedScalarStreamItem>, Error>>, Error> {
if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
node_config.node.backend,
query.channel().backend
));
return Err(err);
}
let range = BinnedRange::covering_range(query.range().clone(), query.bin_count())?.ok_or(Error::with_msg(
format!("binned_bytes_for_http BinnedRange::covering_range returned None"),
))?;
let perf_opts = PerfOpts { inmem_bufcap: 512 };
//let _shape = entry.to_shape()?;
match PreBinnedPatchRange::covering_range(query.range().clone(), query.bin_count()) {
Ok(Some(pre_range)) => {
info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
let msg = format!(
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
pre_range, range
);
return Err(Error::with_msg(msg));
}
let s1 = BinnedScalarStreamFromPreBinnedPatches::new(
PreBinnedPatchIterator::from_range(pre_range),
query.channel().clone(),
range.clone(),
query.agg_kind().clone(),
query.cache_usage().clone(),
node_config,
query.disk_stats_every().clone(),
)?;
let s = BinnedStream::new(Box::pin(s1))?;
let ret = BinnedStreamRes {
binned_stream: s,
range,
};
Ok(ret)
}
Ok(None) => {
info!(
"binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
range
);
let evq = EventsQuery {
channel: query.channel().clone(),
range: query.range().clone(),
agg_kind: query.agg_kind().clone(),
};
// TODO do I need to set up more transformations or binning to deliver the requested data?
let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s = s.into_binned_t(range.clone());
let s = s.map(adapter_to_stream_item);
let s = BinnedStream::new(Box::pin(s))?;
let ret = BinnedStreamRes {
binned_stream: s,
range,
};
Ok(ret)
}
Err(e) => Err(e),
}
}
+38 -22
View File
@@ -1,21 +1,24 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::agg::streams::{StatsItem, StreamItem};
use crate::binned::scalar::adapter_to_stream_item;
use crate::binned::BinnedScalarStreamItem;
use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
use crate::cache::{CacheUsage, PreBinnedQuery}; use crate::cache::{CacheUsage, PreBinnedQuery};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
#[allow(unused_imports)]
use netpod::log::*; use netpod::log::*;
use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PreBinnedPatchIterator}; use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PreBinnedPatchIterator};
use std::future::ready; use std::future::ready;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub struct BinnedStreamFromPreBinnedPatches { pub struct BinnedScalarStreamFromPreBinnedPatches {
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>, inp: Pin<Box<dyn Stream<Item = Result<StreamItem<BinnedScalarStreamItem>, Error>> + Send>>,
} }
impl BinnedStreamFromPreBinnedPatches { impl BinnedScalarStreamFromPreBinnedPatches {
pub fn new( pub fn new(
patch_it: PreBinnedPatchIterator, patch_it: PreBinnedPatchIterator,
channel: Channel, channel: Channel,
@@ -27,12 +30,14 @@ impl BinnedStreamFromPreBinnedPatches {
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let patches: Vec<_> = patch_it.collect(); let patches: Vec<_> = patch_it.collect();
let mut sp = String::new(); let mut sp = String::new();
for (i, p) in patches.iter().enumerate() { if false {
use std::fmt::Write; // Convert this to a StreamLog message:
write!(sp, " • patch {:2} {:?}\n", i, p)?; for (i, p) in patches.iter().enumerate() {
use std::fmt::Write;
write!(sp, " • patch {:2} {:?}\n", i, p)?;
}
info!("BinnedStream::new\n{}", sp);
} }
info!("BinnedStream::new\n{}", sp);
use super::agg::binnedt::IntoBinnedT;
let inp = futures_util::stream::iter(patches.into_iter()) let inp = futures_util::stream::iter(patches.into_iter())
.map({ .map({
let node_config = node_config.clone(); let node_config = node_config.clone();
@@ -67,31 +72,44 @@ impl BinnedStreamFromPreBinnedPatches {
Fits::Inside Fits::Inside
| Fits::PartlyGreater | Fits::PartlyGreater
| Fits::PartlyLower | Fits::PartlyLower
| Fits::PartlyLowerAndGreater => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k))), | Fits::PartlyLowerAndGreater => {
Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::Values(k))))
}
_ => None, _ => None,
} }
} }
Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)), Ok(PreBinnedItem::RangeComplete) => {
Ok(PreBinnedItem::EventDataReadStats(stats)) => { Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete)))
Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
} }
Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))), Ok(PreBinnedItem::EventDataReadStats(item)) => {
Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item))))
}
Ok(PreBinnedItem::Log(item)) => Some(Ok(StreamItem::Log(item))),
Err(e) => Some(Err(e)), Err(e) => Some(Err(e)),
}; };
ready(g) ready(g)
} }
}) });
.into_binned_t(range); //let inp: Box<dyn Stream<Item = Result<StreamItem<BinnedScalarStreamItem>, Error>> + Send + Unpin> =
// Box::new(inp);
//let inp: &Stream<Item = Result<StreamItem<BinnedScalarStreamItem>, Error>> + Send + Unpin>> = &inp
//() == inp;
let inp = IntoBinnedT::into_binned_t(inp, range);
Ok(Self { inp: Box::pin(inp) }) Ok(Self { inp: Box::pin(inp) })
//err::todoval()
} }
} }
impl Stream for BinnedStreamFromPreBinnedPatches { impl Stream for BinnedScalarStreamFromPreBinnedPatches {
// TODO make this generic over all possible things type Item = Result<StreamItem<BinnedScalarStreamItem>, Error>;
type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.inp.poll_next_unpin(cx) use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => Ready(Some(item)),
Ready(None) => Ready(None),
Pending => Pending,
}
} }
} }
@@ -100,14 +118,12 @@ pub struct BinnedStream<I> {
} }
impl<I> BinnedStream<I> { impl<I> BinnedStream<I> {
// Item was: Result<MinMaxAvgScalarBinBatchStreamItem, Error>
pub fn new(inp: Pin<Box<dyn Stream<Item = I> + Send>>) -> Result<Self, Error> { pub fn new(inp: Pin<Box<dyn Stream<Item = I> + Send>>) -> Result<Self, Error> {
Ok(Self { inp }) Ok(Self { inp })
} }
} }
impl<I> Stream for BinnedStream<I> { impl<I> Stream for BinnedStream<I> {
//type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
type Item = I; type Item = I;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+6 -6
View File
@@ -1,5 +1,6 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::{BinnedBytesForHttpStreamFrame, BinnedScalarStreamItem}; use crate::binned::BinnedScalarStreamItem;
use crate::cache::pbvfs::PreBinnedItem; use crate::cache::pbvfs::PreBinnedItem;
use crate::frame::inmem::InMemoryFrame; use crate::frame::inmem::InMemoryFrame;
use crate::raw::conn::RawConnOut; use crate::raw::conn::RawConnOut;
@@ -16,11 +17,6 @@ pub trait FrameType {
const FRAME_TYPE_ID: u32; const FRAME_TYPE_ID: u32;
} }
// TODO replaced by Result<StreamItem<BinnedStreamItem>, Error>
impl FrameType for BinnedBytesForHttpStreamFrame {
const FRAME_TYPE_ID: u32 = 0x02;
}
impl FrameType for EventQueryJsonStringFrame { impl FrameType for EventQueryJsonStringFrame {
const FRAME_TYPE_ID: u32 = 0x03; const FRAME_TYPE_ID: u32 = 0x03;
} }
@@ -37,6 +33,10 @@ impl FrameType for Result<StreamItem<BinnedScalarStreamItem>, Error> {
const FRAME_TYPE_ID: u32 = 0x06; const FRAME_TYPE_ID: u32 = 0x06;
} }
impl FrameType for Result<MinMaxAvgScalarBinBatchStreamItem, Error> {
const FRAME_TYPE_ID: u32 = 0x07;
}
pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error> pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
where where
FT: FrameType + Serialize, FT: FrameType + Serialize,
+10 -8
View File
@@ -1,7 +1,7 @@
use crate::spawn_test_hosts; use crate::spawn_test_hosts;
use bytes::BytesMut; use bytes::BytesMut;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; use disk::agg::streams::StreamItem;
use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use disk::streamlog::Streamlog; use disk::streamlog::Streamlog;
use err::Error; use err::Error;
@@ -157,9 +157,9 @@ where
match bincode::deserialize::<ExpectedType>(frame.buf()) { match bincode::deserialize::<ExpectedType>(frame.buf()) {
Ok(item) => match item { Ok(item) => match item {
Ok(item) => match item { Ok(item) => match item {
MinMaxAvgScalarBinBatchStreamItem::Log(item) => { StreamItem::Log(item) => {
Streamlog::emit(&item); Streamlog::emit(&item);
Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))) Some(Ok(StreamItem::Log(item)))
} }
item => { item => {
info!("TEST GOT ITEM {:?}", item); info!("TEST GOT ITEM {:?}", item);
@@ -183,13 +183,15 @@ where
}) })
.fold(Ok(BinnedResponse::new()), |a, k| { .fold(Ok(BinnedResponse::new()), |a, k| {
let g = match a { let g = match a {
Ok(mut a) => match k { Ok(a) => match k {
Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => { Ok(StreamItem::DataItem(_item)) => {
a.bin_count += k.ts1s.len(); // TODO extract bin count from item
//a.bin_count += k.ts1s.len();
Ok(a) Ok(a)
} }
Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)) => { Ok(StreamItem::Stats(_item)) => {
a.bytes_read += stats.parsed_bytes; // TODO adapt to new Stats type:
//a.bytes_read += stats.parsed_bytes;
Ok(a) Ok(a)
} }
Ok(_) => Ok(a), Ok(_) => Ok(a),