From 908207b71b2582bb19a3d6da103e90bdfd79f5be Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 7 Jun 2021 20:36:41 +0200 Subject: [PATCH] WIP on adding merger impls, it checks --- disk/src/agg/binnedt4.rs | 88 ++++++------ disk/src/agg/enp.rs | 138 +++++++++++++++++- disk/src/binned.rs | 117 ++++++++++++++- disk/src/binned/pbv.rs | 70 ++++----- disk/src/binned/prebinned.rs | 53 ++++--- disk/src/decode.rs | 138 +++++++++++------- disk/src/frame/makeframe.rs | 29 +++- disk/src/merge.rs | 229 +++++++++++++++++++++++++++++- disk/src/merge/mergefromremote.rs | 18 ++- 9 files changed, 707 insertions(+), 173 deletions(-) diff --git a/disk/src/agg/binnedt4.rs b/disk/src/agg/binnedt4.rs index cc512ee..2fa6cb7 100644 --- a/disk/src/agg/binnedt4.rs +++ b/disk/src/agg/binnedt4.rs @@ -2,8 +2,12 @@ use crate::agg::enp::XBinnedScalarEvents; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::{BinsTimeBinner, EventsTimeBinner, MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo}; +use crate::binned::{ + BinsTimeBinner, EventsTimeBinner, EventsTimeBinnerAggregator, MinMaxAvgAggregator, MinMaxAvgBins, NumOps, + RangeCompletableItem, RangeOverlapInfo, SingleXBinAggregator, +}; use crate::decode::EventValues; +use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -24,9 +28,10 @@ where { type Input = EventValues; type Output = MinMaxAvgBins; + type Aggregator = MinMaxAvgAggregator; - fn process(inp: Self::Input) -> Self::Output { - todo!() + fn aggregator(range: NanoRange) -> Self::Aggregator { + Self::Aggregator::new(range) } } @@ -39,11 +44,13 @@ where NTY: NumOps, { type Input = XBinnedScalarEvents; - // TODO is that output type good enough for now? + // TODO is that output type good enough for now? Maybe better with a new one also + // to distinguish from the earlier one. 
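// Sketch (not code from this patch) of the contract the new trait shape implies:
// the binner hands out one aggregator per time bin, the caller feeds it every
// overlapping item, then takes the finished bin. `events_for` is a hypothetical
// event source; the traits are the ones added to disk/src/binned.rs below.
//
//     let mut agg = Self::aggregator(range.clone());
//     for item in events_for(&range) {
//         agg.ingest(&item);
//     }
//     let bins: MinMaxAvgBins<NTY> = agg.result();
//
// TBinnerStream further down in this file drives exactly this loop, one
// aggregator per bin.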
type Output = MinMaxAvgBins; + type Aggregator = SingleXBinAggregator; - fn process(inp: Self::Input) -> Self::Output { - todo!() + fn aggregator(range: NanoRange) -> Self::Aggregator { + Self::Aggregator::new(range) } } @@ -134,54 +141,49 @@ impl Agg3 { } } -impl Aggregator3Tdim for Agg3 { - type InputValue = MinMaxAvgScalarEventBatch; - type OutputValue = MinMaxAvgScalarBinBatch; -} - -pub struct BinnedT3Stream { - // TODO get rid of box: - inp: Pin>, Error>> + Send>>, - //aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, - aggtor: Option, +pub struct TBinnerStream +where + S: Stream::Input>> + Send + Unpin + 'static, + ETB: EventsTimeBinner + Send + Unpin + 'static, +{ + inp: S, spec: BinnedRange, curbin: u32, + left: Option::Input>>>>, + aggtor: Option<::Aggregator>, + tmp_agg_results: VecDeque<<::Aggregator as EventsTimeBinnerAggregator>::Output>, inp_completed: bool, all_bins_emitted: bool, range_complete_observed: bool, range_complete_emitted: bool, - left: Option>, Error>>>>, errored: bool, completed: bool, - tmp_agg_results: VecDeque, } -impl BinnedT3Stream { - pub fn new(inp: S, spec: BinnedRange) -> Self - where - S: Stream>, Error>> + Send + 'static, - { +impl TBinnerStream +where + S: Stream::Input>> + Send + Unpin + 'static, + ETB: EventsTimeBinner, +{ + pub fn new(inp: S, spec: BinnedRange) -> Self { let range = spec.get_range(0); Self { - inp: Box::pin(inp), - aggtor: Some(Agg3::new(range)), + inp, spec, curbin: 0, + left: None, + aggtor: Some(::aggregator(range)), + tmp_agg_results: VecDeque::new(), inp_completed: false, all_bins_emitted: false, range_complete_observed: false, range_complete_emitted: false, - left: None, errored: false, completed: false, - tmp_agg_results: VecDeque::new(), } } - fn cur( - &mut self, - cx: &mut Context, - ) -> Poll>, Error>>> { + fn cur(&mut self, cx: &mut Context) -> Poll::Input>>> { if let Some(cur) = self.left.take() { cur } else if self.inp_completed { @@ -197,11 +199,13 @@ impl BinnedT3Stream { let range = self.spec.get_range(self.curbin); let ret = self .aggtor - .replace(Agg3::new(range)) + .replace(::aggregator(range)) // TODO handle None case, or remove Option if Agg is always present .unwrap() .result(); - self.tmp_agg_results = VecDeque::from(ret); + // TODO should we accumulate bins before emit? Maybe not, we want to stay responsive. + // Only if the frequency would be high, that would require cpu time checks. Worth it? Measure.. 
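// A possible middle ground, should measurement ever justify it (hypothetical,
// not part of this patch): keep per-bin results as below, but only drain them
// to the output once a small threshold builds up, e.g.
//
//     if self.tmp_agg_results.len() >= EMIT_MIN { /* drain into output */ }
//
// with EMIT_MIN a tuning constant. Emitting each bin as it closes, as done
// here, bounds the added latency to a single bin, which favors responsiveness.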
+ self.tmp_agg_results.push_back(ret); if self.curbin >= self.spec.count as u32 { self.all_bins_emitted = true; } @@ -209,8 +213,8 @@ impl BinnedT3Stream { fn handle( &mut self, - cur: Poll>, Error>>>, - ) -> Option>, Error>>>> { + cur: Poll::Input>>>, + ) -> Option::Output>>>> { use Poll::*; match cur { Ready(Some(Ok(item))) => match item { @@ -228,9 +232,9 @@ impl BinnedT3Stream { None } else { let ag = self.aggtor.as_mut().unwrap(); - if item.ends_before(ag.range.clone()) { + if item.ends_before(ag.range().clone()) { None - } else if item.starts_after(ag.range.clone()) { + } else if item.starts_after(ag.range().clone()) { self.left = Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); self.cycle_current_bin(); @@ -238,7 +242,7 @@ impl BinnedT3Stream { None } else { ag.ingest(&item); - if item.ends_after(ag.range.clone()) { + if item.ends_after(ag.range().clone()) { self.left = Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); self.cycle_current_bin(); @@ -269,8 +273,12 @@ impl BinnedT3Stream { } } -impl Stream for BinnedT3Stream { - type Item = Result>, Error>; +impl Stream for TBinnerStream +where + S: Stream::Input>> + Send + Unpin + 'static, + ETB: EventsTimeBinner + Send + Unpin + 'static, +{ + type Item = Sitemty<::Output>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index 5bbf96f..1fa6fb7 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -1,5 +1,7 @@ -use crate::binned::{EventsNodeProcessor, NumOps}; +use crate::agg::streams::Appendable; +use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeOverlapInfo, WithLen, WithTimestamps}; use crate::decode::EventValues; +use netpod::NanoRange; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; @@ -29,6 +31,83 @@ pub struct XBinnedScalarEvents { xbincount: Vec, } +impl XBinnedScalarEvents { + pub fn empty() -> Self { + Self { + tss: vec![], + mins: vec![], + maxs: vec![], + avgs: vec![], + xbincount: vec![], + } + } +} + +impl WithLen for XBinnedScalarEvents { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl WithTimestamps for XBinnedScalarEvents { + fn ts(&self, ix: usize) -> u64 { + self.tss[ix] + } +} + +impl RangeOverlapInfo for XBinnedScalarEvents { + fn ends_before(&self, range: NanoRange) -> bool { + match self.tss.last() { + Some(&ts) => ts < range.beg, + None => true, + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + match self.tss.last() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + match self.tss.first() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } +} + +impl PushableIndex for XBinnedScalarEvents +where + NTY: NumOps, +{ + fn push_index(&mut self, src: &Self, ix: usize) { + self.tss.push(src.tss[ix]); + self.xbincount.push(src.xbincount[ix]); + self.mins.push(src.mins[ix]); + self.maxs.push(src.maxs[ix]); + self.avgs.push(src.avgs[ix]); + } +} + +impl Appendable for XBinnedScalarEvents +where + NTY: NumOps, +{ + fn empty() -> Self { + Self::empty() + } + + fn append(&mut self, src: &Self) { + self.tss.extend_from_slice(&src.tss); + self.xbincount.extend_from_slice(&src.xbincount); + self.mins.extend_from_slice(&src.mins); + self.maxs.extend_from_slice(&src.maxs); + self.avgs.extend_from_slice(&src.avgs); + } +} + pub struct WaveXBinner { _m1: PhantomData, } @@ -40,7 +119,60 @@ where type Input = 
Vec; type Output = XBinnedScalarEvents; - fn process(_inp: EventValues) -> Self::Output { - todo!() + fn process(inp: EventValues) -> Self::Output { + let nev = inp.tss.len(); + let mut ret = XBinnedScalarEvents { + tss: inp.tss, + xbincount: Vec::with_capacity(nev), + mins: Vec::with_capacity(nev), + maxs: Vec::with_capacity(nev), + avgs: Vec::with_capacity(nev), + }; + for i1 in 0..nev { + let mut min = None; + let mut max = None; + let mut sum = 0f32; + let mut count = 0; + let vals = &inp.values[i1]; + for i2 in 0..vals.len() { + let v = vals[i2]; + min = match min { + None => Some(v), + Some(min) => { + if v < min { + Some(v) + } else { + Some(min) + } + } + }; + max = match max { + None => Some(v), + Some(max) => { + if v > max { + Some(v) + } else { + Some(max) + } + } + }; + let vf = v.as_(); + if vf.is_nan() { + } else { + sum += vf; + count += 1; + } + } + // TODO while X-binning I expect values, otherwise it is illegal input. + ret.xbincount.push(nev as u32); + ret.mins.push(min.unwrap()); + ret.maxs.push(max.unwrap()); + if count == 0 { + ret.avgs.push(f32::NAN); + } else { + ret.avgs.push(sum / count as f32); + } + } + ret } } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 022d6e9..8065884 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,6 +1,7 @@ use crate::agg::binnedt::AggregatableTdim; use crate::agg::binnedt2::AggregatableTdim2; use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; +use crate::agg::enp::XBinnedScalarEvents; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult}; @@ -559,9 +560,9 @@ pub trait EventsDecoder { fn result(&mut self) -> Self::Output; } -pub trait EventsNodeProcessor { +pub trait EventsNodeProcessor: Send + Unpin { type Input; - type Output: Send + DeserializeOwned; + type Output: Send + Unpin + DeserializeOwned + WithTimestamps; fn process(inp: EventValues) -> Self::Output; } @@ -570,10 +571,19 @@ pub trait TimeBins: Send + Unpin + WithLen + Appendable { fn ts2s(&self) -> &Vec; } -pub trait EventsTimeBinner { - type Input; +pub trait EventsTimeBinner: Send + Unpin { + type Input: Unpin + RangeOverlapInfo; type Output: TimeBins; - fn process(inp: Self::Input) -> Self::Output; + type Aggregator: EventsTimeBinnerAggregator + Unpin; + fn aggregator(range: NanoRange) -> Self::Aggregator; +} + +pub trait EventsTimeBinnerAggregator: Send { + type Input: Unpin; + type Output: Unpin; + fn range(&self) -> &NanoRange; + fn ingest(&mut self, item: &Self::Input); + fn result(self) -> Self::Output; } pub trait BinsTimeBinner { @@ -582,6 +592,7 @@ pub trait BinsTimeBinner { fn process(inp: Self::Input) -> Self::Output; } +#[derive(Serialize, Deserialize)] pub struct MinMaxAvgBins { ts1s: Vec, ts2s: Vec, @@ -641,6 +652,102 @@ where } } +impl ReadableFromFile for MinMaxAvgBins +where + NTY: NumOps, +{ + // TODO this function is not needed in the trait: + fn read_from_file(file: File) -> Result, Error> { + Ok(ReadPbv::new(file)) + } + + fn from_buf(buf: &[u8]) -> Result { + let dec = serde_cbor::from_slice(&buf)?; + Ok(dec) + } +} + +pub struct MinMaxAvgAggregator { + range: NanoRange, + count: u32, + min: Option, + max: Option, + avg: Option, +} + +impl MinMaxAvgAggregator { + pub fn new(range: NanoRange) -> Self { + Self { + range, + count: 0, + min: None, + max: None, + avg: None, + } + } +} + +impl EventsTimeBinnerAggregator for MinMaxAvgAggregator +where + NTY: NumOps, +{ + type Input = 
EventValues; + type Output = MinMaxAvgBins; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + todo!() + } + + fn result(self) -> Self::Output { + todo!() + } +} + +pub struct SingleXBinAggregator { + range: NanoRange, + count: u32, + min: Option, + max: Option, + avg: Option, +} + +impl SingleXBinAggregator { + pub fn new(range: NanoRange) -> Self { + Self { + range, + count: 0, + min: None, + max: None, + avg: None, + } + } +} + +impl EventsTimeBinnerAggregator for SingleXBinAggregator +where + NTY: NumOps, +{ + type Input = XBinnedScalarEvents; + // TODO do I need another type to carry the x-bin count as well? No xbincount is static anyways. + type Output = MinMaxAvgBins; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + todo!() + } + + fn result(self) -> Self::Output { + todo!() + } +} + pub trait StreamKind: Clone + Unpin + Send + Sync + 'static { type TBinnedStreamType: Stream>, Error>> + Send; type XBinnedEvents: XBinnedEvents; diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 2425616..cbb22f9 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -1,13 +1,15 @@ +use crate::agg::binnedt4::TBinnerStream; use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::query::{CacheUsage, PreBinnedQuery}; use crate::binned::{ - BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, RangeCompletableItem, - ReadableFromFile, StreamKind, WithLen, + BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, + RangeCompletableItem, ReadableFromFile, StreamKind, WithLen, }; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; use crate::frame::makeframe::{make_frame, FrameType}; +use crate::merge::mergefromremote::MergedFromRemotes2; use crate::raw::EventsQuery; use crate::streamlog::Streamlog; use crate::Sitemty; @@ -27,19 +29,18 @@ use tokio::fs::{File, OpenOptions}; //pub type SomeScc = netpod::streamext::SCC; -pub struct PreBinnedValueStream +pub struct PreBinnedValueStream where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output>, ETB: EventsTimeBinner::Output>, - BTB: BinsTimeBinner::Output, Output = ::Output>, { query: PreBinnedQuery, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, - fut2: Option::Output>> + Send>>>, + fut2: Option::Output>> + Send>>>, read_from_cache: bool, cache_written: bool, data_complete: bool, @@ -48,9 +49,9 @@ where errored: bool, completed: bool, streamlog: Streamlog, - values: ::Output, + values: ::Output, write_fut: Option> + Send>>>, - read_cache_fut: Option::Output>> + Send>>>, + read_cache_fut: Option::Output>> + Send>>>, _m1: PhantomData, _m2: PhantomData, _m3: PhantomData, @@ -58,15 +59,16 @@ where _m5: PhantomData, } -impl PreBinnedValueStream +impl PreBinnedValueStream where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output>, - ETB: EventsTimeBinner::Output>, - BTB: BinsTimeBinner::Output, Output = ::Output>, - ::Output: Appendable, + ENP: EventsNodeProcessor>::Output> + 'static, + ETB: 
EventsTimeBinner::Output> + 'static, + ::Output: PushableIndex + Appendable, + Sitemty<::Output>: FrameType, + ::Output: Appendable, { pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self { Self { @@ -82,7 +84,7 @@ where errored: false, completed: false, streamlog: Streamlog::new(node_config.ix as u32), - values: <::Output as Appendable>::empty(), + values: <::Output as Appendable>::empty(), write_fut: None, read_cache_fut: None, _m1: PhantomData, @@ -93,10 +95,9 @@ where } } - // TODO handle errors also here via return type. fn setup_merged_from_remotes( &mut self, - ) -> Result::Output>> + Send>>, Error> { + ) -> Result::Output>> + Send>>, Error> { let evq = EventsQuery { channel: self.query.channel().clone(), range: self.query.patch().patch_range(), @@ -113,30 +114,27 @@ where } // TODO do I need to set up more transformations or binning to deliver the requested data? let count = self.query.patch().patch_t_len() / self.query.patch().bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count as u32) - .unwrap() - .ok_or(Error::with_msg("covering_range returns None")) - .unwrap(); + let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? + .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; // TODO copy the MergedFromRemotes and adapt... - /*let s1 = MergedFromRemotes::new( - evq, - perf_opts, - self.node_config.node_config.cluster.clone(), - .........., - );*/ - let s1: MergedFromRemotes = err::todoval(); + let s1 = MergedFromRemotes2::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); // TODO + // Go from ENP values to a T-binned stream... + // Most of the algo is static same. + // What varies: init aggregator for next T-bin. 
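// Note on the composition below: MergedFromRemotes2 yields time-ordered
// Sitemty<ENP::Output> items, and the bound on ETB (EventsTimeBinner with
// Input = ENP::Output) lets TBinnerStream consume them directly, creating
// one ETB::Aggregator per bin of `range` via ETB::aggregator(..).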
+ let ret = TBinnerStream::<_, ETB>::new(s1, range); + //let s1 = todo_convert_stream_to_tbinned_stream(s1, range); - Ok(err::todoval()) + Ok(Box::pin(ret)) } fn setup_from_higher_res_prebinned( &mut self, range: PreBinnedPatchRange, - ) -> Result::Output>> + Send>>, Error> { + ) -> Result::Output>> + Send>>, Error> { let g = self.query.patch().bin_t_len(); let h = range.grid_spec.bin_t_len(); trace!( @@ -207,17 +205,18 @@ where } } -impl Stream for PreBinnedValueStream +impl Stream for PreBinnedValueStream where NTY: NumOps + NumFromBytes + Serialize + Unpin + 'static, END: Endianness + Unpin + 'static, EVS: EventValueShape + EventValueFromBytes + Unpin + 'static, - ENP: EventsNodeProcessor>::Output> + Unpin, - ETB: EventsTimeBinner::Output> + Unpin, - BTB: BinsTimeBinner::Output, Output = ::Output>, + ENP: EventsNodeProcessor>::Output> + Unpin + 'static, + ETB: EventsTimeBinner::Output> + Unpin + 'static, + ::Output: PushableIndex + Appendable, + Sitemty<::Output>: FrameType, ::Output: Serialize + ReadableFromFile + 'static, { - type Item = Sitemty<::Output>; + type Item = Sitemty<::Output>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -291,7 +290,7 @@ where self.values.len(), ); self.streamlog.append(Level::INFO, msg); - let emp = <::Output as Appendable>::empty(); + let emp = <::Output as Appendable>::empty(); let values = std::mem::replace(&mut self.values, emp); let fut = write_pb_cache_min_max_avg_scalar( values, @@ -344,7 +343,8 @@ where match item { Ok(file) => { self.read_from_cache = true; - let fut = <::Output as ReadableFromFile>::read_from_file(file)?; + let fut = + <::Output as ReadableFromFile>::read_from_file(file)?; self.read_cache_fut = Some(Box::pin(fut)); continue 'outer; } diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 81d4275..419bc95 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -1,11 +1,14 @@ use crate::agg::binnedt4::{DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner}; use crate::agg::enp::{Identity, WaveXBinner}; -use crate::agg::streams::StreamItem; +use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::pbv2::{ pre_binned_value_byte_stream_new, PreBinnedValueByteStream, PreBinnedValueByteStreamInner, PreBinnedValueStream, }; use crate::binned::query::PreBinnedQuery; -use crate::binned::{BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, RangeCompletableItem, StreamKind}; +use crate::binned::{ + BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, RangeCompletableItem, + ReadableFromFile, StreamKind, +}; use crate::cache::node_ix_for_patch; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, @@ -23,52 +26,62 @@ use parse::channelconfig::{extract_matching_config_entry, read_local_config, Mat use serde::Serialize; use std::pin::Pin; -// TODO instead of EventNodeProcessor, use a T-binning processor here -// TODO might also want another stateful processor which can run on the merged event stream, like smoothing. 
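
The function below, together with make_num_pipeline_nty_end and the match_end! macro after it, is a two-stage dispatch: runtime tags for scalar type, byte order, and shape are matched once, and each arm instantiates a fully monomorphized pipeline. A self-contained toy of the same pattern, with illustrative names only (none of these types are from the crate; the real code dispatches endianness via marker types rather than a const bool):

    trait Pipe {
        fn run(&self);
    }

    // One fully monomorphized pipeline per (scalar type, byte order) combination.
    struct Pipeline<NTY, const BIG: bool>(std::marker::PhantomData<NTY>);

    impl<NTY: 'static, const BIG: bool> Pipe for Pipeline<NTY, BIG> {
        fn run(&self) { /* fetch, decode, bin ... */ }
    }

    enum Scalar { I32, F64 }
    enum Order { LE, BE }

    // Mirrors match_end!: one arm per byte order.
    macro_rules! match_end_toy {
        ($nty:ty, $end:expr) => {
            match $end {
                Order::LE => Box::new(Pipeline::<$nty, false>(std::marker::PhantomData)),
                Order::BE => Box::new(Pipeline::<$nty, true>(std::marker::PhantomData)),
            }
        };
    }

    fn make_pipeline(scalar: Scalar, end: Order) -> Box<dyn Pipe> {
        match scalar {
            Scalar::I32 => match_end_toy!(i32, end),
            Scalar::F64 => match_end_toy!(f64, end),
        }
    }
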
- fn make_num_pipeline_nty_end_evs_enp( + query: PreBinnedQuery, event_value_shape: EVS, + node_config: &NodeConfigCached, ) -> Pin> + Send>> where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output>, - ETB: EventsTimeBinner::Output>, + ENP: EventsNodeProcessor>::Output> + 'static, + ETB: EventsTimeBinner::Output> + 'static, Sitemty<::Output>: Framable + 'static, - ::Output: 'static, + ::Output: PushableIndex + Appendable + 'static, + ::Output: Serialize + ReadableFromFile + 'static, + Sitemty<::Output>: FrameType, + Sitemty<::Output>: Framable, { // TODO - // Use the pre-binned fetch machinery, refactored... - err::todoval() + // Currently, this mod uses stuff from pbv2, therefore complete path: + let ret = crate::binned::pbv::PreBinnedValueStream::::new(query, node_config); + let ret = StreamExt::map(ret, |item| Box::new(item) as Box); + Box::pin(ret) } -fn make_num_pipeline_nty_end(shape: Shape) -> Pin> + Send>> +fn make_num_pipeline_nty_end( + shape: Shape, + query: PreBinnedQuery, + node_config: &NodeConfigCached, +) -> Pin> + Send>> where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, { - // TODO pass all the correct types. - err::todo(); match shape { Shape::Scalar => { make_num_pipeline_nty_end_evs_enp::, DefaultScalarEventsTimeBinner>( + query, EventValuesDim0Case::new(), + node_config, ) } Shape::Wave(n) => { make_num_pipeline_nty_end_evs_enp::, DefaultSingleXBinTimeBinner>( + query, EventValuesDim1Case::new(n), + node_config, ) } } } macro_rules! match_end { - ($nty:ident, $end:expr, $shape:expr) => { + ($nty:ident, $end:expr, $shape:expr, $query:expr, $node_config:expr) => { match $end { - ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape), - ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape), + ByteOrder::LE => make_num_pipeline_nty_end::<$nty, LittleEndian>($shape, $query, $node_config), + ByteOrder::BE => make_num_pipeline_nty_end::<$nty, BigEndian>($shape, $query, $node_config), } }; } @@ -77,10 +90,12 @@ fn make_num_pipeline( scalar_type: ScalarType, byte_order: ByteOrder, shape: Shape, + query: PreBinnedQuery, + node_config: &NodeConfigCached, ) -> Pin> + Send>> { match scalar_type { - ScalarType::I32 => match_end!(i32, byte_order, shape), - ScalarType::F32 => match_end!(f32, byte_order, shape), + ScalarType::I32 => match_end!(i32, byte_order, shape, query, node_config), + ScalarType::F64 => match_end!(f64, byte_order, shape, query, node_config), _ => todo!(), } } @@ -129,6 +144,8 @@ where entry.scalar_type.clone(), entry.byte_order.clone(), entry.to_shape().unwrap(), + query.clone(), + node_config, ) .map(|item| match item.make_frame() { Ok(item) => Ok(item.freeze()), diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 396b758..003d7af 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,6 +1,8 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::StreamItem; -use crate::binned::{EventsNodeProcessor, NumOps, RangeCompletableItem}; +use crate::agg::streams::{Appendable, StreamItem}; +use crate::binned::{ + EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, RangeOverlapInfo, WithLen, WithTimestamps, +}; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventFull; use crate::frame::makeframe::{make_frame, Framable}; @@ -8,6 +10,7 @@ use bytes::BytesMut; use err::Error; use futures_core::Stream; use 
futures_util::StreamExt; +use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use std::mem::size_of; @@ -111,13 +114,17 @@ impl EventValuesDim0Case { } } +// TODO get rid of this dummy: pub struct ProcAA { _m1: PhantomData, } -impl EventsNodeProcessor for ProcAA { +impl EventsNodeProcessor for ProcAA +where + NTY: NumOps, +{ type Input = NTY; - type Output = MinMaxAvgScalarBinBatch; + type Output = EventValues; fn process(_inp: EventValues) -> Self::Output { todo!() @@ -132,6 +139,12 @@ where type NumXAggToNBins = ProcAA; } +impl WithTimestamps for ProcAA { + fn ts(&self, ix: usize) -> u64 { + todo!() + } +} + pub struct EventValuesDim1Case { n: u32, _m1: PhantomData, @@ -166,6 +179,12 @@ impl MinMaxAvgScalarEventBatchGen { } } +impl WithTimestamps for MinMaxAvgScalarEventBatchGen { + fn ts(&self, ix: usize) -> u64 { + self.tss[ix] + } +} + impl Framable for Result>>, Error> where NTY: NumOps + Serialize, @@ -183,57 +202,7 @@ where type Output = MinMaxAvgScalarEventBatchGen; fn process(inp: EventValues) -> Self::Output { - let nev = inp.tss.len(); - let mut ret = MinMaxAvgScalarEventBatchGen { - tss: inp.tss, - mins: Vec::with_capacity(nev), - maxs: Vec::with_capacity(nev), - avgs: Vec::with_capacity(nev), - }; - for i1 in 0..nev { - let mut min = None; - let mut max = None; - let mut sum = 0f32; - let mut count = 0; - let vals = &inp.values[i1]; - for i2 in 0..vals.len() { - let v = vals[i2]; - min = match min { - None => Some(v), - Some(min) => { - if v < min { - Some(v) - } else { - Some(min) - } - } - }; - max = match max { - None => Some(v), - Some(max) => { - if v > max { - Some(v) - } else { - Some(max) - } - } - }; - let vf = v.as_(); - if vf.is_nan() { - } else { - sum += vf; - count += 1; - } - } - ret.mins.push(min); - ret.maxs.push(max); - if count == 0 { - ret.avgs.push(None); - } else { - ret.avgs.push(Some(sum / count as f32)); - } - } - ret + err::todoval() } } @@ -280,6 +249,65 @@ where } } +impl WithLen for EventValues { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl WithTimestamps for EventValues { + fn ts(&self, ix: usize) -> u64 { + self.tss[ix] + } +} + +impl RangeOverlapInfo for EventValues { + fn ends_before(&self, range: NanoRange) -> bool { + match self.tss.last() { + Some(&ts) => ts < range.beg, + None => true, + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + match self.tss.last() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + match self.tss.first() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } +} + +impl PushableIndex for EventValues +where + NTY: NumOps, +{ + fn push_index(&mut self, src: &Self, ix: usize) { + self.tss.push(src.tss[ix]); + self.values.push(src.values[ix]); + } +} + +impl Appendable for EventValues +where + NTY: NumOps, +{ + fn empty() -> Self { + Self::empty() + } + + fn append(&mut self, src: &Self) { + self.tss.extend_from_slice(&src.tss); + self.values.extend_from_slice(&src.values); + } +} + pub struct EventsDecodedStream where NTY: NumOps + NumFromBytes, diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 5fb7569..c0491ef 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -2,10 +2,11 @@ use crate::agg::enp::XBinnedScalarEvents; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::{NumOps, RangeCompletableItem}; +use 
crate::binned::{MinMaxAvgBins, NumOps, RangeCompletableItem}; use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen}; use crate::frame::inmem::InMemoryFrame; use crate::raw::EventQueryJsonStringFrame; +use crate::Sitemty; use bytes::{BufMut, BytesMut}; use err::Error; use serde::{de::DeserializeOwned, Serialize}; @@ -67,35 +68,42 @@ impl FrameType for EventQueryJsonStringFrame { const FRAME_TYPE_ID: u32 = 0x100; } -impl FrameType for Result>, Error> { +impl FrameType for Sitemty { const FRAME_TYPE_ID: u32 = 0x200; } -impl FrameType for Result>, Error> { +impl FrameType for Sitemty { const FRAME_TYPE_ID: u32 = 0x300; } -impl FrameType for Result>>, Error> +impl FrameType for Sitemty> where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = 0x400 + NTY::SUB; } -impl FrameType for Result>>, Error> +impl FrameType for Sitemty> where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB; } -impl FrameType for Result>>, Error> +impl FrameType for Sitemty> where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB; } +impl FrameType for Sitemty> +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = 0x700 + NTY::SUB; +} + pub trait ProvidesFrameType { fn frame_type_id(&self) -> u32; } @@ -134,6 +142,15 @@ where } } +impl Framable for Sitemty> +where + NTY: NumOps + Serialize, +{ + fn make_frame(&self) -> Result { + make_frame(self) + } +} + pub fn make_frame(item: &FT) -> Result where FT: FrameType + Serialize, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index f094779..db47d07 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,6 +1,7 @@ use crate::agg::streams::{Appendable, StatsItem, StreamItem}; -use crate::binned::{PushableIndex, RangeCompletableItem, StreamKind, WithLen, WithTimestamps}; +use crate::binned::{EventsNodeProcessor, PushableIndex, RangeCompletableItem, StreamKind, WithLen, WithTimestamps}; use crate::streamlog::LogItem; +use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -18,6 +19,232 @@ enum MergedCurVal { Val(T), } +// TODO rename after refactor +pub struct MergedStream2 +where + S: Stream::Output>>, + ENP: EventsNodeProcessor, +{ + inps: Vec, + current: Vec::Output>>, + ixs: Vec, + errored: bool, + completed: bool, + batch: ::Output, + ts_last_emit: u64, + range_complete_observed: Vec, + range_complete_observed_all: bool, + range_complete_observed_all_emitted: bool, + data_emit_complete: bool, + batch_size: usize, + logitems: VecDeque, + event_data_read_stats_items: VecDeque, +} + +impl MergedStream2 +where + S: Stream::Output>> + Unpin, + ENP: EventsNodeProcessor, + ::Output: Appendable, +{ + pub fn new(inps: Vec) -> Self { + let n = inps.len(); + let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect(); + Self { + inps, + current: current, + ixs: vec![0; n], + errored: false, + completed: false, + batch: <::Output as Appendable>::empty(), + ts_last_emit: 0, + range_complete_observed: vec![false; n], + range_complete_observed_all: false, + range_complete_observed_all_emitted: false, + data_emit_complete: false, + batch_size: 64, + logitems: VecDeque::new(), + event_data_read_stats_items: VecDeque::new(), + } + } + + fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut pending = 0; + for i1 in 0..self.inps.len() { + match self.current[i1] { + MergedCurVal::None => { + 'l1: loop { + break match self.inps[i1].poll_next_unpin(cx) { + Ready(Some(Ok(k))) => match k { + StreamItem::Log(item) => { + self.logitems.push_back(item); + continue 'l1; + } + 
StreamItem::Stats(item) => { + match item { + StatsItem::EventDataReadStats(item) => { + self.event_data_read_stats_items.push_back(item); + } + } + continue 'l1; + } + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete_observed[i1] = true; + let d = self.range_complete_observed.iter().filter(|&&k| k).count(); + if d == self.range_complete_observed.len() { + self.range_complete_observed_all = true; + debug!("MergedStream range_complete d {} COMPLETE", d); + } else { + trace!("MergedStream range_complete d {}", d); + } + continue 'l1; + } + RangeCompletableItem::Data(item) => { + self.ixs[i1] = 0; + self.current[i1] = MergedCurVal::Val(item); + } + }, + }, + Ready(Some(Err(e))) => { + // TODO emit this error, consider this stream as done, anything more to do here? + //self.current[i1] = CurVal::Err(e); + self.errored = true; + return Ready(Err(e)); + } + Ready(None) => { + self.current[i1] = MergedCurVal::Finish; + } + Pending => { + pending += 1; + } + }; + } + } + _ => (), + } + } + if pending > 0 { + Pending + } else { + Ready(Ok(())) + } + } +} + +impl Stream for MergedStream2 +where + S: Stream::Output>> + Unpin, + ENP: EventsNodeProcessor, + ::Output: PushableIndex + Appendable, +{ + type Item = Sitemty<::Output>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("poll_next on completed"); + } else if self.errored { + self.completed = true; + Ready(None) + } else if let Some(item) = self.logitems.pop_front() { + Ready(Some(Ok(StreamItem::Log(item)))) + } else if let Some(item) = self.event_data_read_stats_items.pop_front() { + Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item))))) + } else if self.data_emit_complete { + if self.range_complete_observed_all { + if self.range_complete_observed_all_emitted { + self.completed = true; + Ready(None) + } else { + self.range_complete_observed_all_emitted = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } + } else { + self.completed = true; + Ready(None) + } + } else { + // Can only run logic if all streams are either finished, errored or have some current value. 
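// In other words: replenish() returns Ready(Ok(())) only once every input slot
// is either MergedCurVal::Finish or MergedCurVal::Val(_); if any input is still
// Pending, the whole poll parks as Pending. The merge step below can therefore
// pick the globally lowest timestamp without ever comparing against a slot
// whose next batch is simply not known yet.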
+ match self.replenish(cx) { + Ready(Ok(_)) => { + let mut lowest_ix = usize::MAX; + let mut lowest_ts = u64::MAX; + for i1 in 0..self.inps.len() { + if let MergedCurVal::Val(val) = &self.current[i1] { + let u = self.ixs[i1]; + if u >= val.len() { + self.ixs[i1] = 0; + self.current[i1] = MergedCurVal::None; + continue 'outer; + } else { + let ts = val.ts(u); + if ts < lowest_ts { + lowest_ix = i1; + lowest_ts = ts; + } + } + } + } + if lowest_ix == usize::MAX { + if self.batch.len() != 0 { + //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); + //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); + let emp = <::Output>::empty(); + let ret = std::mem::replace(&mut self.batch, emp); + self.data_emit_complete = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) + } else { + self.data_emit_complete = true; + continue 'outer; + } + } else { + assert!(lowest_ts >= self.ts_last_emit); + let emp = <::Output>::empty(); + let mut local_batch = std::mem::replace(&mut self.batch, emp); + self.ts_last_emit = lowest_ts; + let rix = self.ixs[lowest_ix]; + match &self.current[lowest_ix] { + MergedCurVal::Val(val) => { + local_batch.push_index(val, rix); + } + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), + } + self.batch = local_batch; + self.ixs[lowest_ix] += 1; + let curlen = match &self.current[lowest_ix] { + MergedCurVal::Val(val) => val.len(), + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), + }; + if self.ixs[lowest_ix] >= curlen { + self.ixs[lowest_ix] = 0; + self.current[lowest_ix] = MergedCurVal::None; + } + if self.batch.len() >= self.batch_size { + let emp = <::Output>::empty(); + let ret = std::mem::replace(&mut self.batch, emp); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) + } else { + continue 'outer; + } + } + } + Ready(Err(e)) => { + self.errored = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + }; + } + } +} + +// TODO remove after refactor pub struct MergedStream where S: Stream>, Error>> + Unpin, diff --git a/disk/src/merge/mergefromremote.rs b/disk/src/merge/mergefromremote.rs index 65d4855..13db6d7 100644 --- a/disk/src/merge/mergefromremote.rs +++ b/disk/src/merge/mergefromremote.rs @@ -1,5 +1,7 @@ -use crate::binned::EventsNodeProcessor; +use crate::agg::streams::Appendable; +use crate::binned::{EventsNodeProcessor, PushableIndex}; use crate::frame::makeframe::FrameType; +use crate::merge::{MergedStream, MergedStream2}; use crate::raw::{x_processed_stream_from_node2, EventsQuery}; use crate::Sitemty; use err::Error; @@ -31,7 +33,7 @@ where ::Output: Unpin, Sitemty<::Output>: FrameType, { - pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster, stream_kind: ENP) -> Self { + pub fn new(evq: EventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { let mut tcp_establish_futs = vec![]; for node in &cluster.nodes { let f = x_processed_stream_from_node2::(evq.clone(), perf_opts.clone(), node.clone()); @@ -51,7 +53,8 @@ where impl Stream for MergedFromRemotes2 where - ENP: EventsNodeProcessor, + ENP: EventsNodeProcessor + 'static, + ::Output: PushableIndex + Appendable, { type Item = Sitemty<::Output>; @@ -104,13 +107,8 @@ where } else { if c1 == self.tcp_establish_futs.len() { let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - //let s1 = MergedStream::<_, ENP>::new(inps); - - // TODO - - err::todo(); - //let s1 = err::todoval(); - //self.merged = Some(Box::pin(s1)); + let s1 = MergedStream2::<_, 
ENP>::new(inps); + self.merged = Some(Box::pin(s1)); } continue 'outer; }
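
Stripped of the stream plumbing, the loop in MergedStream2::poll_next above is a plain k-way merge by timestamp: pick the input whose current batch exposes the lowest next timestamp, move one event across, and clear a slot whenever its batch is exhausted so that replenish() refills it. A synchronous sketch of that core, with timestamps standing in for whole events (not the patch's code):

    use std::collections::VecDeque;

    // K-way merge over per-input queues, preserving global timestamp order.
    fn kway_merge(inputs: &mut [VecDeque<u64>]) -> Vec<u64> {
        let mut out = Vec::new();
        let mut ts_last = 0u64;
        loop {
            // Find the input with the lowest next timestamp (lowest_ix / lowest_ts above).
            let mut lowest: Option<(usize, u64)> = None;
            for (ix, q) in inputs.iter().enumerate() {
                if let Some(&ts) = q.front() {
                    if lowest.map_or(true, |(_, lo)| ts < lo) {
                        lowest = Some((ix, ts));
                    }
                }
            }
            match lowest {
                Some((ix, ts)) => {
                    assert!(ts >= ts_last); // same ordering invariant as in poll_next
                    ts_last = ts;
                    inputs[ix].pop_front();
                    out.push(ts);
                }
                // All queues drained: corresponds to data_emit_complete.
                None => return out,
            }
        }
    }

The stream version additionally batches output (batch_size = 64) and interleaves log and stats items, but the ordering argument is the same.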
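A review note on WaveXBinner::process in disk/src/agg/enp.rs above: per event, the whole waveform is folded into a single x-bin (min, max, and a NaN-skipping average). The inner fold, extracted as a standalone function below, is a sketch; NumOps is assumed to provide ordering and an as-f32 conversion, written here via num_traits. Note also that the patch pushes nev as u32 into xbincount; vals.len() as u32, the number of samples folded into the bin, looks like the intended value and is worth double-checking:

    use num_traits::AsPrimitive;

    // Fold one waveform into (min, max, avg); NaN samples are skipped for the
    // average. Assumes at least one sample, matching the patch's TODO that an
    // empty waveform is illegal input here.
    fn fold_wave<NTY>(vals: &[NTY]) -> (NTY, NTY, f32)
    where
        NTY: PartialOrd + AsPrimitive<f32>,
    {
        let mut min = vals[0];
        let mut max = vals[0];
        let mut sum = 0f32;
        let mut count = 0u32;
        for &v in vals {
            if v < min { min = v; }
            if v > max { max = v; }
            let vf: f32 = v.as_();
            if !vf.is_nan() {
                sum += vf;
                count += 1;
            }
        }
        let avg = if count == 0 { f32::NAN } else { sum / count as f32 };
        (min, max, avg)
    }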