diff --git a/disk/src/agg/binnedx.rs b/disk/src/agg/binnedx.rs index 09e889e..8b13789 100644 --- a/disk/src/agg/binnedx.rs +++ b/disk/src/agg/binnedx.rs @@ -1,74 +1 @@ -use crate::agg::streams::StreamItem; -use crate::agg::AggregatableXdim1Bin; -use crate::binned::{RangeCompletableItem, StreamKind}; -use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; -use std::pin::Pin; -use std::task::{Context, Poll}; -pub trait IntoBinnedXBins1 -where - SK: StreamKind, - Self: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, -{ - type StreamOut; - fn into_binned_x_bins_1(self) -> Self::StreamOut; -} - -impl IntoBinnedXBins1 for S -where - SK: StreamKind, - S: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, -{ - type StreamOut = IntoBinnedXBins1DefaultStream; - - fn into_binned_x_bins_1(self) -> Self::StreamOut { - IntoBinnedXBins1DefaultStream { - inp: self, - _marker: std::marker::PhantomData::default(), - } - } -} - -pub struct IntoBinnedXBins1DefaultStream -where - SK: StreamKind, - S: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, -{ - inp: S, - _marker: std::marker::PhantomData, -} - -impl Stream for IntoBinnedXBins1DefaultStream -where - SK: StreamKind, - S: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, -{ - type Item = Result>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => match k { - StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), - StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } - RangeCompletableItem::Data(item) => Ready(Some(Ok(StreamItem::DataItem( - RangeCompletableItem::Data(item.into_agg()), - )))), - }, - }, - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), - Pending => Pending, - } - } -} diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 9c8c1e5..421b2c8 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,5 +1,4 @@ use super::agg::IntoDim1F32Stream; -use crate::agg::binnedx::IntoBinnedXBins1; use crate::binned::BinnedStreamKindScalar; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; @@ -65,12 +64,7 @@ async fn agg_x_dim_0_inner() { query.buffer_size as usize, event_chunker_conf, ); - let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1); - let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1); - - // TODO add the t binning and expectation. - let fut1 = fut1.for_each(|_k| ready(())); - fut1.await; + // TODO add the binning and expectation and await the result. } #[test] @@ -128,18 +122,5 @@ async fn agg_x_dim_1_inner() { } q }); - let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1); - let fut1 = fut1.map(|k| { - //info!("after X binning {:?}", k.as_ref().unwrap()); - k - }); - - // TODO add T-binning and expectation. - let fut1 = fut1 - .map(|k| { - info!("after T binning {:?}", k.as_ref().unwrap()); - k - }) - .for_each(|_k| ready(())); - fut1.await; + // TODO add the binning and expectation and await the result. } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index b60b630..4a89d9b 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,4 +1,3 @@ -use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; @@ -283,112 +282,39 @@ async fn events_conn_handler_inner_try( compression: entry.is_compressed, }; - if true { - // TODO use a requested buffer size - let buffer_size = 1024 * 4; - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let event_blobs = EventBlobsComplete::new( - range.clone(), - channel_config.clone(), - node_config.node.clone(), - node_config.ix, - buffer_size, - event_chunker_conf, - ); - let shape = entry.to_shape().unwrap(); - let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); - while let Some(item) = p1.next().await { - let item = item.make_frame(); - match item { - Ok(buf) => match netout.write_all(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, - }, - Err(e) => { - return Err((e, netout))?; - } - } - } - let buf = make_term_frame(); - match netout.write_all(&buf).await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - match netout.flush().await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - Ok(()) - } else { - // TODO remove this scope after refactor. - let buffer_size = 1024 * 4; - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let s1 = EventBlobsComplete::new( - range.clone(), - channel_config.clone(), - node_config.node.clone(), - node_config.ix, - buffer_size, - event_chunker_conf, - ) - .into_dim_1_f32_stream(); - // TODO need to decide already here on the type I want to use. - let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1); - let mut e = 0; - while let Some(item) = s1.next().await { - match &item { - Ok(StreamItem::DataItem(_)) => { - e += 1; - } + // TODO use a requested buffer size + let buffer_size = 1024 * 4; + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let event_blobs = EventBlobsComplete::new( + range.clone(), + channel_config.clone(), + node_config.node.clone(), + node_config.ix, + buffer_size, + event_chunker_conf, + ); + let shape = entry.to_shape().unwrap(); + let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs); + while let Some(item) = p1.next().await { + let item = item.make_frame(); + match item { + Ok(buf) => match netout.write_all(&buf).await { Ok(_) => {} - Err(_) => {} - } - match evq.agg_kind { - AggKind::DimXBins1 => { - match make_frame::< - Result< - StreamItem::XBinnedEvents>>, - Error, - >, - >(&item) - { - Ok(buf) => match netout.write_all(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, - }, - Err(e) => { - return Err((e, netout))?; - } - } - } - // TODO define this case: - AggKind::DimXBinsN(_xbincount) => match make_frame::< - Result< - StreamItem::XBinnedEvents>>, - Error, - >, - >(err::todoval()) - { - Ok(buf) => match netout.write_all(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, - }, - Err(e) => { - return Err((e, netout))?; - } - }, + Err(e) => return Err((e, netout))?, + }, + Err(e) => { + return Err((e, netout))?; } } - let buf = make_term_frame(); - match netout.write_all(&buf).await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - match netout.flush().await { - Ok(_) => (), - Err(e) => return Err((e, netout))?, - } - let _total_written_value_items = e; - Ok(()) } + let buf = make_term_frame(); + match netout.write_all(&buf).await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + match netout.flush().await { + Ok(_) => (), + Err(e) => return Err((e, netout))?, + } + Ok(()) }