From 2b1be2f2b9922293b87045fce691a67a8e73c32b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 May 2021 09:08:44 +0200 Subject: [PATCH] WIP not compile --- disk/src/agg.rs | 19 ++++++-- disk/src/agg/binnedt.rs | 60 ++++++++++++++++-------- disk/src/agg/binnedt2.rs | 84 ++++++++++++++++++++++++++++++---- disk/src/agg/binnedx.rs | 30 +++++++----- disk/src/agg/eventbatch.rs | 19 ++++++-- disk/src/agg/scalarbinbatch.rs | 19 ++++++-- disk/src/agg/streams.rs | 38 ++++++++------- disk/src/aggtest.rs | 32 ++++++++----- disk/src/binned.rs | 15 ++++-- disk/src/cache/pbv.rs | 29 +----------- disk/src/raw/conn.rs | 7 +-- 11 files changed, 235 insertions(+), 117 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 95889ee..13c748a 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -6,7 +6,7 @@ use super::eventchunker::EventFull; use crate::agg::binnedt::AggregatableTdim; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; -use crate::binned::RangeCompletableItem; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use bytes::BytesMut; use err::Error; use futures_core::Stream; @@ -25,8 +25,11 @@ pub mod eventbatch; pub mod scalarbinbatch; pub mod streams; -pub trait AggregatableXdim1Bin { - type Output: AggregatableXdim1Bin + AggregatableTdim; +pub trait AggregatableXdim1Bin +where + SK: BinnedStreamKind, +{ + type Output: AggregatableXdim1Bin + AggregatableTdim; fn into_agg(self) -> Self::Output; } @@ -48,7 +51,10 @@ impl std::fmt::Debug for ValuesDim0 { } } -impl AggregatableXdim1Bin for ValuesDim1 { +impl AggregatableXdim1Bin for ValuesDim1 +where + SK: BinnedStreamKind, +{ type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { @@ -142,7 +148,10 @@ impl std::fmt::Debug for ValuesDim1 { } } -impl AggregatableXdim1Bin for ValuesDim0 { +impl AggregatableXdim1Bin for ValuesDim0 +where + SK: BinnedStreamKind, +{ type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index ad4a833..ba95e52 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -1,6 +1,6 @@ use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; -use crate::binned::RangeCompletableItem; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -10,9 +10,12 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; -pub trait AggregatorTdim: Sized + Unpin { +pub trait AggregatorTdim: Sized + Unpin +where + SK: BinnedStreamKind, +{ type InputValue; - type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin; + type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin; fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool; fn starts_after(&self, inp: &Self::InputValue) -> bool; @@ -20,34 +23,44 @@ pub trait AggregatorTdim: Sized + Unpin { fn result(self) -> Vec; } -pub trait AggregatableTdim: Sized { - type Output: AggregatableXdim1Bin + AggregatableTdim; - type Aggregator: AggregatorTdim; +pub trait AggregatableTdim: Sized +where + SK: BinnedStreamKind, +{ + //type Output: AggregatableXdim1Bin + AggregatableTdim; + type Aggregator: AggregatorTdim::TBinnedBins>; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; } -pub trait IntoBinnedT { - type StreamOut: Stream; +pub trait IntoBinnedT +where + SK: BinnedStreamKind, +{ + type StreamOut: Stream< + Item = Result::TBinnedBins>>, Error>, + >; fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; } -impl IntoBinnedT for S +impl IntoBinnedT for S where + SK: BinnedStreamKind, S: Stream>, Error>> + Unpin, - I: AggregatableTdim + Unpin, + I: AggregatableTdim + Unpin, I::Aggregator: Unpin, { - type StreamOut = IntoBinnedTDefaultStream; + type StreamOut = IntoBinnedTDefaultStream; fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut { IntoBinnedTDefaultStream::new(self, spec) } } -pub struct IntoBinnedTDefaultStream +pub struct IntoBinnedTDefaultStream where S: Stream>, Error>> + Unpin, - I: AggregatableTdim, + I: AggregatableTdim, + SK: BinnedStreamKind, { inp: S, aggtor: Option, @@ -60,13 +73,15 @@ where left: Option>, Error>>>>, errored: bool, completed: bool, - tmp_agg_results: VecDeque<::OutputValue>, + tmp_agg_results: VecDeque<>::OutputValue>, + _marker: std::marker::PhantomData, } -impl IntoBinnedTDefaultStream +impl IntoBinnedTDefaultStream where S: Stream>, Error>> + Unpin, - I: AggregatableTdim, + I: AggregatableTdim, + SK: BinnedStreamKind, { pub fn new(inp: S, spec: BinnedRange) -> Self { let range = spec.get_range(0); @@ -83,6 +98,7 @@ where errored: false, completed: false, tmp_agg_results: VecDeque::new(), + _marker: std::marker::PhantomData::default(), } } @@ -116,7 +132,9 @@ where &mut self, cur: Poll>, Error>>>, ) -> Option< - Poll::OutputValue>>, Error>>>, + Poll< + Option>::OutputValue>>, Error>>, + >, > { use Poll::*; match cur { @@ -178,13 +196,15 @@ where } } -impl Stream for IntoBinnedTDefaultStream +impl Stream for IntoBinnedTDefaultStream where S: Stream>, Error>> + Unpin, - I: AggregatableTdim + Unpin, + I: AggregatableTdim + Unpin, I::Aggregator: Unpin, + SK: BinnedStreamKind, { - type Item = Result::OutputValue>>, Error>; + //type Item = Result::OutputValue>>, Error>; + type Item = Result::TBinnedBins>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/agg/binnedt2.rs b/disk/src/agg/binnedt2.rs index 7bf9d17..f39c13b 100644 --- a/disk/src/agg/binnedt2.rs +++ b/disk/src/agg/binnedt2.rs @@ -213,29 +213,97 @@ where } } -pub struct MinMaxAvgScalarBinBatchAgg {} +pub struct MinMaxAvgScalarBinBatchAgg { + ts1: u64, + ts2: u64, + count: u64, + min: f32, + max: f32, + sum: f32, + sumc: u64, +} + +impl MinMaxAvgScalarBinBatchAgg { + pub fn new(ts1: u64, ts2: u64) -> Self { + Self { + ts1, + ts2, + count: 0, + min: f32::MAX, + max: f32::MIN, + sum: 0f32, + sumc: 0, + } + } +} impl AggregatorTdim2 for MinMaxAvgScalarBinBatchAgg { type InputValue = MinMaxAvgScalarBinBatch; fn ends_before(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp.ts2s.last() { + Some(&ts) => ts <= self.ts1, + None => true, + } } fn ends_after(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp.ts2s.last() { + Some(&ts) => ts >= self.ts2, + _ => panic!(), + } } fn starts_after(&self, inp: &Self::InputValue) -> bool { - todo!() + match inp.ts1s.first() { + Some(&ts) => ts >= self.ts2, + _ => panic!(), + } } - fn ingest(&mut self, inp: &mut Self::InputValue) { - todo!() + fn ingest(&mut self, v: &mut Self::InputValue) { + for i1 in 0..v.ts1s.len() { + let ts1 = v.ts1s[i1]; + let ts2 = v.ts2s[i1]; + if ts2 <= self.ts1 { + continue; + } else if ts1 >= self.ts2 { + continue; + } else { + self.count += v.counts[i1]; + self.min = self.min.min(v.mins[i1]); + self.max = self.max.max(v.maxs[i1]); + let x = v.avgs[i1]; + if x.is_nan() { + } else { + if self.sum.is_nan() { + self.sum = x; + } else { + self.sum += x; + } + self.sumc += 1; + } + } + } } fn result(self) -> Vec { - todo!() + let min = if self.min == f32::MAX { f32::NAN } else { self.min }; + let max = if self.max == f32::MIN { f32::NAN } else { self.max }; + let avg = if self.sumc == 0 { + f32::NAN + } else { + self.sum / self.sumc as f32 + }; + let v = MinMaxAvgScalarBinBatch { + ts1s: vec![self.ts1], + ts2s: vec![self.ts2], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + }; + vec![v] } } @@ -243,6 +311,6 @@ impl AggregatableTdim2 for MinMaxAvgScalarBinBatch { type Aggregator = MinMaxAvgScalarBinBatchAgg; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { - todo!() + Self::Aggregator::new(ts1, ts2) } } diff --git a/disk/src/agg/binnedx.rs b/disk/src/agg/binnedx.rs index bf0fa0d..0c0bdeb 100644 --- a/disk/src/agg/binnedx.rs +++ b/disk/src/agg/binnedx.rs @@ -1,45 +1,53 @@ use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; -use crate::binned::RangeCompletableItem; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use std::pin::Pin; use std::task::{Context, Poll}; -pub trait IntoBinnedXBins1 +pub trait IntoBinnedXBins1 where + SK: BinnedStreamKind, Self: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, + I: AggregatableXdim1Bin, { type StreamOut; fn into_binned_x_bins_1(self) -> Self::StreamOut; } -impl IntoBinnedXBins1 for S +impl IntoBinnedXBins1 for S where + SK: BinnedStreamKind, S: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, + I: AggregatableXdim1Bin, { - type StreamOut = IntoBinnedXBins1DefaultStream; + type StreamOut = IntoBinnedXBins1DefaultStream; fn into_binned_x_bins_1(self) -> Self::StreamOut { - IntoBinnedXBins1DefaultStream { inp: self } + IntoBinnedXBins1DefaultStream { + inp: self, + _marker: std::marker::PhantomData::default(), + } } } -pub struct IntoBinnedXBins1DefaultStream +pub struct IntoBinnedXBins1DefaultStream where + SK: BinnedStreamKind, S: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, + I: AggregatableXdim1Bin, { inp: S, + _marker: std::marker::PhantomData, } -impl Stream for IntoBinnedXBins1DefaultStream +impl Stream for IntoBinnedXBins1DefaultStream where + SK: BinnedStreamKind, S: Stream>, Error>> + Unpin, - I: AggregatableXdim1Bin, + I: AggregatableXdim1Bin, { type Item = Result>, Error>; diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index eab5a84..eff5224 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -2,7 +2,7 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; -use crate::binned::{MakeBytesFrame, RangeCompletableItem}; +use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -100,15 +100,21 @@ impl std::fmt::Debug for MinMaxAvgScalarEventBatch { } } -impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { +impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch +where + SK: BinnedStreamKind, +{ type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { self } } -impl AggregatableTdim for MinMaxAvgScalarEventBatch { - type Output = MinMaxAvgScalarBinBatch; +impl AggregatableTdim for MinMaxAvgScalarEventBatch +where + SK: BinnedStreamKind, +{ + //type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarEventBatchAggregator; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { @@ -165,7 +171,10 @@ impl MinMaxAvgScalarEventBatchAggregator { } } -impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { +impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator +where + SK: BinnedStreamKind, +{ type InputValue = MinMaxAvgScalarEventBatch; type OutputValue = MinMaxAvgScalarBinBatch; diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 9779167..37a98d7 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::streams::{Bins, StreamItem}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; -use crate::binned::{MakeBytesFrame, RangeCompletableItem}; +use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; @@ -185,15 +185,21 @@ impl MinMaxAvgScalarBinBatch { } } -impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { +impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch +where + SK: BinnedStreamKind, +{ type Output = MinMaxAvgScalarBinBatch; fn into_agg(self) -> Self::Output { todo!() } } -impl AggregatableTdim for MinMaxAvgScalarBinBatch { - type Output = MinMaxAvgScalarBinBatch; +impl AggregatableTdim for MinMaxAvgScalarBinBatch +where + SK: BinnedStreamKind, +{ + //type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarBinBatchAggregator; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { @@ -231,7 +237,10 @@ impl MinMaxAvgScalarBinBatchAggregator { } } -impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { +impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator +where + SK: BinnedStreamKind, +{ type InputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinBatch; diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index 4ea2f6b..d1ffcb7 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -1,5 +1,6 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::AggregatableXdim1Bin; +use crate::binned::BinnedStreamKind; use crate::streamlog::LogItem; use err::Error; use netpod::EventDataReadStats; @@ -36,11 +37,12 @@ pub trait ToJsonResult { fn to_json_result(&self) -> Result; } -impl AggregatableXdim1Bin for StreamItem +impl AggregatableXdim1Bin for StreamItem where - T: AggregatableTdim + AggregatableXdim1Bin, + SK: BinnedStreamKind, + T: AggregatableTdim + AggregatableXdim1Bin, { - type Output = StreamItem<::Output>; + type Output = StreamItem<>::Output>; fn into_agg(self) -> Self::Output { match self { @@ -51,30 +53,33 @@ where } } -pub struct StreamItemAggregator +pub struct StreamItemAggregator where - T: AggregatableTdim, + T: AggregatableTdim, + SK: BinnedStreamKind, { - inner_agg: ::Aggregator, + inner_agg: >::Aggregator, } -impl StreamItemAggregator +impl StreamItemAggregator where - T: AggregatableTdim, + T: AggregatableTdim, + SK: BinnedStreamKind, { pub fn new(ts1: u64, ts2: u64) -> Self { Self { - inner_agg: ::aggregator_new_static(ts1, ts2), + inner_agg: >::aggregator_new_static(ts1, ts2), } } } -impl AggregatorTdim for StreamItemAggregator +impl AggregatorTdim for StreamItemAggregator where - T: AggregatableTdim, + T: AggregatableTdim, + SK: BinnedStreamKind, { type InputValue = StreamItem; - type OutputValue = StreamItem<<::Aggregator as AggregatorTdim>::OutputValue>; + type OutputValue = StreamItem<<>::Aggregator as AggregatorTdim>::OutputValue>; fn ends_before(&self, inp: &Self::InputValue) -> bool { match inp { @@ -119,12 +124,13 @@ where } } -impl AggregatableTdim for StreamItem +impl AggregatableTdim for StreamItem where - T: AggregatableTdim, + T: AggregatableTdim, + SK: BinnedStreamKind, { - type Output = StreamItem< as AggregatorTdim>::OutputValue>; - type Aggregator = StreamItemAggregator; + //type Output = StreamItem< as AggregatorTdim>::OutputValue>; + type Aggregator = StreamItemAggregator; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { Self::Aggregator::new(ts1, ts2) diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 5b8d395..790daf9 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,6 +1,7 @@ use super::agg::IntoDim1F32Stream; use crate::agg::binnedt::IntoBinnedT; use crate::agg::binnedx::IntoBinnedXBins1; +use crate::binned::BinnedStreamKindScalar; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use futures_util::StreamExt; @@ -66,8 +67,11 @@ async fn agg_x_dim_0_inner() { event_chunker_conf, ); let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1); - let fut1 = IntoBinnedXBins1::into_binned_x_bins_1(fut1); - let fut1 = IntoBinnedT::into_binned_t(fut1, BinnedRange::covering_range(range, bin_count).unwrap().unwrap()); + let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1); + let fut1 = IntoBinnedT::::into_binned_t( + fut1, + BinnedRange::covering_range(range, bin_count).unwrap().unwrap(), + ); let fut1 = fut1 //.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) .for_each(|_k| ready(())); @@ -128,17 +132,21 @@ async fn agg_x_dim_1_inner() { } } q - }) - .into_binned_x_bins_1() - .map(|k| { + }); + let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1); + let fut1 = fut1.map(|k| { //info!("after X binning {:?}", k.as_ref().unwrap()); k - }) - .into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) - .map(|k| { - info!("after T binning {:?}", k.as_ref().unwrap()); - k - }) - .for_each(|_k| ready(())); + }); + let fut1 = crate::agg::binnedt::IntoBinnedT::::into_binned_t( + fut1, + BinnedRange::covering_range(range, bin_count).unwrap().unwrap(), + ); + let fut1 = fut1 + .map(|k| { + info!("after T binning {:?}", k.as_ref().unwrap()); + k + }) + .for_each(|_k| ready(())); fut1.await; } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 2a28e36..da523da 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -473,7 +473,7 @@ impl Collectable for MinMaxAvgScalarBinBatch { } } -pub trait XBinnedEvents: +pub trait XBinnedEvents: Sized + Unpin + Send @@ -481,10 +481,12 @@ pub trait XBinnedEvents: + DeserializeOwned + Collectable + Collected - + AggregatableTdim + + AggregatableTdim + WithLen + WithTimestamps + PushableIndex +where + SK: BinnedStreamKind, { fn frame_type() -> u32; } @@ -505,7 +507,10 @@ pub trait TBinnedBins: fn frame_type() -> u32; } -impl XBinnedEvents for MinMaxAvgScalarEventBatch { +impl XBinnedEvents for MinMaxAvgScalarEventBatch +where + SK: BinnedStreamKind, +{ fn frame_type() -> u32 { >, Error> as FrameType>::FRAME_TYPE_ID } @@ -524,7 +529,7 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static type TBinnedStreamType: Stream>, Error>> + Send + 'static; - type XBinnedEvents: XBinnedEvents; + type XBinnedEvents: XBinnedEvents; type TBinnedBins: TBinnedBins; fn new_binned_from_prebinned( @@ -603,7 +608,7 @@ impl BinnedStreamKind for BinnedStreamKindScalar { ) -> Result { let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); // TODO use the binned2 instead - let s = crate::agg::binnedt::IntoBinnedT::into_binned_t(s, range); + let s = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s, range); Ok(BoxedStream::new(Box::pin(s))?) } } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 9079932..ee9342e 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,4 +1,3 @@ -use crate::agg::binnedt::IntoBinnedT; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Collectable, Collected, StreamItem}; use crate::binned::RangeCompletableItem::RangeComplete; @@ -168,33 +167,9 @@ where self.node_config.node_config.cluster.clone(), self.stream_kind.clone(), ); - let s1 = IntoBinnedT::into_binned_t(s1, range); - let s1 = s1.map(|item| { - // TODO does this do anything? - match item { - Ok(item) => match item { - StreamItem::Log(item) => Ok(StreamItem::Log(item)), - StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), - StreamItem::DataItem(item) => Ok(StreamItem::DataItem(item)), - /*StreamItem::DataItem(item) => match item { - MinMaxAvgScalarBinBatchStreamItem::RangeComplete => { - Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete)) - } - MinMaxAvgScalarBinBatchStreamItem::Values(item) => { - Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(item))) - } - },*/ - }, - Err(e) => Err(e), - } - }); - - // TODO - // In the above must introduce a trait to convert to the generic item type: - - // TODO!! - self.fut2 = Some(err::todoval()); + let s1 = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s1, range); //self.fut2 = Some(Box::pin(s1)); + self.fut2 = err::todoval(); } fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) { diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 170db0c..5129dda 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -154,7 +154,7 @@ async fn events_conn_handler_inner_try( // TODO use a requested buffer size let buffer_size = 1024 * 4; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let mut s1 = EventBlobsComplete::new( + let s1 = EventBlobsComplete::new( range.clone(), channel_config.clone(), node_config.node.clone(), @@ -162,8 +162,9 @@ async fn events_conn_handler_inner_try( buffer_size, event_chunker_conf, ) - .into_dim_1_f32_stream() - .into_binned_x_bins_1(); + .into_dim_1_f32_stream(); + // TODO need to decide already here on the type I want to use. + let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1); let mut e = 0; while let Some(item) = s1.next().await { match &item {