From 5c7262c2c3a918940e171a4a11c0c6337bf035d5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 7 Jun 2021 16:21:52 +0200 Subject: [PATCH] WIP Start to add alternative tbin pipeline --- disk/src/agg.rs | 2 + disk/src/agg/binnedt4.rs | 304 ++++++++++++++++++++++++++++++ disk/src/agg/enp.rs | 46 +++++ disk/src/binned.rs | 131 ++++++------- disk/src/binned/pbv.rs | 142 ++++++++------ disk/src/binned/pbv2.rs | 7 +- disk/src/binned/prebinned.rs | 59 ++++-- disk/src/cache.rs | 13 +- disk/src/decode.rs | 4 + disk/src/frame/makeframe.rs | 101 ++++++---- disk/src/merge.rs | 2 + disk/src/merge/mergefromremote.rs | 120 ++++++++++++ disk/src/raw.rs | 34 +++- disk/src/raw/bffr.rs | 57 +++--- disk/src/raw/conn.rs | 5 +- 15 files changed, 803 insertions(+), 224 deletions(-) create mode 100644 disk/src/agg/binnedt4.rs create mode 100644 disk/src/agg/enp.rs create mode 100644 disk/src/merge/mergefromremote.rs diff --git a/disk/src/agg.rs b/disk/src/agg.rs index a687c2a..556e6d4 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -21,7 +21,9 @@ use std::time::{Duration, Instant}; pub mod binnedt; pub mod binnedt2; pub mod binnedt3; +pub mod binnedt4; pub mod binnedx; +pub mod enp; pub mod eventbatch; pub mod scalarbinbatch; pub mod streams; diff --git a/disk/src/agg/binnedt4.rs b/disk/src/agg/binnedt4.rs new file mode 100644 index 0000000..cc512ee --- /dev/null +++ b/disk/src/agg/binnedt4.rs @@ -0,0 +1,304 @@ +use crate::agg::enp::XBinnedScalarEvents; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::streams::StreamItem; +use crate::binned::{BinsTimeBinner, EventsTimeBinner, MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo}; +use crate::decode::EventValues; +use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::{BinnedRange, NanoRange}; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub struct DefaultScalarEventsTimeBinner { + _m1: PhantomData, +} + +impl EventsTimeBinner for DefaultScalarEventsTimeBinner +where + NTY: NumOps, +{ + type Input = EventValues; + type Output = MinMaxAvgBins; + + fn process(inp: Self::Input) -> Self::Output { + todo!() + } +} + +pub struct DefaultSingleXBinTimeBinner { + _m1: PhantomData, +} + +impl EventsTimeBinner for DefaultSingleXBinTimeBinner +where + NTY: NumOps, +{ + type Input = XBinnedScalarEvents; + // TODO is that output type good enough for now? + type Output = MinMaxAvgBins; + + fn process(inp: Self::Input) -> Self::Output { + todo!() + } +} + +pub struct DefaultBinsTimeBinner { + _m1: PhantomData, +} + +impl BinsTimeBinner for DefaultBinsTimeBinner +where + NTY: NumOps, +{ + type Input = MinMaxAvgBins; + type Output = MinMaxAvgBins; + + fn process(inp: Self::Input) -> Self::Output { + todo!() + } +} + +pub trait Aggregator3Tdim { + type InputValue; + type OutputValue; +} + +pub struct Agg3 { + range: NanoRange, + count: u64, + min: f32, + max: f32, + sum: f32, + sumc: u64, +} + +impl Agg3 { + fn new(range: NanoRange) -> Self { + Self { + range, + count: 0, + min: f32::MAX, + max: f32::MIN, + sum: f32::NAN, + sumc: 0, + } + } + + fn ingest(&mut self, item: &MinMaxAvgScalarEventBatch) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + if ts < self.range.beg { + continue; + } else if ts >= self.range.end { + continue; + } else { + self.count += 1; + self.min = self.min.min(item.mins[i1]); + self.max = self.max.max(item.maxs[i1]); + let x = item.avgs[i1]; + if x.is_nan() { + } else { + if self.sum.is_nan() { + self.sum = x; + } else { + self.sum += x; + } + self.sumc += 1; + } + } + } + } + + fn result(self) -> Vec { + let min = if self.min == f32::MAX { f32::NAN } else { self.min }; + let max = if self.max == f32::MIN { f32::NAN } else { self.max }; + let avg = if self.sumc == 0 { + f32::NAN + } else { + self.sum / self.sumc as f32 + }; + let v = MinMaxAvgScalarBinBatch { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + }; + vec![v] + } +} + +impl Aggregator3Tdim for Agg3 { + type InputValue = MinMaxAvgScalarEventBatch; + type OutputValue = MinMaxAvgScalarBinBatch; +} + +pub struct BinnedT3Stream { + // TODO get rid of box: + inp: Pin>, Error>> + Send>>, + //aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, + aggtor: Option, + spec: BinnedRange, + curbin: u32, + inp_completed: bool, + all_bins_emitted: bool, + range_complete_observed: bool, + range_complete_emitted: bool, + left: Option>, Error>>>>, + errored: bool, + completed: bool, + tmp_agg_results: VecDeque, +} + +impl BinnedT3Stream { + pub fn new(inp: S, spec: BinnedRange) -> Self + where + S: Stream>, Error>> + Send + 'static, + { + let range = spec.get_range(0); + Self { + inp: Box::pin(inp), + aggtor: Some(Agg3::new(range)), + spec, + curbin: 0, + inp_completed: false, + all_bins_emitted: false, + range_complete_observed: false, + range_complete_emitted: false, + left: None, + errored: false, + completed: false, + tmp_agg_results: VecDeque::new(), + } + } + + fn cur( + &mut self, + cx: &mut Context, + ) -> Poll>, Error>>> { + if let Some(cur) = self.left.take() { + cur + } else if self.inp_completed { + Poll::Ready(None) + } else { + let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); + inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) + } + } + + fn cycle_current_bin(&mut self) { + self.curbin += 1; + let range = self.spec.get_range(self.curbin); + let ret = self + .aggtor + .replace(Agg3::new(range)) + // TODO handle None case, or remove Option if Agg is always present + .unwrap() + .result(); + self.tmp_agg_results = VecDeque::from(ret); + if self.curbin >= self.spec.count as u32 { + self.all_bins_emitted = true; + } + } + + fn handle( + &mut self, + cur: Poll>, Error>>>, + ) -> Option>, Error>>>> { + use Poll::*; + match cur { + Ready(Some(Ok(item))) => match item { + StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))), + StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))), + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete_observed = true; + None + } + RangeCompletableItem::Data(item) => { + if self.all_bins_emitted { + // Just drop the item because we will not emit anymore data. + // Could also at least gather some stats. + None + } else { + let ag = self.aggtor.as_mut().unwrap(); + if item.ends_before(ag.range.clone()) { + None + } else if item.starts_after(ag.range.clone()) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); + self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } else { + ag.ingest(&item); + if item.ends_after(ag.range.clone()) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); + self.cycle_current_bin(); + } + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } + } + } + }, + }, + Ready(Some(Err(e))) => { + self.errored = true; + Some(Ready(Some(Err(e)))) + } + Ready(None) => { + self.inp_completed = true; + if self.all_bins_emitted { + None + } else { + self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } + } + Pending => Some(Pending), + } + } +} + +impl Stream for BinnedT3Stream { + type Item = Result>, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("IntoBinnedTDefaultStream poll_next on completed"); + } else if self.errored { + self.completed = true; + Ready(None) + } else if let Some(item) = self.tmp_agg_results.pop_front() { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } else if self.range_complete_emitted { + self.completed = true; + Ready(None) + } else if self.inp_completed && self.all_bins_emitted { + self.range_complete_emitted = true; + if self.range_complete_observed { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + continue 'outer; + } + } else { + let cur = self.cur(cx); + match self.handle(cur) { + Some(item) => item, + None => continue 'outer, + } + }; + } + } +} diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs new file mode 100644 index 0000000..5bbf96f --- /dev/null +++ b/disk/src/agg/enp.rs @@ -0,0 +1,46 @@ +use crate::binned::{EventsNodeProcessor, NumOps}; +use crate::decode::EventValues; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; + +pub struct Identity { + _m1: PhantomData, +} + +impl EventsNodeProcessor for Identity +where + NTY: NumOps, +{ + type Input = NTY; + type Output = EventValues; + + fn process(inp: EventValues) -> Self::Output { + todo!() + } +} + +#[derive(Serialize, Deserialize)] +pub struct XBinnedScalarEvents { + tss: Vec, + mins: Vec, + maxs: Vec, + avgs: Vec, + xbincount: Vec, +} + +pub struct WaveXBinner { + _m1: PhantomData, +} + +impl EventsNodeProcessor for WaveXBinner +where + NTY: NumOps, +{ + type Input = Vec; + type Output = XBinnedScalarEvents; + + fn process(_inp: EventValues) -> Self::Output { + todo!() + } +} diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 559f705..022d6e9 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -544,8 +544,14 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch { } } -pub trait NumOps: Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId {} -impl NumOps for T where T: Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId {} +pub trait NumOps: + Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + DeserializeOwned +{ +} +impl NumOps for T where + T: Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + DeserializeOwned +{ +} pub trait EventsDecoder { type Output; @@ -555,100 +561,83 @@ pub trait EventsDecoder { pub trait EventsNodeProcessor { type Input; - type Output; + type Output: Send + DeserializeOwned; fn process(inp: EventValues) -> Self::Output; } -pub struct NumEvents { - _m: PhantomData, -} -impl NumEvents { - pub fn new() -> Self { - Self { _m: PhantomData } - } +pub trait TimeBins: Send + Unpin + WithLen + Appendable { + fn ts1s(&self) -> &Vec; + fn ts2s(&self) -> &Vec; } -pub struct NumSingleXBin { - _m: PhantomData, -} -impl NumSingleXBin { - pub fn new() -> Self { - Self { _m: PhantomData } - } +pub trait EventsTimeBinner { + type Input; + type Output: TimeBins; + fn process(inp: Self::Input) -> Self::Output; } -pub struct NumEventsDecoder -where - END: Endianness, -{ - _m1: PhantomData, - _m2: PhantomData, +pub trait BinsTimeBinner { + type Input: TimeBins; + type Output: TimeBins; + fn process(inp: Self::Input) -> Self::Output; } -impl NumEventsDecoder -where - END: Endianness, -{ - pub fn new() -> Self { +pub struct MinMaxAvgBins { + ts1s: Vec, + ts2s: Vec, + counts: Vec, + mins: Vec>, + maxs: Vec>, + avgs: Vec>, +} + +impl MinMaxAvgBins { + pub fn empty() -> Self { Self { - _m1: PhantomData, - _m2: PhantomData, + ts1s: vec![], + ts2s: vec![], + counts: vec![], + mins: vec![], + maxs: vec![], + avgs: vec![], } } } -impl EventsDecoder for NumEventsDecoder +impl TimeBins for MinMaxAvgBins where - END: Endianness, + NTY: NumOps, { - type Output = NumEvents; - fn ingest(&mut self, _event: &[u8]) {} - fn result(&mut self) -> Self::Output { - err::todoval() + fn ts1s(&self) -> &Vec { + &self.ts1s + } + + fn ts2s(&self) -> &Vec { + &self.ts2s } } -pub trait BinnedPipeline { - type EventsDecoder: EventsDecoder; - type EventsNodeProcessor: EventsNodeProcessor; - fn events_decoder(&self) -> Self::EventsDecoder; - fn events_node_processor(&self) -> Self::EventsNodeProcessor; -} - -pub struct NumBinnedPipeline -where - END: Endianness, -{ - _m1: PhantomData, - _m2: PhantomData, - _m3: PhantomData, -} - -impl NumBinnedPipeline -where - END: Endianness, -{ - pub fn new() -> Self { - Self { - _m1: PhantomData, - _m2: PhantomData, - _m3: PhantomData, - } +impl WithLen for MinMaxAvgBins { + fn len(&self) -> usize { + self.ts1s.len() } } -impl BinnedPipeline for NumBinnedPipeline +impl Appendable for MinMaxAvgBins where - ENP: EventsNodeProcessor, - END: Endianness, + NTY: NumOps, { - type EventsDecoder = NumEventsDecoder; - type EventsNodeProcessor = ENP; - fn events_decoder(&self) -> Self::EventsDecoder { - todo!() + fn empty() -> Self { + Self::empty() } - fn events_node_processor(&self) -> Self::EventsNodeProcessor { - todo!() + + fn append(&mut self, src: &Self) { + self.ts1s.extend_from_slice(&src.ts1s); + self.ts2s.extend_from_slice(&src.ts2s); + self.counts.extend_from_slice(&src.counts); + self.mins.extend_from_slice(&src.mins); + self.maxs.extend_from_slice(&src.maxs); + self.avgs.extend_from_slice(&src.avgs); } } diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 47ed4da..2425616 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -1,18 +1,25 @@ use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::query::{CacheUsage, PreBinnedQuery}; -use crate::binned::{RangeCompletableItem, StreamKind, WithLen}; +use crate::binned::{ + BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, RangeCompletableItem, + ReadableFromFile, StreamKind, WithLen, +}; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache}; +use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; use crate::frame::makeframe::{make_frame, FrameType}; use crate::raw::EventsQuery; use crate::streamlog::Streamlog; +use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; +use serde::Serialize; use std::future::Future; use std::io; +use std::marker::PhantomData; use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; @@ -20,21 +27,19 @@ use tokio::fs::{File, OpenOptions}; //pub type SomeScc = netpod::streamext::SCC; -pub struct PreBinnedValueStream +pub struct PreBinnedValueStream where - SK: StreamKind, + NTY: NumOps + NumFromBytes + Serialize + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, + ENP: EventsNodeProcessor>::Output>, + ETB: EventsTimeBinner::Output>, + BTB: BinsTimeBinner::Output, Output = ::Output>, { query: PreBinnedQuery, node_config: NodeConfigCached, - open_check_local_file: Option> + Send>>>, - fut2: Option< - Pin< - Box< - dyn Stream::TBinnedBins>>, err::Error>> - + Send, - >, - >, - >, + open_check_local_file: Option> + Send>>>, + fut2: Option::Output>> + Send>>>, read_from_cache: bool, cache_written: bool, data_complete: bool, @@ -43,26 +48,27 @@ where errored: bool, completed: bool, streamlog: Streamlog, - values: ::TBinnedBins, + values: ::Output, write_fut: Option> + Send>>>, - read_cache_fut: Option< - Pin< - Box< - dyn Future< - Output = Result::TBinnedBins>>, err::Error>, - > + Send, - >, - >, - >, - stream_kind: SK, + read_cache_fut: Option::Output>> + Send>>>, + _m1: PhantomData, + _m2: PhantomData, + _m3: PhantomData, + _m4: PhantomData, + _m5: PhantomData, } -impl PreBinnedValueStream +impl PreBinnedValueStream where - SK: StreamKind, - Result::TBinnedBins>>, err::Error>: FrameType, + NTY: NumOps + NumFromBytes + Serialize + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, + ENP: EventsNodeProcessor>::Output>, + ETB: EventsTimeBinner::Output>, + BTB: BinsTimeBinner::Output, Output = ::Output>, + ::Output: Appendable, { - pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached, stream_kind: SK) -> Self { + pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self { Self { query, node_config: node_config.clone(), @@ -76,27 +82,34 @@ where errored: false, completed: false, streamlog: Streamlog::new(node_config.ix as u32), - values: <::TBinnedBins as Appendable>::empty(), + values: <::Output as Appendable>::empty(), write_fut: None, read_cache_fut: None, - stream_kind, + _m1: PhantomData, + _m2: PhantomData, + _m3: PhantomData, + _m4: PhantomData, + _m5: PhantomData, } } // TODO handle errors also here via return type. - fn setup_merged_from_remotes(&mut self) { + fn setup_merged_from_remotes( + &mut self, + ) -> Result::Output>> + Send>>, Error> { let evq = EventsQuery { channel: self.query.channel().clone(), range: self.query.patch().patch_range(), agg_kind: self.query.agg_kind().clone(), }; if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 { - error!( + let msg = format!( "Patch length inconsistency {} {}", self.query.patch().patch_t_len(), self.query.patch().bin_t_len() ); - return; + error!("{}", msg); + return Err(Error::with_msg(msg)); } // TODO do I need to set up more transformations or binning to deliver the requested data? let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); @@ -105,17 +118,25 @@ where .ok_or(Error::with_msg("covering_range returns None")) .unwrap(); let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s1 = MergedFromRemotes::new( + + // TODO copy the MergedFromRemotes and adapt... + /*let s1 = MergedFromRemotes::new( evq, perf_opts, self.node_config.node_config.cluster.clone(), - self.stream_kind.clone(), - ); - let s1 = ::xbinned_to_tbinned(s1, range); - self.fut2 = Some(Box::pin(s1)); + .........., + );*/ + let s1: MergedFromRemotes = err::todoval(); + + // TODO + //let s1 = todo_convert_stream_to_tbinned_stream(s1, range); + Ok(err::todoval()) } - fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) { + fn setup_from_higher_res_prebinned( + &mut self, + range: PreBinnedPatchRange, + ) -> Result::Output>> + Send>>, Error> { let g = self.query.patch().bin_t_len(); let h = range.grid_spec.bin_t_len(); trace!( @@ -127,16 +148,16 @@ where range, ); if g / h <= 1 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; + let msg = format!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); + return Err(Error::with_msg(msg)); } if g / h > 1024 * 10 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; + let msg = format!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); + return Err(Error::with_msg(msg)); } if g % h != 0 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; + let msg = format!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); + return Err(Error::with_msg(msg)); } let node_config = self.node_config.clone(); let patch_it = PreBinnedPatchIterator::from_range(range); @@ -144,7 +165,6 @@ where .map({ let q2 = self.query.clone(); let disk_stats_every = self.query.disk_stats_every().clone(); - let stream_kind = self.stream_kind.clone(); let report_error = self.query.report_error(); move |patch| { let query = PreBinnedQuery::new( @@ -155,7 +175,10 @@ where disk_stats_every.clone(), report_error, ); - PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) + // TODO copy and adapt PreBinnedScalarValueFetchedStream + //PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) + let tmp: PreBinnedScalarValueFetchedStream = err::todoval(); + Ok(tmp) } }) .map(|k| { @@ -166,17 +189,17 @@ where s }) .flatten(); - self.fut2 = Some(Box::pin(s)); + Err(err::todoval()) } fn try_setup_fetch_prebinned_higher_res(&mut self) -> Result<(), Error> { let range = self.query.patch().patch_range(); match PreBinnedPatchRange::covering_range(range, self.query.patch().bin_count() + 1) { Ok(Some(range)) => { - self.setup_from_higher_res_prebinned(range); + self.fut2 = Some(self.setup_from_higher_res_prebinned(range)?); } Ok(None) => { - self.setup_merged_from_remotes(); + self.fut2 = Some(self.setup_merged_from_remotes()?); } Err(e) => return Err(e), } @@ -184,12 +207,17 @@ where } } -impl Stream for PreBinnedValueStream +impl Stream for PreBinnedValueStream where - SK: StreamKind + Unpin, - Result::TBinnedBins>>, err::Error>: FrameType, + NTY: NumOps + NumFromBytes + Serialize + Unpin + 'static, + END: Endianness + Unpin + 'static, + EVS: EventValueShape + EventValueFromBytes + Unpin + 'static, + ENP: EventsNodeProcessor>::Output> + Unpin, + ETB: EventsTimeBinner::Output> + Unpin, + BTB: BinsTimeBinner::Output, Output = ::Output>, + ::Output: Serialize + ReadableFromFile + 'static, { - type Item = Result::TBinnedBins>>, err::Error>; + type Item = Sitemty<::Output>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -263,10 +291,8 @@ where self.values.len(), ); self.streamlog.append(Level::INFO, msg); - let values = std::mem::replace( - &mut self.values, - <::TBinnedBins as Appendable>::empty(), - ); + let emp = <::Output as Appendable>::empty(); + let values = std::mem::replace(&mut self.values, emp); let fut = write_pb_cache_min_max_avg_scalar( values, self.query.patch().clone(), @@ -318,7 +344,7 @@ where match item { Ok(file) => { self.read_from_cache = true; - let fut = <::TBinnedBins as crate::binned::ReadableFromFile>::read_from_file(file)?; + let fut = <::Output as ReadableFromFile>::read_from_file(file)?; self.read_cache_fut = Some(Box::pin(fut)); continue 'outer; } diff --git a/disk/src/binned/pbv2.rs b/disk/src/binned/pbv2.rs index 190b4a1..3e5e368 100644 --- a/disk/src/binned/pbv2.rs +++ b/disk/src/binned/pbv2.rs @@ -152,14 +152,17 @@ where .ok_or(Error::with_msg("covering_range returns None")) .unwrap(); let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let s1 = MergedFromRemotes::new( + // TODO remove whole mod after refactor + /*let s1 = MergedFromRemotes::new( evq, perf_opts, self.node_config.node_config.cluster.clone(), self.stream_kind.clone(), ); let s1 = ::xbinned_to_tbinned(s1, range); - self.fut2 = Some(Box::pin(s1)); + self.fut2 = Some(Box::pin(s1));*/ + err::todo(); + self.fut2 = None; } fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) { diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 7323975..81d4275 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -1,7 +1,11 @@ +use crate::agg::binnedt4::{DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner}; +use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::streams::StreamItem; -use crate::binned::pbv2::{pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueStream}; +use crate::binned::pbv2::{ + pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream, +}; use crate::binned::query::PreBinnedQuery; -use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind}; +use crate::binned::{BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, RangeCompletableItem, StreamKind}; use crate::cache::node_ix_for_patch; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, @@ -9,37 +13,54 @@ use crate::decode::{ }; use crate::frame::makeframe::{Framable, FrameType}; use crate::Sitemty; +use bytes::Bytes; use err::Error; use futures_core::Stream; +use futures_util::StreamExt; +use netpod::streamext::SCC; use netpod::{ByteOrder, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use serde::Serialize; use std::pin::Pin; // TODO instead of EventNodeProcessor, use a T-binning processor here // TODO might also want another stateful processor which can run on the merged event stream, like smoothing. -fn make_num_pipeline_nty_end_evs_enp( +fn make_num_pipeline_nty_end_evs_enp( event_value_shape: EVS, ) -> Pin> + Send>> where - NTY: NumOps + NumFromBytes + 'static, + NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output>, + ETB: EventsTimeBinner::Output>, Sitemty<::Output>: Framable + 'static, ::Output: 'static, { + // TODO + // Use the pre-binned fetch machinery, refactored... err::todoval() } fn make_num_pipeline_nty_end(shape: Shape) -> Pin> + Send>> where - NTY: NumOps + NumFromBytes + 'static, + NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, { + // TODO pass all the correct types. + err::todo(); match shape { - Shape::Scalar => make_num_pipeline_nty_end_evs_enp::>(EventValuesDim0Case::new()), - Shape::Wave(n) => make_num_pipeline_nty_end_evs_enp::>(EventValuesDim1Case::new(n)), + Shape::Scalar => { + make_num_pipeline_nty_end_evs_enp::, DefaultScalarEventsTimeBinner>( + EventValuesDim0Case::new(), + ) + } + Shape::Wave(n) => { + make_num_pipeline_nty_end_evs_enp::, DefaultSingleXBinTimeBinner>( + EventValuesDim1Case::new(n), + ) + } } } @@ -59,15 +80,17 @@ fn make_num_pipeline( ) -> Pin> + Send>> { match scalar_type { ScalarType::I32 => match_end!(i32, byte_order, shape), + ScalarType::F32 => match_end!(f32, byte_order, shape), _ => todo!(), } } +// TODO after the refactor, return direct value instead of boxed. pub async fn pre_binned_bytes_for_http( node_config: &NodeConfigCached, query: &PreBinnedQuery, stream_kind: SK, -) -> Result, Error> +) -> Result> + Send>>, Error> where SK: StreamKind, Result>, err::Error>: FrameType, @@ -101,15 +124,21 @@ where Err(e) => return Err(e), }; - // TODO enable - if false { - // TODO - // Decide here analogue to conn in some steps which generic pipeline we use. - - PreBinnedValueStream::new(query.clone(), node_config, stream_kind.clone()); - err::todoval() + if true { + let ret = make_num_pipeline( + entry.scalar_type.clone(), + entry.byte_order.clone(), + entry.to_shape().unwrap(), + ) + .map(|item| match item.make_frame() { + Ok(item) => Ok(item.freeze()), + Err(e) => Err(e), + }); + let ret = Box::pin(ret); + Ok(ret) } else { let ret = pre_binned_value_byte_stream_new(query, node_config, stream_kind); + let ret = Box::pin(ret); Ok(ret) } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 302274d..12229f0 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,7 +1,9 @@ use crate::agg::streams::StreamItem; use crate::binned::{RangeCompletableItem, StreamKind}; +use crate::frame::makeframe::FrameType; use crate::merge::MergedStream; -use crate::raw::EventsQuery; +use crate::raw::{x_processed_stream_from_node, EventsQuery}; +use crate::Sitemty; use bytes::Bytes; use chrono::Utc; use err::Error; @@ -86,6 +88,7 @@ impl AsyncRead for HttpBodyAsAsyncRead { type T001 = Pin, Error>> + Send>>; type T002 = Pin, Error>> + Send>>; +// TODO remove after refactoring. pub struct MergedFromRemotes where SK: StreamKind, @@ -100,16 +103,12 @@ where impl MergedFromRemotes where SK: StreamKind, + Sitemty<::XBinnedEvents>: FrameType, { pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: SK) -> Self { let mut tcp_establish_futs = vec![]; for node in &cluster.nodes { - let f = super::raw::x_processed_stream_from_node( - evq.clone(), - perf_opts.clone(), - node.clone(), - stream_kind.clone(), - ); + let f = x_processed_stream_from_node(evq.clone(), perf_opts.clone(), node.clone(), stream_kind.clone()); let f: T002::XBinnedEvents>> = Box::pin(f); tcp_establish_futs.push(f); } diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 8ad535c..396b758 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -245,6 +245,10 @@ where type NumXAggToNBins = ProcBB; } +// TODO why not Serialize? +// TODO add pulse. +// TODO change name, it's not only about values, but more like batch of whole events. +#[derive(Serialize, Deserialize)] pub struct EventValues { pub tss: Vec, pub values: Vec, diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 2972fff..5fb7569 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -1,41 +1,20 @@ +use crate::agg::enp::XBinnedScalarEvents; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::RangeCompletableItem; -use crate::decode::MinMaxAvgScalarEventBatchGen; +use crate::binned::{NumOps, RangeCompletableItem}; +use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen}; use crate::frame::inmem::InMemoryFrame; use crate::raw::EventQueryJsonStringFrame; use bytes::{BufMut, BytesMut}; use err::Error; use serde::{de::DeserializeOwned, Serialize}; +pub const INMEM_FRAME_ENCID: u32 = 0x12121212; pub const INMEM_FRAME_HEAD: usize = 20; pub const INMEM_FRAME_FOOT: usize = 4; pub const INMEM_FRAME_MAGIC: u32 = 0xc6c3b73d; -pub trait FrameType { - const FRAME_TYPE_ID: u32; -} - -impl FrameType for EventQueryJsonStringFrame { - const FRAME_TYPE_ID: u32 = 0x03; -} - -impl FrameType for Result>, Error> { - const FRAME_TYPE_ID: u32 = 0x10; -} - -impl FrameType for Result>, Error> { - const FRAME_TYPE_ID: u32 = 0x11; -} - -impl FrameType for Result>>, Error> -where - NTY: SubFrId, -{ - const FRAME_TYPE_ID: u32 = 0x28c4a100 + NTY::SUB; -} - pub trait SubFrId { const SUB: u32; } @@ -80,6 +59,43 @@ impl SubFrId for f64 { const SUB: u32 = 12; } +pub trait FrameType { + const FRAME_TYPE_ID: u32; +} + +impl FrameType for EventQueryJsonStringFrame { + const FRAME_TYPE_ID: u32 = 0x100; +} + +impl FrameType for Result>, Error> { + const FRAME_TYPE_ID: u32 = 0x200; +} + +impl FrameType for Result>, Error> { + const FRAME_TYPE_ID: u32 = 0x300; +} + +impl FrameType for Result>>, Error> +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x400 + NTY::SUB; +} + +impl FrameType for Result>>, Error> +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB; +} + +impl FrameType for Result>>, Error> +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB; +} + pub trait ProvidesFrameType { fn frame_type_id(&self) -> u32; } @@ -100,6 +116,24 @@ impl Framable for Result Framable for Result>>, err::Error> +where + NTY: NumOps + Serialize, +{ + fn make_frame(&self) -> Result { + make_frame(self) + } +} + +impl Framable for Result>>, err::Error> +where + NTY: NumOps + Serialize, +{ + fn make_frame(&self) -> Result { + make_frame(self) + } +} + pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, @@ -112,10 +146,9 @@ where let mut h = crc32fast::Hasher::new(); h.update(&enc); let payload_crc = h.finalize(); - let encid = 0x12121212; let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); buf.put_u32_le(INMEM_FRAME_MAGIC); - buf.put_u32_le(encid); + buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(FT::FRAME_TYPE_ID); buf.put_u32_le(enc.len() as u32); buf.put_u32_le(payload_crc); @@ -134,10 +167,9 @@ pub fn make_term_frame() -> BytesMut { let mut h = crc32fast::Hasher::new(); h.update(&[]); let payload_crc = h.finalize(); - let encid = 0x12121313; let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); buf.put_u32_le(INMEM_FRAME_MAGIC); - buf.put_u32_le(encid); + buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(0x01); buf.put_u32_le(0); buf.put_u32_le(payload_crc); @@ -150,15 +182,16 @@ pub fn make_term_frame() -> BytesMut { pub fn decode_frame(frame: &InMemoryFrame, frame_type: u32) -> Result where - T: DeserializeOwned, + T: FrameType + DeserializeOwned, { - if frame.encid() != 0x12121212 { + if frame.encid() != INMEM_FRAME_ENCID { return Err(Error::with_msg(format!("unknown encoder id {:?}", frame))); } - if frame.tyid() != frame_type { + if frame.tyid() != ::FRAME_TYPE_ID { return Err(Error::with_msg(format!( - "type id mismatch expect {} found {:?}", - frame_type, frame + "type id mismatch expect {} found {:?}", + ::FRAME_TYPE_ID, + frame ))); } if frame.len() as usize != frame.buf().len() { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index db62d3a..f094779 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -10,6 +10,8 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; +pub mod mergefromremote; + enum MergedCurVal { None, Finish, diff --git a/disk/src/merge/mergefromremote.rs b/disk/src/merge/mergefromremote.rs new file mode 100644 index 0000000..65d4855 --- /dev/null +++ b/disk/src/merge/mergefromremote.rs @@ -0,0 +1,120 @@ +use crate::binned::EventsNodeProcessor; +use crate::frame::makeframe::FrameType; +use crate::raw::{x_processed_stream_from_node2, EventsQuery}; +use crate::Sitemty; +use err::Error; +use futures_core::Stream; +use futures_util::{pin_mut, StreamExt}; +use netpod::{Cluster, PerfOpts}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +type T001 = Pin> + Send>>; +type T002 = Pin, Error>> + Send>>; + +pub struct MergedFromRemotes2 +where + ENP: EventsNodeProcessor, +{ + tcp_establish_futs: Vec::Output>>, + nodein: Vec::Output>>>, + merged: Option::Output>>, + completed: bool, + errored: bool, +} + +impl MergedFromRemotes2 +where + ENP: EventsNodeProcessor + 'static, + ::Output: 'static, + ::Output: Unpin, + Sitemty<::Output>: FrameType, +{ + pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: ENP) -> Self { + let mut tcp_establish_futs = vec![]; + for node in &cluster.nodes { + let f = x_processed_stream_from_node2::(evq.clone(), perf_opts.clone(), node.clone()); + let f: T002<::Output> = Box::pin(f); + tcp_establish_futs.push(f); + } + let n = tcp_establish_futs.len(); + Self { + tcp_establish_futs, + nodein: (0..n).into_iter().map(|_| None).collect(), + merged: None, + completed: false, + errored: false, + } + } +} + +impl Stream for MergedFromRemotes2 +where + ENP: EventsNodeProcessor, +{ + type Item = Sitemty<::Output>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("MergedFromRemotes poll_next on completed"); + } else if self.errored { + self.completed = true; + return Ready(None); + } else if let Some(fut) = &mut self.merged { + match fut.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } + Pending => Pending, + } + } else { + let mut pend = false; + let mut c1 = 0; + for i1 in 0..self.tcp_establish_futs.len() { + if self.nodein[i1].is_none() { + let f = &mut self.tcp_establish_futs[i1]; + pin_mut!(f); + match f.poll(cx) { + Ready(Ok(k)) => { + self.nodein[i1] = Some(k); + } + Ready(Err(e)) => { + self.errored = true; + return Ready(Some(Err(e))); + } + Pending => { + pend = true; + } + } + } else { + c1 += 1; + } + } + if pend { + Pending + } else { + if c1 == self.tcp_establish_futs.len() { + let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); + //let s1 = MergedStream::<_, ENP>::new(inps); + + // TODO + + err::todo(); + //let s1 = err::todoval(); + //self.merged = Some(Box::pin(s1)); + } + continue 'outer; + } + }; + } + } +} diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 8fb6c6f..3bee2b0 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -6,10 +6,11 @@ to request such data from nodes. */ use crate::agg::streams::StreamItem; -use crate::binned::{RangeCompletableItem, StreamKind}; +use crate::binned::{EventsNodeProcessor, RangeCompletableItem, StreamKind}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::{make_frame, make_term_frame}; +use crate::frame::makeframe::{make_frame, make_term_frame, FrameType}; use crate::raw::bffr::EventsFromFrames; +use crate::Sitemty; use err::Error; use futures_core::Stream; use netpod::{AggKind, Channel, NanoRange, Node, PerfOpts}; @@ -36,6 +37,7 @@ pub struct EventsQuery { #[derive(Serialize, Deserialize)] pub struct EventQueryJsonStringFrame(String); +// TODO remove after refactor. pub async fn x_processed_stream_from_node( query: EventsQuery, perf_opts: PerfOpts, @@ -52,6 +54,7 @@ pub async fn x_processed_stream_from_node( > where SK: StreamKind, + Result::XBinnedEvents>>, err::Error>: FrameType, { let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; let qjs = serde_json::to_string(&query)?; @@ -63,7 +66,32 @@ where netout.flush().await?; netout.forget(); let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); - let items = EventsFromFrames::new(frames, stream_kind); + //let items = EventsFromFrames::new(frames); + //Ok(Box::pin(items)) + Ok(err::todoval()) +} + +pub async fn x_processed_stream_from_node2( + query: EventsQuery, + perf_opts: PerfOpts, + node: Node, +) -> Result::Output>> + Send>>, Error> +where + ENP: EventsNodeProcessor, + ::Output: Unpin + 'static, + Result::Output>>, err::Error>: FrameType, +{ + let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?; + let qjs = serde_json::to_string(&query)?; + let (netin, mut netout) = net.into_split(); + let buf = make_frame(&EventQueryJsonStringFrame(qjs))?; + netout.write_all(&buf).await?; + let buf = make_term_frame(); + netout.write_all(&buf).await?; + netout.flush().await?; + netout.forget(); + let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); + let items = EventsFromFrames::new(frames); Ok(Box::pin(items)) } diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index 1c11113..7fb8919 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -1,49 +1,53 @@ use crate::agg::streams::StreamItem; use crate::binned::{RangeCompletableItem, StreamKind, XBinnedEvents}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; -use crate::frame::makeframe::decode_frame; +use crate::frame::makeframe::{decode_frame, FrameType}; +use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; +use serde::de::DeserializeOwned; +use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::AsyncRead; -pub struct EventsFromFrames +// TODO remove usage of SK, no longer needed. +pub struct EventsFromFrames where T: AsyncRead + Unpin, - SK: StreamKind, { inp: InMemoryFrameAsyncReadStream, errored: bool, completed: bool, - _stream_kind: SK, + _m2: PhantomData, } -impl EventsFromFrames +impl EventsFromFrames where T: AsyncRead + Unpin, - SK: StreamKind, { - pub fn new(inp: InMemoryFrameAsyncReadStream, stream_kind: SK) -> Self { + pub fn new(inp: InMemoryFrameAsyncReadStream) -> Self { Self { inp, errored: false, completed: false, - _stream_kind: stream_kind, + _m2: PhantomData, } } } -impl Stream for EventsFromFrames +impl Stream for EventsFromFrames where T: AsyncRead + Unpin, - SK: StreamKind, + //SK: StreamKind, + I: DeserializeOwned + Unpin, // TODO see binned.rs better to express it on trait? //Result::XBinnedEvents>>, Error>: FrameType, + Sitemty: FrameType, { - type Item = Result::XBinnedEvents>>, Error>; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -58,30 +62,23 @@ where Ready(Some(Ok(item))) => match item { StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(frame) => { - match decode_frame::< - Result::XBinnedEvents>>, Error>, - >( - &frame, - <::XBinnedEvents as XBinnedEvents>::frame_type(), - ) { - Ok(item) => match item { - Ok(item) => Ready(Some(Ok(item))), - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - }, + StreamItem::DataItem(frame) => match decode_frame::>(&frame, 0) { + Ok(item) => match item { + Ok(item) => Ready(Some(Ok(item))), Err(e) => { - error!( - "EventsFromFrames ~~~~~~~~ ERROR on frame payload {}", - frame.buf().len(), - ); self.errored = true; Ready(Some(Err(e))) } + }, + Err(e) => { + error!( + "EventsFromFrames ~~~~~~~~ ERROR on frame payload {}", + frame.buf().len(), + ); + self.errored = true; + Ready(Some(Err(e))) } - } + }, }, Ready(Some(Err(e))) => { self.errored = true; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 313c972..c578086 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -2,9 +2,7 @@ use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; -use crate::binned::{ - BinnedStreamKindScalar, EventsNodeProcessor, NumBinnedPipeline, NumOps, RangeCompletableItem, StreamKind, -}; +use crate::binned::{BinnedStreamKindScalar, EventsNodeProcessor, NumOps, RangeCompletableItem, StreamKind}; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, EventsDecodedStream, LittleEndian, NumFromBytes, @@ -111,7 +109,6 @@ where Sitemty<::Output>: Framable + 'static, ::Output: 'static, { - NumBinnedPipeline::::new(); let decs = EventsDecodedStream::::new(event_value_shape, event_blobs); let s2 = StreamExt::map(decs, |item| match item { Ok(item) => match item {