From 7b40938427d81a0760ae880cb7091a85564eef1e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 8 Jun 2021 15:36:54 +0200 Subject: [PATCH] Refactor number pipeline for simple cases --- daqbuffer/src/test.rs | 7 +- disk/src/agg/binnedt4.rs | 132 ++++++-- disk/src/agg/enp.rs | 134 +++++++- disk/src/binned.rs | 304 +++++++++++++++--- disk/src/binned/binnedfrompbv.rs | 261 +++++++++++++++ disk/src/binned/pbv.rs | 51 +-- disk/src/binned/prebinned.rs | 5 +- disk/src/binnedstream.rs | 5 +- disk/src/cache.rs | 1 + disk/src/cache/pbvfs.rs | 3 +- disk/src/decode.rs | 104 +++++- disk/src/frame/makeframe.rs | 6 +- disk/src/merge.rs | 2 +- ...ergefromremote.rs => mergedfromremotes.rs} | 0 disk/src/raw.rs | 4 +- disk/src/raw/conn.rs | 20 +- disk/src/raw/{bffr.rs => eventsfromframes.rs} | 12 +- 17 files changed, 920 insertions(+), 131 deletions(-) create mode 100644 disk/src/binned/binnedfrompbv.rs rename disk/src/merge/{mergefromremote.rs => mergedfromremotes.rs} (100%) rename disk/src/raw/{bffr.rs => eventsfromframes.rs} (87%) diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs index 20ddac4..4e32540 100644 --- a/daqbuffer/src/test.rs +++ b/daqbuffer/src/test.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc}; use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use disk::agg::streams::{Bins, StatsItem, StreamItem}; use disk::binned::query::{BinnedQuery, CacheUsage}; -use disk::binned::RangeCompletableItem; +use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::streamlog::Streamlog; use err::Error; @@ -227,7 +227,8 @@ where None } StreamItem::DataItem(frame) => { - type ExpectedType = Result>, Error>; + info!("test receives tyid {:x}", frame.tyid()); + type ExpectedType = Result>>, Error>; match bincode::deserialize::(frame.buf()) { Ok(item) => match item { Ok(item) => match item { @@ -275,7 +276,7 @@ where } RangeCompletableItem::Data(item) => { a.data_item_count += 1; - a.bin_count += item.bin_count() as u64; + a.bin_count += WithLen::len(&item) as u64; a } }, diff --git a/disk/src/agg/binnedt4.rs b/disk/src/agg/binnedt4.rs index 2fa6cb7..18bbeb7 100644 --- a/disk/src/agg/binnedt4.rs +++ b/disk/src/agg/binnedt4.rs @@ -1,22 +1,25 @@ use crate::agg::enp::XBinnedScalarEvents; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::StreamItem; +use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::{ - BinsTimeBinner, EventsTimeBinner, EventsTimeBinnerAggregator, MinMaxAvgAggregator, MinMaxAvgBins, NumOps, - RangeCompletableItem, RangeOverlapInfo, SingleXBinAggregator, + BinsTimeBinner, EventsTimeBinner, EventsTimeBinnerAggregator, FilterFittingInside, MinMaxAvgAggregator, + MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, SingleXBinAggregator, }; -use crate::decode::EventValues; +use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen}; +use crate::frame::makeframe::Framable; use crate::Sitemty; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; use netpod::{BinnedRange, NanoRange}; +use serde::Serialize; use std::collections::VecDeque; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::fs::File; pub struct DefaultScalarEventsTimeBinner { _m1: PhantomData, @@ -141,17 +144,33 @@ impl Agg3 { } } -pub struct TBinnerStream -where - S: Stream::Input>> + Send + Unpin + 'static, - ETB: 
EventsTimeBinner + Send + Unpin + 'static,
+pub trait TimeBinnableTypeAggregator: Send {
+    type Input: TimeBinnableType;
+    type Output: TimeBinnableType;
+    fn range(&self) -> &NanoRange;
+    fn ingest(&mut self, item: &Self::Input);
+    fn result(self) -> Self::Output;
+}
+
+pub trait TimeBinnableType:
+    Send + Unpin + RangeOverlapInfo + FilterFittingInside + Appendable + Serialize + ReadableFromFile
 {
-    inp: S,
+    type Output: TimeBinnableType;
+    type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
+    fn aggregator(range: NanoRange) -> Self::Aggregator;
+}
+
+pub struct TBinnerStream<S, TBT>
+where
+    S: Stream<Item = Sitemty<TBT>>,
+    TBT: TimeBinnableType,
+{
+    inp: Pin<Box<S>>,
     spec: BinnedRange,
     curbin: u32,
-    left: Option<Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Input>>>>,
-    aggtor: Option<<ETB as EventsTimeBinner>::Aggregator>,
-    tmp_agg_results: VecDeque<<<ETB as EventsTimeBinner>::Aggregator as EventsTimeBinnerAggregator>::Output>,
+    left: Option<Poll<Option<Sitemty<TBT>>>>,
+    aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
+    tmp_agg_results: VecDeque<<<TBT as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output>,
     inp_completed: bool,
     all_bins_emitted: bool,
     range_complete_observed: bool,
@@ -160,19 +179,19 @@ where
     completed: bool,
 }
 
-impl<S, ETB> TBinnerStream<S, ETB>
+impl<S, TBT> TBinnerStream<S, TBT>
 where
-    S: Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Input>> + Send + Unpin + 'static,
-    ETB: EventsTimeBinner,
+    S: Stream<Item = Sitemty<TBT>> + Send + Unpin + 'static,
+    TBT: TimeBinnableType,
 {
     pub fn new(inp: S, spec: BinnedRange) -> Self {
         let range = spec.get_range(0);
         Self {
-            inp,
+            inp: Box::pin(inp),
             spec,
             curbin: 0,
             left: None,
-            aggtor: Some(<ETB as EventsTimeBinner>::aggregator(range)),
+            aggtor: Some(<TBT as TimeBinnableType>::aggregator(range)),
             tmp_agg_results: VecDeque::new(),
             inp_completed: false,
             all_bins_emitted: false,
@@ -183,7 +202,7 @@ where
         }
     }
 
-    fn cur(&mut self, cx: &mut Context) -> Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Input>>> {
+    fn cur(&mut self, cx: &mut Context) -> Poll<Option<Sitemty<TBT>>> {
         if let Some(cur) = self.left.take() {
             cur
         } else if self.inp_completed {
@@ -194,13 +213,13 @@ where
         }
     }
 
+    // TODO handle unwrap error, or use a mem replace type instead of option:
     fn cycle_current_bin(&mut self) {
         self.curbin += 1;
         let range = self.spec.get_range(self.curbin);
         let ret = self
             .aggtor
-            .replace(<ETB as EventsTimeBinner>::aggregator(range))
-            // TODO handle None case, or remove Option if Agg is always present
+            .replace(<TBT as TimeBinnableType>::aggregator(range))
             .unwrap()
             .result();
         // TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
@@ -213,8 +232,9 @@ where
 
     fn handle(
         &mut self,
-        cur: Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Input>>>,
-    ) -> Option<Poll<Option<Sitemty<<ETB as EventsTimeBinner>::Output>>>> {
+        cur: Poll<Option<Sitemty<TBT>>>,
+    ) -> Option<Poll<Option<Sitemty<<<TBT as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output>>>>
+    {
         use Poll::*;
         match cur {
             Ready(Some(Ok(item))) => match item {
@@ -228,7 +248,7 @@ where
                 RangeCompletableItem::Data(item) => {
                     if self.all_bins_emitted {
                         // Just drop the item because we will not emit anymore data.
-                        // Could also at least gather some stats.
+                        // TODO gather stats.
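+                        // One possible approach (sketch, not wired up here): keep a drop
+                        // counter on Self and emit it as a StreamItem::Stats message
+                        // before the stream finishes.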
None } else { let ag = self.aggtor.as_mut().unwrap(); @@ -273,18 +293,20 @@ where } } -impl Stream for TBinnerStream +impl Stream for TBinnerStream where - S: Stream::Input>> + Send + Unpin + 'static, - ETB: EventsTimeBinner + Send + Unpin + 'static, + S: Stream> + Send + Unpin + 'static, + TBT: TimeBinnableType + Send + Unpin + 'static, + ::Aggregator: Unpin, + <::Aggregator as TimeBinnableTypeAggregator>::Output: Unpin, { - type Item = Sitemty<::Output>; + type Item = Sitemty<<::Aggregator as TimeBinnableTypeAggregator>::Output>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; 'outer: loop { break if self.completed { - panic!("IntoBinnedTDefaultStream poll_next on completed"); + panic!("poll_next on completed"); } else if self.errored { self.completed = true; Ready(None) @@ -310,3 +332,57 @@ where } } } + +pub struct MinMaxAvgScalarEventBatchGenAggregator +where + NTY: NumOps, +{ + _m1: PhantomData, +} + +impl TimeBinnableTypeAggregator for MinMaxAvgScalarEventBatchGenAggregator +where + NTY: NumOps, +{ + type Input = MinMaxAvgScalarEventBatchGen; + type Output = MinMaxAvgScalarEventBatchGen; + + fn range(&self) -> &NanoRange { + todo!() + } + + fn ingest(&mut self, item: &Self::Input) { + todo!() + } + + fn result(self) -> Self::Output { + todo!() + } +} + +impl ReadableFromFile for MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ + fn read_from_file(file: File) -> Result, Error> { + todo!() + } + + fn from_buf(buf: &[u8]) -> Result { + todo!() + } +} + +// TODO this is just dummy, do I use this in the refactored code? +impl TimeBinnableType for MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ + // TODO Output is just dummy, because this type is probably unused anyways. + type Output = MinMaxAvgScalarEventBatchGen; + type Aggregator = MinMaxAvgScalarEventBatchGenAggregator; + + fn aggregator(range: NanoRange) -> Self::Aggregator { + todo!() + } +} diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index 1fa6fb7..e460602 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -1,10 +1,16 @@ +use crate::agg::binnedt4::{TimeBinnableType, TimeBinnableTypeAggregator}; use crate::agg::streams::Appendable; -use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeOverlapInfo, WithLen, WithTimestamps}; -use crate::decode::EventValues; +use crate::binned::{ + EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeOverlapInfo, ReadPbv, + ReadableFromFile, WithLen, WithTimestamps, +}; +use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen}; +use err::Error; use netpod::NanoRange; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; +use tokio::fs::File; pub struct Identity { _m1: PhantomData, @@ -78,6 +84,12 @@ impl RangeOverlapInfo for XBinnedScalarEvents { } } +impl FilterFittingInside for XBinnedScalarEvents { + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + todo!() + } +} + impl PushableIndex for XBinnedScalarEvents where NTY: NumOps, @@ -108,6 +120,124 @@ where } } +impl ReadableFromFile for XBinnedScalarEvents +where + NTY: NumOps, +{ + fn read_from_file(file: File) -> Result, Error> { + todo!() + } + + fn from_buf(buf: &[u8]) -> Result { + todo!() + } +} + +impl TimeBinnableType for XBinnedScalarEvents +where + NTY: NumOps, +{ + type Output = MinMaxAvgBins; + type Aggregator = XBinnedScalarEventsAggregator; + + fn aggregator(range: NanoRange) -> Self::Aggregator { + Self::Aggregator::new(range) + } +} + +pub struct 
XBinnedScalarEventsAggregator +where + NTY: NumOps, +{ + range: NanoRange, + min: Option, + max: Option, + sumc: u32, + sum: f32, +} + +impl XBinnedScalarEventsAggregator +where + NTY: NumOps, +{ + pub fn new(range: NanoRange) -> Self { + Self { + range, + min: None, + max: None, + sumc: 0, + sum: 0f32, + } + } +} + +impl TimeBinnableTypeAggregator for XBinnedScalarEventsAggregator +where + NTY: NumOps, +{ + type Input = XBinnedScalarEvents; + type Output = MinMaxAvgBins; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + if ts < self.range.beg { + continue; + } else if ts >= self.range.end { + continue; + } else { + self.min = match self.min { + None => Some(item.mins[i1]), + Some(min) => { + if item.mins[i1] < min { + Some(item.mins[i1]) + } else { + Some(min) + } + } + }; + self.max = match self.max { + None => Some(item.maxs[i1]), + Some(max) => { + if item.maxs[i1] > max { + Some(item.maxs[i1]) + } else { + Some(max) + } + } + }; + let x = item.avgs[i1]; + if x.is_nan() { + } else { + self.sum += x; + self.sumc += 1; + } + } + } + } + + fn result(self) -> Self::Output { + let avg = if self.sumc == 0 { + None + } else { + Some(self.sum / self.sumc as f32) + }; + Self::Output { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + // TODO + counts: vec![0], + mins: vec![self.min], + maxs: vec![self.max], + avgs: vec![avg], + } + } +} + pub struct WaveXBinner { _m1: PhantomData, } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index ecd52c1..21a7e8a 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,12 +1,16 @@ use crate::agg::binnedt::AggregatableTdim; use crate::agg::binnedt2::AggregatableTdim2; use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; -use crate::agg::binnedt4::{DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner}; +use crate::agg::binnedt4::{ + DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner, TBinnerStream, TimeBinnableType, + TimeBinnableTypeAggregator, +}; use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, Collectable, Collected, StreamItem, ToJsonResult}; use crate::agg::{Fits, FitsInside}; +use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binned::query::{BinnedQuery, PreBinnedQuery}; use crate::binned::scalar::binned_stream; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; @@ -16,15 +20,16 @@ use crate::decode::{ LittleEndian, NumFromBytes, }; use crate::frame::makeframe::{Framable, FrameType, SubFrId}; -use crate::merge::mergefromremote::MergedFromRemotes2; +use crate::merge::mergedfromremotes::MergedFromRemotes2; use crate::raw::EventsQuery; use crate::Sitemty; use bytes::Bytes; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{FutureExt, StreamExt}; use netpod::log::*; +use netpod::timeunits::SEC; use netpod::{ AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, ScalarType, Shape, @@ -42,6 +47,7 @@ use std::time::Duration; use tokio::fs::File; use tokio::io::{AsyncRead, ReadBuf}; +pub mod binnedfrompbv; pub mod pbv; pub mod pbv2; pub mod prebinned; @@ -179,10 +185,19 @@ where EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Output> + 'static, 
ETB: EventsTimeBinner<Input = <ENP as EventsNodeProcessor>::Output> + 'static,
-    <ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + 'static,
+    <ENP as EventsNodeProcessor>::Output: TimeBinnableType + PushableIndex + Appendable + 'static,
+    <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output:
+        TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output> + Unpin,
     <ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
+    Sitemty<
+        <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
+    >: Framable,
+    // TODO require these things in general?
     Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
-    Sitemty<<ETB as EventsTimeBinner>::Output>: Framable,
+    Sitemty<<ETB as EventsTimeBinner>::Output>: FrameType + Framable + DeserializeOwned,
+    Sitemty<
+        <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Aggregator as TimeBinnableTypeAggregator>::Output,
+    >: FrameType + Framable + DeserializeOwned,
 {
     // TODO construct the binned pipeline:
     // Either take from a prebinned sub stream, or directly from a merged stream.
@@ -212,11 +227,7 @@ where
             );
             return Err(Error::with_msg(msg));
         }
-
-        // TODO
-        // Must generify the BinnedScalarStreamFromPreBinnedPatches.
-        // Copy code and introduce type parameters.
-        let s = BinnedScalarStreamFromPreBinnedPatches::new(
+        let s = BinnedFromPreBinned::<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>::new(
             PreBinnedPatchIterator::from_range(pre_range),
             query.channel().clone(),
             range.clone(),
@@ -225,15 +236,9 @@ where
             node_config,
             query.disk_stats_every().clone(),
             query.report_error(),
-            self.clone(),
-        )?;
-
-        let s = BoxedStream::new(Box::pin(s))?;
-        let ret = BinnedStreamRes {
-            binned_stream: s,
-            range,
-        };
-        Ok(ret)
+        )?
+        .map(|item| Box::new(item) as Box<dyn Framable>);
+        Ok(Box::pin(s))
     }
     Ok(None) => {
         info!(
@@ -245,24 +250,13 @@ where
             range: query.range().clone(),
             agg_kind: query.agg_kind().clone(),
         };
-
-        // TODO do I need to set up more transformations or binning to deliver the requested data?
- //let s = SK::new_binned_from_merged(&stream_kind, evq, perf_opts, range.clone(), node_config)?; - - // TODO adapt the usage the same way how I do in prebinned.rs: - let s = MergedFromRemotes2::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); - let s = Self::xbinned_to_tbinned(s, range); - - let s = BoxedStream::new(Box::pin(s))?; - let ret = BinnedStreamRes { - binned_stream: s, - range, - }; - Ok(ret) + let s = MergedFromRemotes2::::new(evq, perf_opts, node_config.node_config.cluster.clone()); + let s = TBinnerStream::<_, ::Output>::new(s, range); + let s = StreamExt::map(s, |item| Box::new(item) as Box); + Ok(Box::pin(s)) } Err(e) => Err(e), } - err::todoval() } fn make_num_pipeline_nty_end( @@ -719,11 +713,21 @@ impl TBinnedBins for MinMaxAvgScalarBinBatch { } pub trait NumOps: - Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + DeserializeOwned + Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned { } impl NumOps for T where - T: Sized + Copy + Send + Unpin + Zero + AsPrimitive + Bounded + PartialOrd + SubFrId + DeserializeOwned + T: Sized + + Copy + + Send + + Unpin + + Zero + + AsPrimitive + + Bounded + + PartialOrd + + SubFrId + + Serialize + + DeserializeOwned { } @@ -735,15 +739,16 @@ pub trait EventsDecoder { pub trait EventsNodeProcessor: Send + Unpin { type Input; - type Output: Send + Unpin + DeserializeOwned + WithTimestamps; + type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType; fn process(inp: EventValues) -> Self::Output; } -pub trait TimeBins: Send + Unpin + WithLen + Appendable { +pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { fn ts1s(&self) -> &Vec; fn ts2s(&self) -> &Vec; } +// TODO remove in favor of the one in binnedt4 pub trait EventsTimeBinner: Send + Unpin { type Input: Unpin + RangeOverlapInfo; type Output: TimeBins; @@ -751,6 +756,7 @@ pub trait EventsTimeBinner: Send + Unpin { fn aggregator(range: NanoRange) -> Self::Aggregator; } +// TODO remove in favor of the one in binnedt4 pub trait EventsTimeBinnerAggregator: Send { type Input: Unpin; type Output: Unpin; @@ -767,12 +773,31 @@ pub trait BinsTimeBinner { #[derive(Serialize, Deserialize)] pub struct MinMaxAvgBins { - ts1s: Vec, - ts2s: Vec, - counts: Vec, - mins: Vec>, - maxs: Vec>, - avgs: Vec>, + pub ts1s: Vec, + pub ts2s: Vec, + pub counts: Vec, + pub mins: Vec>, + pub maxs: Vec>, + pub avgs: Vec>, +} + +impl std::fmt::Debug for MinMaxAvgBins +where + NTY: std::fmt::Debug, +{ + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "MinMaxAvgBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", + self.ts1s.len(), + self.ts1s.iter().map(|k| k / SEC).collect::>(), + self.ts2s.iter().map(|k| k / SEC).collect::>(), + self.counts, + self.mins, + self.maxs, + self.avgs, + ) + } } impl MinMaxAvgBins { @@ -788,6 +813,62 @@ impl MinMaxAvgBins { } } +impl FitsInside for MinMaxAvgBins { + fn fits_inside(&self, range: NanoRange) -> Fits { + if self.ts1s.is_empty() { + Fits::Empty + } else { + let t1 = *self.ts1s.first().unwrap(); + let t2 = *self.ts2s.last().unwrap(); + if t2 <= range.beg { + Fits::Lower + } else if t1 >= range.end { + Fits::Greater + } else if t1 < range.beg && t2 > range.end { + Fits::PartlyLowerAndGreater + } else if t1 < range.beg { + Fits::PartlyLower + } else if t2 > range.end { + Fits::PartlyGreater + } else { + Fits::Inside + } + } + } +} + +impl 
FilterFittingInside for MinMaxAvgBins { + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + match self.fits_inside(fit_range) { + Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), + _ => None, + } + } +} + +impl RangeOverlapInfo for MinMaxAvgBins { + fn ends_before(&self, range: NanoRange) -> bool { + match self.ts2s.last() { + Some(&ts) => ts <= range.beg, + None => true, + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + match self.ts2s.last() { + Some(&ts) => ts > range.end, + None => panic!(), + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + match self.ts1s.first() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } +} + impl TimeBins for MinMaxAvgBins where NTY: NumOps, @@ -840,6 +921,18 @@ where } } +impl TimeBinnableType for MinMaxAvgBins +where + NTY: NumOps, +{ + type Output = MinMaxAvgBins; + type Aggregator = MinMaxAvgBinsAggregator; + + fn aggregator(range: NanoRange) -> Self::Aggregator { + Self::Aggregator::new(range) + } +} + pub struct MinMaxAvgAggregator { range: NanoRange, count: u32, @@ -860,6 +953,28 @@ impl MinMaxAvgAggregator { } } +// TODO rename to EventValuesAggregator +impl TimeBinnableTypeAggregator for MinMaxAvgAggregator +where + NTY: NumOps, +{ + type Input = EventValues; + type Output = MinMaxAvgBins; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + todo!() + } + + fn result(self) -> Self::Output { + todo!() + } +} + +// TODO after refactor get rid of this impl: impl EventsTimeBinnerAggregator for MinMaxAvgAggregator where NTY: NumOps, @@ -880,6 +995,107 @@ where } } +pub struct MinMaxAvgBinsAggregator { + range: NanoRange, + count: u32, + min: Option, + max: Option, + avg: Option, + sum: f32, + sumc: u32, +} + +impl MinMaxAvgBinsAggregator { + pub fn new(range: NanoRange) -> Self { + Self { + range, + // TODO: count events here? 
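+            // (A sketch: `ingest` below could sum `item.counts` into this field;
+            // `result` currently emits `counts: vec![0]` regardless.)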
+ count: 0, + min: None, + max: None, + avg: None, + sum: 0f32, + sumc: 0, + } + } +} + +impl TimeBinnableTypeAggregator for MinMaxAvgBinsAggregator +where + NTY: NumOps, +{ + type Input = MinMaxAvgBins; + type Output = MinMaxAvgBins; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + for i1 in 0..item.ts1s.len() { + if item.ts2s[i1] <= self.range.beg { + continue; + } else if item.ts1s[i1] >= self.range.end { + continue; + } else { + self.min = match self.min { + None => item.mins[i1], + Some(min) => match item.mins[i1] { + None => Some(min), + Some(v) => { + if v < min { + Some(v) + } else { + Some(min) + } + } + }, + }; + self.max = match self.max { + None => item.maxs[i1], + Some(max) => match item.maxs[i1] { + None => Some(max), + Some(v) => { + if v > max { + Some(v) + } else { + Some(max) + } + } + }, + }; + match item.avgs[i1] { + None => {} + Some(v) => { + if v.is_nan() { + } else { + self.sum += v; + self.sumc += 1; + } + } + } + } + } + } + + fn result(self) -> Self::Output { + let avg = if self.sumc == 0 { + None + } else { + Some(self.sum / self.sumc as f32) + }; + Self::Output { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + // TODO + counts: vec![0], + mins: vec![self.min], + maxs: vec![self.max], + avgs: vec![avg], + } + } +} + pub struct SingleXBinAggregator { range: NanoRange, count: u32, diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs new file mode 100644 index 0000000..7292ac4 --- /dev/null +++ b/disk/src/binned/binnedfrompbv.rs @@ -0,0 +1,261 @@ +use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; +use crate::agg::streams::StreamItem; +use crate::binned::query::{CacheUsage, PreBinnedQuery}; +use crate::binned::{EventsTimeBinner, RangeCompletableItem}; +use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; +use crate::frame::inmem::InMemoryFrameAsyncReadStream; +use crate::frame::makeframe::{decode_frame, FrameType}; +use crate::Sitemty; +use err::Error; +use futures_core::Stream; +use futures_util::{FutureExt, StreamExt}; +use http::{StatusCode, Uri}; +use netpod::log::*; +use netpod::{AggKind, BinnedRange, ByteSize, Channel, NodeConfigCached, PerfOpts, PreBinnedPatchIterator}; +use serde::de::DeserializeOwned; +use serde::Deserialize; +use std::future::ready; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub struct FetchedPreBinned { + uri: Uri, + resfut: Option, + res: Option>, + errored: bool, + completed: bool, + _m1: PhantomData, +} + +impl FetchedPreBinned { + pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result { + let nodeix = node_ix_for_patch(&query.patch(), &query.channel(), &node_config.node_config.cluster); + let node = &node_config.node_config.cluster.nodes[nodeix as usize]; + let uri: hyper::Uri = format!( + "http://{}:{}/api/4/prebinned?{}", + node.host, + node.port, + query.make_query_string() + ) + .parse()?; + let ret = Self { + uri, + resfut: None, + res: None, + errored: false, + completed: false, + _m1: PhantomData, + }; + Ok(ret) + } +} + +impl Stream for FetchedPreBinned +where + TBT: TimeBinnableType, + Sitemty: FrameType + DeserializeOwned, +{ + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("poll_next on completed"); + } else if self.errored { + self.completed = true; + return Ready(None); + } else if let Some(res) = 
self.res.as_mut() { + match res.poll_next_unpin(cx) { + Ready(Some(Ok(item))) => match item { + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + StreamItem::DataItem(item) => match decode_frame::>(&item) { + Ok(Ok(item)) => Ready(Some(Ok(item))), + Ok(Err(e)) => { + self.errored = true; + Ready(Some(Err(e))) + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } + }, + }, + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.completed = true; + Ready(None) + } + Pending => Pending, + } + } else if let Some(resfut) = self.resfut.as_mut() { + match resfut.poll_unpin(cx) { + Ready(res) => match res { + Ok(res) => { + if res.status() == StatusCode::OK { + let perf_opts = PerfOpts { inmem_bufcap: 512 }; + let s1 = HttpBodyAsAsyncRead::new(res); + let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); + self.res = Some(s2); + continue 'outer; + } else { + error!( + "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", + res + ); + let e = Error::with_msg(format!( + "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}", + res + )); + self.errored = true; + Ready(Some(Err(e))) + } + } + Err(e) => { + error!("PreBinnedValueStream error in stream {:?}", e); + self.errored = true; + Ready(Some(Err(e.into()))) + } + }, + Pending => Pending, + } + } else { + match hyper::Request::builder() + .method(http::Method::GET) + .uri(&self.uri) + .body(hyper::Body::empty()) + { + Ok(req) => { + let client = hyper::Client::new(); + self.resfut = Some(client.request(req)); + continue 'outer; + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) + } + } + }; + } + } +} + +/// Generate bins from a range of pre-binned patches. +/// +/// Takes an iterator over the necessary patches. 
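+///
+/// Each patch is fetched via `FetchedPreBinned`, items outside the requested range are
+/// dropped via `FilterFittingInside`, and the remainder is re-binned by `TBinnerStream`.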
+pub struct BinnedFromPreBinned +where + TBT: TimeBinnableType, +{ + // TODO get rid of box: + inp: Pin> + Send>>, + _m1: PhantomData, +} + +impl BinnedFromPreBinned +where + TBT: TimeBinnableType + Unpin + 'static, + Sitemty: FrameType + DeserializeOwned, +{ + pub fn new( + patch_it: PreBinnedPatchIterator, + channel: Channel, + range: BinnedRange, + agg_kind: AggKind, + cache_usage: CacheUsage, + node_config: &NodeConfigCached, + disk_stats_every: ByteSize, + report_error: bool, + ) -> Result { + let patches: Vec<_> = patch_it.collect(); + let mut sp = String::new(); + if false { + // Convert this to a StreamLog message: + for (i, p) in patches.iter().enumerate() { + use std::fmt::Write; + write!(sp, " • patch {:2} {:?}\n", i, p)?; + } + info!("Using these pre-binned patches:\n{}", sp); + } + let pmax = patches.len(); + let inp = futures_util::stream::iter(patches.into_iter().enumerate()) + .map({ + let node_config = node_config.clone(); + move |(pix, patch)| { + let query = PreBinnedQuery::new( + patch, + channel.clone(), + agg_kind.clone(), + cache_usage.clone(), + disk_stats_every.clone(), + report_error, + ); + let ret: Pin + Send>> = + match FetchedPreBinned::::new(&query, &node_config) { + Ok(stream) => Box::pin(stream.map(move |q| (pix, q))), + Err(e) => { + error!("error from PreBinnedValueFetchedStream::new {:?}", e); + Box::pin(futures_util::stream::iter(vec![(pix, Err(e))])) + } + }; + ret + } + }) + .flatten() + .filter_map({ + let range = range.clone(); + move |(pix, k)| { + let fit_range = range.full_range(); + let g = match k { + Ok(item) => match item { + StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), + StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + if pix + 1 == pmax { + Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) + } else { + None + } + } + RangeCompletableItem::Data(item) => { + match crate::binned::FilterFittingInside::filter_fitting_inside(item, fit_range) { + Some(item) => Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), + None => None, + } + } + }, + }, + Err(e) => Some(Err(e)), + }; + ready(g) + } + }); + + // TODO TBinnerStream is for T-binning events. + // But here, we need to bin bins into bigger bins. + // The logic in TBinnerStream is actually the same I think.. + // Reuse?? 
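+        // For now: yes, reuse. TBinnerStream only depends on the TimeBinnableType bounds,
+        // and MinMaxAvgBins aggregates into coarser MinMaxAvgBins via MinMaxAvgBinsAggregator.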
+ let inp = TBinnerStream::<_, TBT>::new(inp, range); + Ok(Self { + inp: Box::pin(inp), + _m1: PhantomData, + }) + } +} + +impl Stream for BinnedFromPreBinned +where + TBT: TimeBinnableType, +{ + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.inp.poll_next_unpin(cx) + } +} diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index cbb22f9..8ded692 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -1,4 +1,4 @@ -use crate::agg::binnedt4::TBinnerStream; +use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType}; use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::query::{CacheUsage, PreBinnedQuery}; use crate::binned::{ @@ -9,7 +9,7 @@ use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache}; use crate::decode::{Endianness, EventValueFromBytes, EventValueShape, NumFromBytes}; use crate::frame::makeframe::{make_frame, FrameType}; -use crate::merge::mergefromremote::MergedFromRemotes2; +use crate::merge::mergedfromremotes::MergedFromRemotes2; use crate::raw::EventsQuery; use crate::streamlog::Streamlog; use crate::Sitemty; @@ -40,7 +40,9 @@ where query: PreBinnedQuery, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, - fut2: Option::Output>> + Send>>>, + fut2: Option< + Pin::Output as TimeBinnableType>::Output>> + Send>>, + >, read_from_cache: bool, cache_written: bool, data_complete: bool, @@ -49,9 +51,15 @@ where errored: bool, completed: bool, streamlog: Streamlog, - values: ::Output, + values: <::Output as TimeBinnableType>::Output, write_fut: Option> + Send>>>, - read_cache_fut: Option::Output>> + Send>>>, + read_cache_fut: Option< + Pin< + Box< + dyn Future::Output as TimeBinnableType>::Output>> + Send, + >, + >, + >, _m1: PhantomData, _m2: PhantomData, _m3: PhantomData, @@ -84,7 +92,8 @@ where errored: false, completed: false, streamlog: Streamlog::new(node_config.ix as u32), - values: <::Output as Appendable>::empty(), + // TODO use alias via some trait associated type: + values: <<::Output as TimeBinnableType>::Output as Appendable>::empty(), write_fut: None, read_cache_fut: None, _m1: PhantomData, @@ -97,7 +106,10 @@ where fn setup_merged_from_remotes( &mut self, - ) -> Result::Output>> + Send>>, Error> { + ) -> Result< + Pin::Output as TimeBinnableType>::Output>> + Send>>, + Error, + > { let evq = EventsQuery { channel: self.query.channel().clone(), range: self.query.patch().patch_range(), @@ -117,24 +129,18 @@ where let range = BinnedRange::covering_range(evq.range.clone(), count as u32)? .ok_or(Error::with_msg("covering_range returns None"))?; let perf_opts = PerfOpts { inmem_bufcap: 512 }; - - // TODO copy the MergedFromRemotes and adapt... - let s1 = MergedFromRemotes2::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); - - // TODO - // Go from ENP values to a T-binned stream... - // Most of the algo is static same. - // What varies: init aggregator for next T-bin. 
-        let ret = TBinnerStream::<_, ETB>::new(s1, range);
-
-        //let s1 = todo_convert_stream_to_tbinned_stream(s1, range);
+        let s = MergedFromRemotes2::<ENP>::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
+        let ret = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
         Ok(Box::pin(ret))
     }
 
     fn setup_from_higher_res_prebinned(
         &mut self,
         range: PreBinnedPatchRange,
-    ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ETB as EventsTimeBinner>::Output>> + Send>>, Error> {
+    ) -> Result<
+        Pin<Box<dyn Stream<Item = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send>>,
+        Error,
+    > {
         let g = self.query.patch().bin_t_len();
         let h = range.grid_spec.bin_t_len();
         trace!(
@@ -216,7 +222,7 @@ where
     Sitemty<<ETB as EventsTimeBinner>::Output>: FrameType,
     <ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
 {
-    type Item = Sitemty<<ETB as EventsTimeBinner>::Output>;
+    type Item = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
@@ -290,7 +296,8 @@ where
                         self.values.len(),
                     );
                     self.streamlog.append(Level::INFO, msg);
-                    let emp = <<ETB as EventsTimeBinner>::Output as Appendable>::empty();
+                    // TODO use alias via trait:
+                    let emp = <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as Appendable>::empty();
                     let values = std::mem::replace(&mut self.values, emp);
                     let fut = write_pb_cache_min_max_avg_scalar(
                         values,
@@ -344,7 +351,7 @@ where
                 Ok(file) => {
                     self.read_from_cache = true;
                     let fut =
-                        <<ETB as EventsTimeBinner>::Output as ReadableFromFile>::read_from_file(file)?;
+                        <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as ReadableFromFile>::read_from_file(file)?;
                     self.read_cache_fut = Some(Box::pin(fut));
                     continue 'outer;
                 }
diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs
index 22e59e4..4db5741 100644
--- a/disk/src/binned/prebinned.rs
+++ b/disk/src/binned/prebinned.rs
@@ -1,4 +1,6 @@
-use crate::agg::binnedt4::{DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner};
+use crate::agg::binnedt4::{
+    DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner, TimeBinnableType,
+};
 use crate::agg::enp::{Identity, WaveXBinner};
 use crate::agg::streams::{Appendable, StreamItem};
 use crate::binned::pbv2::{
@@ -41,6 +43,7 @@ where
     <ETB as EventsTimeBinner>::Output: Serialize + ReadableFromFile + 'static,
     Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
     Sitemty<<ETB as EventsTimeBinner>::Output>: Framable,
+    Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: Framable,
 {
     // TODO
     // Currently, this mod uses stuff from pbv2, therefore complete path:
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 30a77db..658545b 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -12,6 +12,7 @@ use std::future::ready;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+// TODO remove after refactor.
 pub struct BinnedScalarStreamFromPreBinnedPatches<SK>
 where
     SK: StreamKind,
@@ -113,8 +114,7 @@ where
     }
 }
 
-// TODO change name, type is generic now:
-// Can I remove the whole type or keep for static check?
+// TODO remove after SK no longer needed.
 impl<SK> Stream for BinnedScalarStreamFromPreBinnedPatches<SK>
 where
     SK: StreamKind,
@@ -126,6 +126,7 @@ where
     }
 }
 
+// TODO remove after refactor.
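+// Its role is taken over by the type-erased `Box<dyn Framable>` streams built by the
+// make_num_pipeline_* functions in binned.rs.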
pub struct BoxedStream { inp: Pin + Send>>, } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 12229f0..4815155 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -25,6 +25,7 @@ use tokio::io::{AsyncRead, ReadBuf}; pub mod pbvfs; +// TODO move to a better fitting module: pub struct HttpBodyAsAsyncRead { inp: Response, left: Bytes, diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 6cedd0c..a2cbdd9 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -13,6 +13,7 @@ use netpod::{NodeConfigCached, PerfOpts}; use std::pin::Pin; use std::task::{Context, Poll}; +// TODO remove when SK no longer needed. pub struct PreBinnedScalarValueFetchedStream where SK: StreamKind, @@ -75,7 +76,7 @@ where StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), StreamItem::DataItem(item) => { match decode_frame::>, Error>>( - &item, >, Error> as FrameType>::FRAME_TYPE_ID, + &item, ) { Ok(Ok(item)) => Ready(Some(Ok(item))), Ok(Err(e)) => { diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 003d7af..3d0519e 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,7 +1,10 @@ +use crate::agg::binnedt4::{TimeBinnableType, TimeBinnableTypeAggregator}; +use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::{ - EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, RangeOverlapInfo, WithLen, WithTimestamps, + EventsNodeProcessor, FilterFittingInside, MinMaxAvgAggregator, MinMaxAvgBins, MinMaxAvgBinsAggregator, NumOps, + PushableIndex, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, }; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventFull; @@ -11,11 +14,13 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::NanoRange; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; use std::mem::size_of; use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::fs::File; pub trait Endianness: Send + Unpin {} pub struct LittleEndian {} @@ -135,7 +140,8 @@ impl EventValueShape for EventValuesDim0Case where NTY: NumOps + NumFromBytes, { - type NumXAggToSingleBin = ProcAA; + type NumXAggToSingleBin = Identity; + // TODO: type NumXAggToNBins = ProcAA; } @@ -160,6 +166,7 @@ pub struct ProcBB { _m1: PhantomData, } +// TODO still in use or can go away? 
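+// Right now it is mostly kept alive by the dummy TimeBinnableType impls in
+// binnedt4.rs, which are all todo!().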
#[derive(Serialize, Deserialize)] pub struct MinMaxAvgScalarEventBatchGen { pub tss: Vec, @@ -168,7 +175,10 @@ pub struct MinMaxAvgScalarEventBatchGen { pub avgs: Vec>, } -impl MinMaxAvgScalarEventBatchGen { +impl MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ pub fn empty() -> Self { Self { tss: vec![], @@ -179,12 +189,63 @@ impl MinMaxAvgScalarEventBatchGen { } } -impl WithTimestamps for MinMaxAvgScalarEventBatchGen { +impl WithTimestamps for MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ fn ts(&self, ix: usize) -> u64 { self.tss[ix] } } +impl FilterFittingInside for MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + todo!() + } +} + +impl WithLen for MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ + fn len(&self) -> usize { + todo!() + } +} + +impl Appendable for MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ + fn empty() -> Self { + todo!() + } + + fn append(&mut self, src: &Self) { + todo!() + } +} + +impl RangeOverlapInfo for MinMaxAvgScalarEventBatchGen +where + NTY: NumOps, +{ + fn ends_before(&self, range: NanoRange) -> bool { + todo!() + } + + fn ends_after(&self, range: NanoRange) -> bool { + todo!() + } + + fn starts_after(&self, range: NanoRange) -> bool { + todo!() + } +} + impl Framable for Result>>, Error> where NTY: NumOps + Serialize, @@ -210,11 +271,11 @@ impl EventValueShape for EventValuesDim1Case where NTY: NumOps + NumFromBytes, { - type NumXAggToSingleBin = ProcBB; + type NumXAggToSingleBin = WaveXBinner; + // TODO: type NumXAggToNBins = ProcBB; } -// TODO why not Serialize? // TODO add pulse. // TODO change name, it's not only about values, but more like batch of whole events. #[derive(Serialize, Deserialize)] @@ -284,6 +345,12 @@ impl RangeOverlapInfo for EventValues { } } +impl FilterFittingInside for EventValues { + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + todo!() + } +} + impl PushableIndex for EventValues where NTY: NumOps, @@ -308,6 +375,31 @@ where } } +impl ReadableFromFile for EventValues +where + NTY: NumOps, +{ + fn read_from_file(file: File) -> Result, Error> { + todo!() + } + + fn from_buf(buf: &[u8]) -> Result { + todo!() + } +} + +impl TimeBinnableType for EventValues +where + NTY: NumOps, +{ + type Output = MinMaxAvgBins; + type Aggregator = MinMaxAvgAggregator; + + fn aggregator(range: NanoRange) -> Self::Aggregator { + Self::Aggregator::new(range) + } +} + pub struct EventsDecodedStream where NTY: NumOps + NumFromBytes, diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs index 9e32265..4b25d15 100644 --- a/disk/src/frame/makeframe.rs +++ b/disk/src/frame/makeframe.rs @@ -78,14 +78,14 @@ impl FrameType for Sitemty { impl FrameType for Sitemty> where - NTY: SubFrId, + NTY: NumOps, { const FRAME_TYPE_ID: u32 = 0x400 + NTY::SUB; } impl FrameType for Sitemty> where - NTY: SubFrId, + NTY: NumOps, { const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB; } @@ -197,7 +197,7 @@ pub fn make_term_frame() -> BytesMut { buf } -pub fn decode_frame(frame: &InMemoryFrame, frame_type: u32) -> Result +pub fn decode_frame(frame: &InMemoryFrame) -> Result where T: FrameType + DeserializeOwned, { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index db47d07..7eaa54b 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -11,7 +11,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; -pub mod mergefromremote; +pub mod mergedfromremotes; enum MergedCurVal { None, diff --git 
a/disk/src/merge/mergefromremote.rs b/disk/src/merge/mergedfromremotes.rs
similarity index 100%
rename from disk/src/merge/mergefromremote.rs
rename to disk/src/merge/mergedfromremotes.rs
diff --git a/disk/src/raw.rs b/disk/src/raw.rs
index 3bee2b0..d99cf28 100644
--- a/disk/src/raw.rs
+++ b/disk/src/raw.rs
@@ -9,7 +9,7 @@ use crate::agg::streams::StreamItem;
 use crate::binned::{EventsNodeProcessor, RangeCompletableItem, StreamKind};
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::{make_frame, make_term_frame, FrameType};
-use crate::raw::bffr::EventsFromFrames;
+use crate::raw::eventsfromframes::EventsFromFrames;
 use crate::Sitemty;
 use err::Error;
 use futures_core::Stream;
@@ -21,8 +21,8 @@ use tokio::net::TcpStream;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, span, trace, warn, Level};
 
-pub mod bffr;
 pub mod conn;
+pub mod eventsfromframes;
 
 /**
 Query parameters to request (optionally) X-processed, but not T-processed events.
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index c578086..b60b630 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -1,4 +1,5 @@
 use crate::agg::binnedx::IntoBinnedXBins1;
+use crate::agg::enp::{Identity, WaveXBinner};
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::streams::StreamItem;
 use crate::agg::IntoDim1F32Stream;
@@ -104,6 +105,9 @@ fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
 where
     NTY: NumOps + NumFromBytes<NTY, END> + 'static,
     END: Endianness + 'static,
+
+    // TODO
+    // Can this work?
     EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
     ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
     Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
@@ -136,12 +140,14 @@ macro_rules! pipe4 {
                 $end,
                 $evs<$nty>,
                 <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
+                //Identity<$nty>,
             >($evsv, $event_blobs),
             AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<
                 $nty,
                 $end,
                 $evs<$nty>,
                 <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
+                //WaveXBinner<$nty>,
             >($evsv, $event_blobs),
         }
     };
@@ -161,6 +167,9 @@ macro_rules! pipe3 {
             )
         }
         Shape::Wave(n) => {
+            // TODO
+            // The issue is that I try to generate too many combinations,
+            // e.g. I try to generate code for the combination of Shape::Scalar with WaveXBinner, which does not match.
             pipe4!(
                 $nty,
                 $end,
@@ -229,8 +238,7 @@ async fn events_conn_handler_inner_try(
         error!("missing command frame");
         return Err((Error::with_msg("missing command frame"), netout))?;
     }
-    let frame_type = <EventQueryJsonStringFrame as FrameType>::FRAME_TYPE_ID;
-    let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0], frame_type) {
+    let qitem: EventQueryJsonStringFrame = match decode_frame(&frames[0]) {
         Ok(k) => k,
         Err(e) => return Err((e, netout).into()),
     };
@@ -275,7 +283,7 @@ async fn events_conn_handler_inner_try(
         compression: entry.is_compressed,
     };
 
-    if false {
+    if true {
         // TODO use a requested buffer size
         let buffer_size = 1024 * 4;
         let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
@@ -288,10 +296,6 @@ async fn events_conn_handler_inner_try(
             event_chunker_conf,
         );
         let shape = entry.to_shape().unwrap();
-        // TODO
-        // First, generalize over the number types.
-        // Then return boxed trait objects from the stream which are MakeFrame.
-        // The writeout does not need to be generic.
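+        // pipe1! expands, together with pipe3!/pipe4! above, into nested matches over
+        // scalar type, byte order, shape and agg kind, picking the concrete
+        // EventValueShape/EventsNodeProcessor combination for this channel.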
let mut p1 = pipe1!(entry.scalar_type, entry.byte_order, shape, evq.agg_kind, event_blobs);
        while let Some(item) = p1.next().await {
            let item = item.make_frame();
@@ -316,7 +320,7 @@ async fn events_conn_handler_inner_try(
         }
         Ok(())
     } else {
-        // TODO use a requested buffer size
+        // TODO remove this scope after refactor.
         let buffer_size = 1024 * 4;
         let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
         let s1 = EventBlobsComplete::new(
diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/eventsfromframes.rs
similarity index 87%
rename from disk/src/raw/bffr.rs
rename to disk/src/raw/eventsfromframes.rs
index 7fb8919..992c5fc 100644
--- a/disk/src/raw/bffr.rs
+++ b/disk/src/raw/eventsfromframes.rs
@@ -13,7 +13,6 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use tokio::io::AsyncRead;
 
-// TODO remove usage of SK, no longer needed.
 pub struct EventsFromFrames<T, I>
 where
     T: AsyncRead + Unpin,
@@ -21,7 +20,7 @@ where
     inp: InMemoryFrameAsyncReadStream<T>,
     errored: bool,
     completed: bool,
-    _m2: PhantomData<I>,
+    _m1: PhantomData<I>,
 }
 
 impl<T, I> EventsFromFrames<T, I>
@@ -33,7 +32,7 @@ where
             inp,
             errored: false,
             completed: false,
-            _m2: PhantomData,
+            _m1: PhantomData,
         }
     }
 }
 
 impl<T, I> Stream for EventsFromFrames<T, I>
 where
     T: AsyncRead + Unpin,
-    //SK: StreamKind,
     I: DeserializeOwned + Unpin,
-    // TODO see binned.rs better to express it on trait?
-    //Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>: FrameType,
     Sitemty<I>: FrameType,
 {
     type Item = Sitemty<I>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
         loop {
             break if self.completed {
-                panic!("EventsFromFrames poll_next on completed");
+                panic!("poll_next on completed");
             } else if self.errored {
                 self.completed = true;
                 Ready(None)
@@ -62,7 +58,7 @@ where
                 Ready(Some(Ok(item))) => match item {
                     StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
                     StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
-                    StreamItem::DataItem(frame) => match decode_frame::<Sitemty<I>>(&frame, 0) {
+                    StreamItem::DataItem(frame) => match decode_frame::<Sitemty<I>>(&frame) {
                         Ok(item) => match item {
                             Ok(item) => Ready(Some(Ok(item))),
                             Err(e) => {