From c19c4997bba9d40bae5c085964a372e78d286f23 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 3 Jun 2021 12:47:50 +0200 Subject: [PATCH] Rename StreamKind --- disk/src/agg.rs | 8 ++--- disk/src/agg/binnedt.rs | 46 ++++++++++++++--------------- disk/src/agg/binnedx.rs | 10 +++---- disk/src/agg/eventbatch.rs | 8 ++--- disk/src/agg/scalarbinbatch.rs | 8 ++--- disk/src/binned.rs | 8 ++--- disk/src/binned/scalar.rs | 4 +-- disk/src/binnedstream.rs | 15 ++++------ disk/src/cache.rs | 20 ++++++------- disk/src/cache/pbv.rs | 54 +++++++++++++++------------------- disk/src/cache/pbvfs.rs | 8 ++--- disk/src/merge.rs | 24 +++++++-------- disk/src/raw.rs | 6 ++-- disk/src/raw/bffr.rs | 17 +++++------ disk/src/raw/conn.rs | 9 ++---- 15 files changed, 113 insertions(+), 132 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 4e7269c..1bb1bfc 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -6,7 +6,7 @@ use super::eventchunker::EventFull; use crate::agg::binnedt::AggregatableTdim; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{RangeCompletableItem, StreamKind}; use bytes::BytesMut; use err::Error; use futures_core::Stream; @@ -28,7 +28,7 @@ pub mod streams; pub trait AggregatableXdim1Bin where - SK: BinnedStreamKind, + SK: StreamKind, { type Output: AggregatableXdim1Bin + AggregatableTdim; fn into_agg(self) -> Self::Output; @@ -54,7 +54,7 @@ impl std::fmt::Debug for ValuesDim0 { impl AggregatableXdim1Bin for ValuesDim1 where - SK: BinnedStreamKind, + SK: StreamKind, { type Output = MinMaxAvgScalarEventBatch; @@ -151,7 +151,7 @@ impl std::fmt::Debug for ValuesDim1 { impl AggregatableXdim1Bin for ValuesDim0 where - SK: BinnedStreamKind, + SK: StreamKind, { type Output = MinMaxAvgScalarEventBatch; diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 0e1a28c..c13cd11 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -1,5 +1,5 @@ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{RangeCompletableItem, StreamKind}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -11,7 +11,7 @@ use std::task::{Context, Poll}; pub trait AggregatorTdim: Sized + Unpin where - SK: BinnedStreamKind, + SK: StreamKind, { type InputValue; type OutputValue; @@ -24,7 +24,7 @@ where pub trait AggregatableTdim: Sized where - SK: BinnedStreamKind, + SK: StreamKind, { type Aggregator: AggregatorTdim; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; @@ -32,16 +32,16 @@ where pub trait IntoBinnedT where - SK: BinnedStreamKind, - S: Stream::XBinnedEvents>>, Error>> + Unpin, + SK: StreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream; } impl IntoBinnedT for S where - SK: BinnedStreamKind, - S: Stream::XBinnedEvents>>, Error>> + Unpin, + SK: StreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream { IntoBinnedTDefaultStream::new(self, spec) @@ -50,36 +50,35 @@ where pub struct IntoBinnedTDefaultStream where - SK: BinnedStreamKind, - S: Stream::XBinnedEvents>>, Error>> + Unpin, + SK: StreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { inp: S, - aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, + aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, spec: BinnedRange, curbin: u32, inp_completed: bool, all_bins_emitted: bool, range_complete_observed: bool, range_complete_emitted: bool, - left: - Option::XBinnedEvents>>, Error>>>>, + left: Option::XBinnedEvents>>, Error>>>>, errored: bool, completed: bool, - tmp_agg_results: VecDeque<::TBinnedBins>, + tmp_agg_results: VecDeque<::TBinnedBins>, _marker: std::marker::PhantomData, } impl IntoBinnedTDefaultStream where - SK: BinnedStreamKind, - S: Stream::XBinnedEvents>>, Error>> + Unpin, + SK: StreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { pub fn new(inp: S, spec: BinnedRange) -> Self { let range = spec.get_range(0); Self { inp, aggtor: Some( - <::XBinnedEvents as AggregatableTdim>::aggregator_new_static( + <::XBinnedEvents as AggregatableTdim>::aggregator_new_static( range.beg, range.end, ), ), @@ -100,7 +99,7 @@ where fn cur( &mut self, cx: &mut Context, - ) -> Poll::XBinnedEvents>>, Error>>> { + ) -> Poll::XBinnedEvents>>, Error>>> { if let Some(cur) = self.left.take() { cur } else if self.inp_completed { @@ -117,7 +116,7 @@ where let _ret = self .aggtor .replace( - <::XBinnedEvents as AggregatableTdim>::aggregator_new_static( + <::XBinnedEvents as AggregatableTdim>::aggregator_new_static( range.beg, range.end, ), ) @@ -135,9 +134,8 @@ where fn handle( &mut self, - cur: Poll::XBinnedEvents>>, Error>>>, - ) -> Option::TBinnedBins>>, Error>>>> - { + cur: Poll::XBinnedEvents>>, Error>>>, + ) -> Option::TBinnedBins>>, Error>>>> { use Poll::*; match cur { Ready(Some(Ok(item))) => match item { @@ -205,10 +203,10 @@ where impl Stream for IntoBinnedTDefaultStream where - SK: BinnedStreamKind, - S: Stream::XBinnedEvents>>, Error>> + Unpin, + SK: StreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { - type Item = Result::TBinnedBins>>, Error>; + type Item = Result::TBinnedBins>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/agg/binnedx.rs b/disk/src/agg/binnedx.rs index 0c0bdeb..09e889e 100644 --- a/disk/src/agg/binnedx.rs +++ b/disk/src/agg/binnedx.rs @@ -1,6 +1,6 @@ use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{RangeCompletableItem, StreamKind}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -9,7 +9,7 @@ use std::task::{Context, Poll}; pub trait IntoBinnedXBins1 where - SK: BinnedStreamKind, + SK: StreamKind, Self: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { @@ -19,7 +19,7 @@ where impl IntoBinnedXBins1 for S where - SK: BinnedStreamKind, + SK: StreamKind, S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { @@ -35,7 +35,7 @@ where pub struct IntoBinnedXBins1DefaultStream where - SK: BinnedStreamKind, + SK: StreamKind, S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { @@ -45,7 +45,7 @@ where impl Stream for IntoBinnedXBins1DefaultStream where - SK: BinnedStreamKind, + SK: StreamKind, S: Stream>, Error>> + Unpin, I: AggregatableXdim1Bin, { diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 9afc69a..0e50abf 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -2,7 +2,7 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem}; use crate::agg::AggregatableXdim1Bin; -use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo}; +use crate::binned::{MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo, StreamKind}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -103,7 +103,7 @@ impl std::fmt::Debug for MinMaxAvgScalarEventBatch { impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch where - SK: BinnedStreamKind, + SK: StreamKind, { type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { @@ -113,7 +113,7 @@ where impl AggregatableTdim for MinMaxAvgScalarEventBatch where - SK: BinnedStreamKind, + SK: StreamKind, { //type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarEventBatchAggregator; @@ -174,7 +174,7 @@ impl MinMaxAvgScalarEventBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator where - SK: BinnedStreamKind, + SK: StreamKind, { type InputValue = MinMaxAvgScalarEventBatch; type OutputValue = MinMaxAvgScalarBinBatch; diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 5315094..00363c3 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::streams::{Appendable, Bins, StreamItem}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; -use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem}; +use crate::binned::{MakeBytesFrame, RangeCompletableItem, StreamKind}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -187,7 +187,7 @@ impl MinMaxAvgScalarBinBatch { impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch where - SK: BinnedStreamKind, + SK: StreamKind, { type Output = MinMaxAvgScalarBinBatch; fn into_agg(self) -> Self::Output { @@ -197,7 +197,7 @@ where impl AggregatableTdim for MinMaxAvgScalarBinBatch where - SK: BinnedStreamKind, + SK: StreamKind, { //type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarBinBatchAggregator; @@ -239,7 +239,7 @@ impl MinMaxAvgScalarBinBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator where - SK: BinnedStreamKind, + SK: StreamKind, { type InputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinBatch; diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 85ce579..8dacc2a 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -501,7 +501,7 @@ pub trait XBinnedEvents: + PushableIndex + Appendable where - SK: BinnedStreamKind, + SK: StreamKind, { fn frame_type() -> u32; } @@ -524,7 +524,7 @@ pub trait TBinnedBins: impl XBinnedEvents for MinMaxAvgScalarEventBatch where - SK: BinnedStreamKind, + SK: StreamKind, { fn frame_type() -> u32 { >, Error> as FrameType>::FRAME_TYPE_ID @@ -537,7 +537,7 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch { } } -pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static { +pub trait StreamKind: Clone + Unpin + Send + Sync + 'static { type TBinnedStreamType: Stream>, Error>> + Send; type XBinnedEvents: XBinnedEvents; type TBinnedBins: TBinnedBins; @@ -590,7 +590,7 @@ pub enum RangeCompletableItem { Data(T), } -impl BinnedStreamKind for BinnedStreamKindScalar { +impl StreamKind for BinnedStreamKindScalar { // TODO is this really needed? type TBinnedStreamType = BoxedStream>, Error>>; type XBinnedEvents = MinMaxAvgScalarEventBatch; diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs index 8a9f1a0..7cbef79 100644 --- a/disk/src/binned/scalar.rs +++ b/disk/src/binned/scalar.rs @@ -1,4 +1,4 @@ -use crate::binned::{BinnedStreamKind, BinnedStreamRes}; +use crate::binned::{BinnedStreamRes, StreamKind}; use crate::binnedstream::BoxedStream; use crate::cache::BinnedQuery; use crate::raw::EventsQuery; @@ -12,7 +12,7 @@ pub async fn binned_stream( stream_kind: SK, ) -> Result, Error> where - SK: BinnedStreamKind, + SK: StreamKind, { if query.channel().backend != node_config.node.backend { let err = Error::with_msg(format!( diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 23dd52a..48d8108 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,5 +1,5 @@ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{RangeCompletableItem, StreamKind}; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{CacheUsage, PreBinnedQuery}; use crate::frame::makeframe::FrameType; @@ -14,21 +14,18 @@ use std::task::{Context, Poll}; pub struct BinnedScalarStreamFromPreBinnedPatches where - SK: BinnedStreamKind, + SK: StreamKind, { inp: Pin< - Box< - dyn Stream::TBinnedBins>>, Error>> - + Send, - >, + Box::TBinnedBins>>, Error>> + Send>, >, _stream_kind: SK, } impl BinnedScalarStreamFromPreBinnedPatches where - SK: BinnedStreamKind, - Result::TBinnedBins>>, Error>: FrameType, + SK: StreamKind, + Result::TBinnedBins>>, Error>: FrameType, { pub fn new( patch_it: PreBinnedPatchIterator, @@ -120,7 +117,7 @@ where // Can I remove the whole type or keep for static check? impl Stream for BinnedScalarStreamFromPreBinnedPatches where - SK: BinnedStreamKind, + SK: StreamKind, { type Item = Result>, Error>; diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 88f62ed..75bbbe7 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,5 +1,5 @@ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{RangeCompletableItem, StreamKind}; use crate::cache::pbv::PreBinnedValueByteStream; use crate::frame::makeframe::FrameType; use crate::merge::MergedMinMaxAvgScalarStream; @@ -309,7 +309,7 @@ pub fn pre_binned_bytes_for_http( stream_kind: SK, ) -> Result, Error> where - SK: BinnedStreamKind, + SK: StreamKind, Result>, err::Error>: FrameType, { if query.channel.backend != node_config.node.backend { @@ -396,18 +396,18 @@ type T002 = Pin, Error>> + Send>>; pub struct MergedFromRemotes where - SK: BinnedStreamKind, + SK: StreamKind, { - tcp_establish_futs: Vec::XBinnedEvents>>>, - nodein: Vec::XBinnedEvents>>>>, - merged: Option::XBinnedEvents>>>, + tcp_establish_futs: Vec::XBinnedEvents>>>, + nodein: Vec::XBinnedEvents>>>>, + merged: Option::XBinnedEvents>>>, completed: bool, errored: bool, } impl MergedFromRemotes where - SK: BinnedStreamKind, + SK: StreamKind, { pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: SK) -> Self { let mut tcp_establish_futs = vec![]; @@ -418,7 +418,7 @@ where node.clone(), stream_kind.clone(), ); - let f: T002::XBinnedEvents>> = Box::pin(f); + let f: T002::XBinnedEvents>> = Box::pin(f); tcp_establish_futs.push(f); } let n = tcp_establish_futs.len(); @@ -434,9 +434,9 @@ where impl Stream for MergedFromRemotes where - SK: BinnedStreamKind, + SK: StreamKind, { - type Item = Result::XBinnedEvents>>, Error>; + type Item = Result::XBinnedEvents>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index beff55d..07f3327 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,5 +1,5 @@ use crate::agg::streams::{Appendable, StreamItem}; -use crate::binned::{BinnedStreamKind, RangeCompletableItem, WithLen}; +use crate::binned::{RangeCompletableItem, StreamKind, WithLen}; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{CacheFileDesc, MergedFromRemotes, PreBinnedQuery, WrittenPbCache}; use crate::frame::makeframe::{make_frame, FrameType}; @@ -22,7 +22,7 @@ pub type PreBinnedValueByteStream = SCC>; pub struct PreBinnedValueByteStreamInner where - SK: BinnedStreamKind, + SK: StreamKind, { inp: PreBinnedValueStream, } @@ -33,8 +33,8 @@ pub fn pre_binned_value_byte_stream_new( stream_kind: SK, ) -> PreBinnedValueByteStream where - SK: BinnedStreamKind, - Result::TBinnedBins>>, err::Error>: FrameType, + SK: StreamKind, + Result::TBinnedBins>>, err::Error>: FrameType, { let s1 = PreBinnedValueStream::new(query.clone(), node_config, stream_kind); let s2 = PreBinnedValueByteStreamInner { inp: s1 }; @@ -43,8 +43,8 @@ where impl Stream for PreBinnedValueByteStreamInner where - SK: BinnedStreamKind, - Result::TBinnedBins>>, err::Error>: FrameType, + SK: StreamKind, + Result::TBinnedBins>>, err::Error>: FrameType, { type Item = Result; @@ -52,10 +52,9 @@ where use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => { - match make_frame::< - Result::TBinnedBins>>, err::Error>, - >(&item) - { + match make_frame::::TBinnedBins>>, err::Error>>( + &item, + ) { Ok(buf) => Ready(Some(Ok(buf.freeze()))), Err(e) => Ready(Some(Err(e.into()))), } @@ -68,7 +67,7 @@ where pub struct PreBinnedValueStream where - SK: BinnedStreamKind, + SK: StreamKind, { query: PreBinnedQuery, node_config: NodeConfigCached, @@ -76,12 +75,8 @@ where fut2: Option< Pin< Box< - dyn Stream< - Item = Result< - StreamItem::TBinnedBins>>, - err::Error, - >, - > + Send, + dyn Stream::TBinnedBins>>, err::Error>> + + Send, >, >, >, @@ -93,16 +88,13 @@ where errored: bool, completed: bool, streamlog: Streamlog, - values: ::TBinnedBins, + values: ::TBinnedBins, write_fut: Option> + Send>>>, read_cache_fut: Option< Pin< Box< dyn Future< - Output = Result< - StreamItem::TBinnedBins>>, - err::Error, - >, + Output = Result::TBinnedBins>>, err::Error>, > + Send, >, >, @@ -112,8 +104,8 @@ where impl PreBinnedValueStream where - SK: BinnedStreamKind, - Result::TBinnedBins>>, err::Error>: FrameType, + SK: StreamKind, + Result::TBinnedBins>>, err::Error>: FrameType, { pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: SK) -> Self { Self { @@ -129,7 +121,7 @@ where errored: false, completed: false, streamlog: Streamlog::new(node_config.ix as u32), - values: <::TBinnedBins as Appendable>::empty(), + values: <::TBinnedBins as Appendable>::empty(), write_fut: None, read_cache_fut: None, stream_kind, @@ -164,7 +156,7 @@ where self.node_config.node_config.cluster.clone(), self.stream_kind.clone(), ); - let s1 = ::xbinned_to_tbinned(s1, range); + let s1 = ::xbinned_to_tbinned(s1, range); self.fut2 = Some(Box::pin(s1)); } @@ -239,10 +231,10 @@ where impl Stream for PreBinnedValueStream where - SK: BinnedStreamKind + Unpin, - Result::TBinnedBins>>, err::Error>: FrameType, + SK: StreamKind + Unpin, + Result::TBinnedBins>>, err::Error>: FrameType, { - type Item = Result::TBinnedBins>>, err::Error>; + type Item = Result::TBinnedBins>>, err::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -318,7 +310,7 @@ where self.streamlog.append(Level::INFO, msg); let values = std::mem::replace( &mut self.values, - <::TBinnedBins as Appendable>::empty(), + <::TBinnedBins as Appendable>::empty(), ); let fut = super::write_pb_cache_min_max_avg_scalar( values, @@ -371,7 +363,7 @@ where match item { Ok(file) => { self.read_from_cache = true; - let fut = <::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?; + let fut = <::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?; self.read_cache_fut = Some(Box::pin(fut)); continue 'outer; } diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 695ce1e..54df2cd 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -1,5 +1,5 @@ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{RangeCompletableItem, StreamKind}; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{decode_frame, FrameType}; @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; pub struct PreBinnedScalarValueFetchedStream where - SK: BinnedStreamKind, + SK: StreamKind, { uri: http::Uri, resfut: Option, @@ -26,7 +26,7 @@ where impl PreBinnedScalarValueFetchedStream where - SK: BinnedStreamKind, + SK: StreamKind, { pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: &SK) -> Result { let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster); @@ -53,7 +53,7 @@ where // TODO change name, is now generic: impl Stream for PreBinnedScalarValueFetchedStream where - SK: BinnedStreamKind, + SK: StreamKind, Result>, err::Error>: FrameType, { type Item = Result>, Error>; diff --git a/disk/src/merge.rs b/disk/src/merge.rs index ebab73e..8d3e7cf 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,5 +1,5 @@ use crate::agg::streams::{Appendable, StatsItem, StreamItem}; -use crate::binned::{BinnedStreamKind, PushableIndex, RangeCompletableItem, WithLen, WithTimestamps}; +use crate::binned::{PushableIndex, RangeCompletableItem, StreamKind, WithLen, WithTimestamps}; use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; @@ -13,14 +13,14 @@ use std::task::{Context, Poll}; pub struct MergedMinMaxAvgScalarStream where S: Stream>, Error>> + Unpin, - SK: BinnedStreamKind, + SK: StreamKind, { inps: Vec, - current: Vec::XBinnedEvents>>, + current: Vec::XBinnedEvents>>, ixs: Vec, errored: bool, completed: bool, - batch: ::XBinnedEvents, + batch: ::XBinnedEvents, ts_last_emit: u64, range_complete_observed: Vec, range_complete_observed_all: bool, @@ -34,7 +34,7 @@ where impl MergedMinMaxAvgScalarStream where S: Stream>, Error>> + Unpin, - SK: BinnedStreamKind, + SK: StreamKind, { pub fn new(inps: Vec) -> Self { let n = inps.len(); @@ -45,7 +45,7 @@ where ixs: vec![0; n], errored: false, completed: false, - batch: <::XBinnedEvents as Appendable>::empty(), + batch: <::XBinnedEvents as Appendable>::empty(), ts_last_emit: 0, range_complete_observed: vec![false; n], range_complete_observed_all: false, @@ -125,10 +125,10 @@ where // TODO change name, it is generic now: impl Stream for MergedMinMaxAvgScalarStream where - S: Stream::XBinnedEvents>>, Error>> + Unpin, - SK: BinnedStreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, + SK: StreamKind, { - type Item = Result::XBinnedEvents>>, Error>; + type Item = Result::XBinnedEvents>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -181,7 +181,7 @@ where if self.batch.len() != 0 { //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - let emp = <::XBinnedEvents as Appendable>::empty(); + let emp = <::XBinnedEvents as Appendable>::empty(); let ret = std::mem::replace(&mut self.batch, emp); self.data_emit_complete = true; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) @@ -191,7 +191,7 @@ where } } else { assert!(lowest_ts >= self.ts_last_emit); - let emp = <::XBinnedEvents as Appendable>::empty(); + let emp = <::XBinnedEvents as Appendable>::empty(); let mut local_batch = std::mem::replace(&mut self.batch, emp); self.ts_last_emit = lowest_ts; let rix = self.ixs[lowest_ix]; @@ -225,7 +225,7 @@ where if self.batch.len() >= self.batch_size { //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - let emp = <::XBinnedEvents as Appendable>::empty(); + let emp = <::XBinnedEvents as Appendable>::empty(); let ret = std::mem::replace(&mut self.batch, emp); Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 0e57f5d..8fb6c6f 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -6,7 +6,7 @@ to request such data from nodes. */ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{RangeCompletableItem, StreamKind}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{make_frame, make_term_frame}; use crate::raw::bffr::EventsFromFrames; @@ -44,14 +44,14 @@ pub async fn x_processed_stream_from_node( ) -> Result< Pin< Box< - dyn Stream::XBinnedEvents>>, Error>> + dyn Stream::XBinnedEvents>>, Error>> + Send, >, >, Error, > where - SK: BinnedStreamKind, + SK: StreamKind, { let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; let qjs = serde_json::to_string(&query)?; diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index 1881579..1c11113 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -1,5 +1,5 @@ use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem, XBinnedEvents}; +use crate::binned::{RangeCompletableItem, StreamKind, XBinnedEvents}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::decode_frame; use err::Error; @@ -13,7 +13,7 @@ use tokio::io::AsyncRead; pub struct EventsFromFrames where T: AsyncRead + Unpin, - SK: BinnedStreamKind, + SK: StreamKind, { inp: InMemoryFrameAsyncReadStream, errored: bool, @@ -24,7 +24,7 @@ where impl EventsFromFrames where T: AsyncRead + Unpin, - SK: BinnedStreamKind, + SK: StreamKind, { pub fn new(inp: InMemoryFrameAsyncReadStream, stream_kind: SK) -> Self { Self { @@ -39,11 +39,11 @@ where impl Stream for EventsFromFrames where T: AsyncRead + Unpin, - SK: BinnedStreamKind, + SK: StreamKind, // TODO see binned.rs better to express it on trait? //Result::XBinnedEvents>>, Error>: FrameType, { - type Item = Result::XBinnedEvents>>, Error>; + type Item = Result::XBinnedEvents>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -60,13 +60,10 @@ where StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(frame) => { match decode_frame::< - Result< - StreamItem::XBinnedEvents>>, - Error, - >, + Result::XBinnedEvents>>, Error>, >( &frame, - <::XBinnedEvents as XBinnedEvents>::frame_type(), + <::XBinnedEvents as XBinnedEvents>::frame_type(), ) { Ok(item) => match item { Ok(item) => Ready(Some(Ok(item))), diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 56ccb16..9bd59d3 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -2,7 +2,7 @@ use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; -use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem}; +use crate::binned::{BinnedStreamKindScalar, RangeCompletableItem, StreamKind}; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; @@ -189,7 +189,7 @@ async fn events_conn_handler_inner_try( AggKind::DimXBins1 => { match make_frame::< Result< - StreamItem::XBinnedEvents>>, + StreamItem::XBinnedEvents>>, Error, >, >(&item) @@ -205,10 +205,7 @@ async fn events_conn_handler_inner_try( } // TODO define this case: AggKind::DimXBinsN(_xbincount) => match make_frame::< - Result< - StreamItem::XBinnedEvents>>, - Error, - >, + Result::XBinnedEvents>>, Error>, >(err::todoval()) { Ok(buf) => match netout.write_all(&buf).await {