diff --git a/daqbuffer/src/test/binnedjson.rs b/daqbuffer/src/test/binnedjson.rs
index c3892d0..add377f 100644
--- a/daqbuffer/src/test/binnedjson.rs
+++ b/daqbuffer/src/test/binnedjson.rs
@@ -21,6 +21,7 @@ async fn get_binned_json_0_inner() -> Result<(), Error> {
         "1970-01-01T00:20:10.000Z",
         "1970-01-01T01:20:30.000Z",
         10,
+        AggKind::DimXBins1,
         cluster,
         13,
         true,
@@ -41,6 +42,7 @@ async fn get_binned_json_1_inner() -> Result<(), Error> {
         "1970-01-01T00:20:10.000Z",
         "1970-01-01T01:20:45.000Z",
         10,
+        AggKind::DimXBins1,
         cluster,
         13,
         true,
@@ -48,17 +50,38 @@ async fn get_binned_json_1_inner() -> Result<(), Error> {
     .await
 }
 
+#[test]
+fn get_binned_json_2() {
+    taskrun::run(get_binned_json_2_inner()).unwrap();
+}
+
+async fn get_binned_json_2_inner() -> Result<(), Error> {
+    let rh = require_test_hosts_running()?;
+    let cluster = &rh.cluster;
+    get_binned_json_common(
+        "wave-f64-be-n21",
+        "1970-01-01T00:20:10.000Z",
+        "1970-01-01T02:20:10.000Z",
+        2,
+        AggKind::DimXBinsN(0),
+        cluster,
+        2,
+        true,
+    )
+    .await
+}
+
 async fn get_binned_json_common(
     channel_name: &str,
     beg_date: &str,
     end_date: &str,
     bin_count: u32,
+    agg_kind: AggKind,
     cluster: &Cluster,
     expect_bin_count: u32,
     expect_finalised_range: bool,
 ) -> Result<(), Error> {
     let t1 = Utc::now();
-    let agg_kind = AggKind::DimXBins1;
     let node0 = &cluster.nodes[0];
     let beg_date: DateTime<Utc> = beg_date.parse()?;
     let end_date: DateTime<Utc> = end_date.parse()?;
diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 8489dcb..7add8a1 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -28,7 +28,7 @@ pub trait TimeBinnableType:
 {
     type Output: TimeBinnableType;
     type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
-    fn aggregator(range: NanoRange) -> Self::Aggregator;
+    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator;
 }
 
 pub struct TBinnerStream<S, TBT>
@@ -38,6 +38,7 @@ where
 {
     inp: Pin<Box<S>>,
     spec: BinnedRange,
+    bin_count: usize,
     curbin: u32,
     left: Option<Poll<Option<Sitemty<TBT>>>>,
     aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
@@ -55,14 +56,15 @@ where
     S: Stream<Item = Sitemty<TBT>> + Send + Unpin + 'static,
     TBT: TimeBinnableType,
 {
-    pub fn new(inp: S, spec: BinnedRange) -> Self {
+    pub fn new(inp: S, spec: BinnedRange, bin_count: usize) -> Self {
         let range = spec.get_range(0);
         Self {
             inp: Box::pin(inp),
             spec,
+            bin_count,
             curbin: 0,
             left: None,
-            aggtor: Some(<TBT as TimeBinnableType>::aggregator(range)),
+            aggtor: Some(<TBT as TimeBinnableType>::aggregator(range, bin_count)),
             tmp_agg_results: VecDeque::new(),
             inp_completed: false,
             all_bins_emitted: false,
@@ -90,7 +92,7 @@ where
         let range = self.spec.get_range(self.curbin);
         let ret = self
             .aggtor
-            .replace(<TBT as TimeBinnableType>::aggregator(range))
+            .replace(<TBT as TimeBinnableType>::aggregator(range, self.bin_count))
             .unwrap()
             .result();
         // TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
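Note: the binnedt.rs change above threads the requested x-bin count from the query all the way into aggregator construction. A minimal sketch of the shape an implementation takes under the revised trait (the names `DummyEvents`/`DummyAgg` are hypothetical and only for illustration; the matching `TimeBinnableTypeAggregator` impl for `DummyAgg` is elided):

    pub struct DummyAgg {
        range: NanoRange,
        // Sized up front from the requested x-bin count; enabling this for
        // waveform (dim-1) containers is the point of the new parameter.
        sums: Vec<f32>,
    }

    impl TimeBinnableType for DummyEvents {
        type Output = DummyEvents;
        type Aggregator = DummyAgg;

        fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
            DummyAgg {
                range,
                sums: vec![0f32; bin_count],
            }
        }
    }

Scalar aggregators simply ignore the extra argument, which is why several impls below take `_bin_count`.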
diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs
index 8cd5139..e89d1f1 100644
--- a/disk/src/agg/enp.rs
+++ b/disk/src/agg/enp.rs
@@ -3,12 +3,12 @@ use crate::agg::streams::Appendable;
 use crate::agg::{Fits, FitsInside};
 use crate::binned::dim1::MinMaxAvgDim1Bins;
 use crate::binned::{
-    EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeOverlapInfo, ReadPbv,
-    ReadableFromFile, WithLen, WithTimestamps,
+    EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, PushableIndex,
+    RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps,
 };
 use crate::decode::EventValues;
 use err::Error;
-use netpod::NanoRange;
+use netpod::{NanoRange, Shape};
 use serde::{Deserialize, Serialize};
 use std::marker::PhantomData;
 use tokio::fs::File;
@@ -24,11 +24,16 @@ where
     type Input = NTY;
     type Output = EventValues<NTY>;
 
-    fn process(inp: EventValues<Self::Input>) -> Self::Output {
+    fn create(_shape: Shape) -> Self {
+        Self { _m1: PhantomData }
+    }
+
+    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
         inp
     }
 }
 
+// TODO rename Scalar -> Dim0
 #[derive(Serialize, Deserialize)]
 pub struct XBinnedScalarEvents<NTY> {
     tss: Vec<u64>,
@@ -169,7 +174,7 @@ where
     type Output = MinMaxAvgBins<NTY>;
     type Aggregator = XBinnedScalarEventsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange) -> Self::Aggregator {
+    fn aggregator(range: NanoRange, _bin_count: usize) -> Self::Aggregator {
         Self::Aggregator::new(range)
     }
 }
@@ -269,6 +274,241 @@ where
     }
 }
 
+// TODO rename Wave -> Dim1
+#[derive(Serialize, Deserialize)]
+pub struct XBinnedWaveEvents<NTY> {
+    tss: Vec<u64>,
+    mins: Vec<Vec<NTY>>,
+    maxs: Vec<Vec<NTY>>,
+    avgs: Vec<Vec<f32>>,
+}
+
+impl<NTY> XBinnedWaveEvents<NTY> {
+    pub fn empty() -> Self {
+        Self {
+            tss: vec![],
+            mins: vec![],
+            maxs: vec![],
+            avgs: vec![],
+        }
+    }
+}
+
+impl<NTY> WithLen for XBinnedWaveEvents<NTY> {
+    fn len(&self) -> usize {
+        self.tss.len()
+    }
+}
+
+impl<NTY> WithTimestamps for XBinnedWaveEvents<NTY> {
+    fn ts(&self, ix: usize) -> u64 {
+        self.tss[ix]
+    }
+}
+
+impl<NTY> RangeOverlapInfo for XBinnedWaveEvents<NTY> {
+    fn ends_before(&self, range: NanoRange) -> bool {
+        match self.tss.last() {
+            Some(&ts) => ts < range.beg,
+            None => true,
+        }
+    }
+
+    fn ends_after(&self, range: NanoRange) -> bool {
+        match self.tss.last() {
+            Some(&ts) => ts >= range.end,
+            None => panic!(),
+        }
+    }
+
+    fn starts_after(&self, range: NanoRange) -> bool {
+        match self.tss.first() {
+            Some(&ts) => ts >= range.end,
+            None => panic!(),
+        }
+    }
+}
+
+impl<NTY> FitsInside for XBinnedWaveEvents<NTY> {
+    fn fits_inside(&self, range: NanoRange) -> Fits {
+        if self.tss.is_empty() {
+            Fits::Empty
+        } else {
+            let t1 = *self.tss.first().unwrap();
+            let t2 = *self.tss.last().unwrap();
+            if t2 < range.beg {
+                Fits::Lower
+            } else if t1 > range.end {
+                Fits::Greater
+            } else if t1 < range.beg && t2 > range.end {
+                Fits::PartlyLowerAndGreater
+            } else if t1 < range.beg {
+                Fits::PartlyLower
+            } else if t2 > range.end {
+                Fits::PartlyGreater
+            } else {
+                Fits::Inside
+            }
+        }
+    }
+}
+
+impl<NTY> FilterFittingInside for XBinnedWaveEvents<NTY> {
+    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
+        match self.fits_inside(fit_range) {
+            Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
+            _ => None,
+        }
+    }
+}
+
+impl<NTY> PushableIndex for XBinnedWaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        self.tss.push(src.tss[ix]);
+        // Per-event values are Vec's here, so they must be cloned out of src:
+        self.mins.push(src.mins[ix].clone());
+        self.maxs.push(src.maxs[ix].clone());
+        self.avgs.push(src.avgs[ix].clone());
+    }
+}
+
+impl<NTY> Appendable for XBinnedWaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    fn empty() -> Self {
+        Self::empty()
+    }
+
+    fn append(&mut self, src: &Self) {
+        self.tss.extend_from_slice(&src.tss);
+        self.mins.extend_from_slice(&src.mins);
+        self.maxs.extend_from_slice(&src.maxs);
+        self.avgs.extend_from_slice(&src.avgs);
+    }
+}
+
+impl<NTY> ReadableFromFile for XBinnedWaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    fn read_from_file(_file: File) -> Result<ReadPbv<Self>, Error> {
+        // TODO refactor types such that this impl is not needed.
+        panic!()
+    }
+
+    fn from_buf(_buf: &[u8]) -> Result<Self, Error> {
+        panic!()
+    }
+}
+
+impl<NTY> TimeBinnableType for XBinnedWaveEvents<NTY>
+where
+    NTY: NumOps,
+{
+    type Output = MinMaxAvgWaveBins<NTY>;
+    type Aggregator = XBinnedWaveEventsAggregator<NTY>;
+
+    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
+        Self::Aggregator::new(range, bin_count)
+    }
+}
+
+pub struct XBinnedWaveEventsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    range: NanoRange,
+    count: u64,
+    min: Vec<NTY>,
+    max: Vec<NTY>,
+    sum: Vec<f32>,
+    sumc: u64,
+}
+
+impl<NTY> XBinnedWaveEventsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    pub fn new(range: NanoRange, bin_count: usize) -> Self {
+        Self {
+            range,
+            count: 0,
+            // Seed the min fold with the type's maximum (or NaN) and the max fold
+            // with the minimum (or NaN) so the first ingested value always wins.
+            min: vec![NTY::max_or_nan(); bin_count],
+            max: vec![NTY::min_or_nan(); bin_count],
+            sum: vec![0f32; bin_count],
+            sumc: 0,
+        }
+    }
+}
+
+impl<NTY> TimeBinnableTypeAggregator for XBinnedWaveEventsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    type Input = XBinnedWaveEvents<NTY>;
+    type Output = MinMaxAvgWaveBins<NTY>;
+
+    fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    fn ingest(&mut self, item: &Self::Input) {
+        for i1 in 0..item.tss.len() {
+            let ts = item.tss[i1];
+            if ts < self.range.beg {
+                continue;
+            } else if ts >= self.range.end {
+                continue;
+            } else {
+                for (i2, v) in item.mins[i1].iter().enumerate() {
+                    if *v < self.min[i2] || self.min[i2].is_nan() {
+                        self.min[i2] = *v;
+                    }
+                }
+                for (i2, v) in item.maxs[i1].iter().enumerate() {
+                    if *v > self.max[i2] || self.max[i2].is_nan() {
+                        self.max[i2] = *v;
+                    }
+                }
+                for (i2, v) in item.avgs[i1].iter().enumerate() {
+                    if v.is_nan() {
+                    } else {
+                        self.sum[i2] += v;
+                    }
+                }
+                self.sumc += 1;
+                self.count += 1;
+            }
+        }
+    }
+
+    fn result(self) -> Self::Output {
+        if self.sumc == 0 {
+            Self::Output {
+                ts1s: vec![self.range.beg],
+                ts2s: vec![self.range.end],
+                counts: vec![self.count],
+                mins: vec![None],
+                maxs: vec![None],
+                avgs: vec![None],
+            }
+        } else {
+            let avg = self.sum.iter().map(|k| *k / self.sumc as f32).collect();
+            Self::Output {
+                ts1s: vec![self.range.beg],
+                ts2s: vec![self.range.end],
+                counts: vec![self.count],
+                mins: vec![Some(self.min)],
+                maxs: vec![Some(self.max)],
+                avgs: vec![Some(avg)],
+            }
+        }
+    }
+}
+
 #[derive(Serialize, Deserialize)]
 pub struct WaveEvents<NTY> {
     pub tss: Vec<u64>,
@@ -398,8 +638,8 @@ where
     type Output = MinMaxAvgDim1Bins<NTY>;
     type Aggregator = WaveEventsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange) -> Self::Aggregator {
-        Self::Aggregator::new(range)
+    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
+        Self::Aggregator::new(range, bin_count)
     }
 }
 
@@ -419,11 +659,12 @@ impl<NTY> WaveEventsAggregator<NTY>
 where
     NTY: NumOps,
 {
-    pub fn new(range: NanoRange) -> Self {
+    pub fn new(range: NanoRange, bin_count: usize) -> Self {
         Self {
             range,
             count: 0,
-            min: None,
+            // TODO create the right number of bins right here:
+            min: err::todoval(),
             max: None,
             sumc: 0,
             sum: None,
@@ -525,9 +766,13 @@ where
     type Input = Vec<NTY>;
     type Output = XBinnedScalarEvents<NTY>;
 
-    fn process(inp: EventValues<Self::Input>) -> Self::Output {
+    fn create(_shape: Shape) -> Self {
+        Self { _m1: PhantomData }
+    }
+
+    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
        let nev = inp.tss.len();
-        let mut ret = XBinnedScalarEvents {
+        let mut ret = Self::Output {
             tss: inp.tss,
             xbincount: Vec::with_capacity(nev),
             mins: Vec::with_capacity(nev),
@@ -535,6 +780,8 @@ where
             avgs: Vec::with_capacity(nev),
         };
         for i1 in 0..nev {
+            // TODO why do I work here with Option?
+            err::todo();
             let mut min = None;
             let mut max = None;
             let mut sum = 0f32;
@@ -584,6 +831,7 @@ where
 }
 
 pub struct WaveNBinner<NTY> {
+    bin_count: usize,
     _m1: PhantomData<NTY>,
 }
 
@@ -592,11 +840,60 @@ where
     NTY: NumOps,
 {
     type Input = Vec<NTY>;
-    // TODO need new container type for this case:
-    type Output = XBinnedScalarEvents<NTY>;
+    type Output = XBinnedWaveEvents<NTY>;
 
-    fn process(_inp: EventValues<Self::Input>) -> Self::Output {
-        err::todoval()
+    fn create(shape: Shape) -> Self {
+        // TODO get rid of panic potential
+        let bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize;
+        Self {
+            bin_count,
+            _m1: PhantomData,
+        }
+    }
+
+    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
+        let nev = inp.tss.len();
+        let mut ret = Self::Output {
+            tss: inp.tss,
+            mins: Vec::with_capacity(nev),
+            maxs: Vec::with_capacity(nev),
+            avgs: Vec::with_capacity(nev),
+        };
+        for i1 in 0..nev {
+            // Seed min with the type's maximum (or NaN) and max with the minimum
+            // (or NaN) so that the first sample always replaces the seed; the
+            // is_nan checks below are the escape hatch for the float NaN seeds.
+            let mut min = vec![NTY::max_or_nan(); self.bin_count];
+            let mut max = vec![NTY::min_or_nan(); self.bin_count];
+            let mut sum = vec![0f32; self.bin_count];
+            let mut sumc = vec![0; self.bin_count];
+            for (i2, &v) in inp.values[i1].iter().enumerate() {
+                let i3 = i2 * self.bin_count / inp.values[i1].len();
+                if v < min[i3] || min[i3].is_nan() {
+                    min[i3] = v;
+                }
+                if v > max[i3] || max[i3].is_nan() {
+                    max[i3] = v;
+                }
+                if v.is_nan() {
+                } else {
+                    sum[i3] += v.as_();
+                    sumc[i3] += 1;
+                }
+            }
+            ret.mins.push(min);
+            ret.maxs.push(max);
+            let avg = sum
+                .iter()
+                .enumerate()
+                .map(|(i3, &s)| if sumc[i3] > 0 { s / sumc[i3] as f32 } else { f32::NAN })
+                .collect();
+            ret.avgs.push(avg);
+        }
+        ret
     }
 }
 
@@ -611,7 +908,11 @@ where
     type Input = Vec<NTY>;
     type Output = WaveEvents<NTY>;
 
-    fn process(inp: EventValues<Self::Input>) -> Self::Output {
+    fn create(_shape: Shape) -> Self {
+        Self { _m1: PhantomData }
+    }
+
+    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
         if false {
             let n = if inp.values.len() > 0 { inp.values[0].len() } else { 0 };
             let n = if n > 5 { 5 } else { n };
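Note: a quick check of the x-binning index arithmetic used in WaveNBinner::process above (standalone sketch; the 21 samples and 2 x-bins mirror the wave-f64-be-n21 test channel added in binnedjson.rs):

    fn main() {
        let (wave_len, bin_count) = (21usize, 2usize);
        let mut per_bin = vec![0usize; bin_count];
        for i2 in 0..wave_len {
            // same formula as in process()
            let i3 = i2 * bin_count / wave_len;
            per_bin[i3] += 1;
        }
        // samples 0..=10 land in x-bin 0, samples 11..=20 in x-bin 1
        assert_eq!(per_bin, vec![11, 10]);
    }

The integer division biases the split so that earlier x-bins receive the extra samples when the wave length is not divisible by the bin count.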
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 917a683..47f6764 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -23,13 +23,14 @@ use futures_util::{FutureExt, StreamExt};
 use netpod::log::*;
 use netpod::timeunits::SEC;
 use netpod::{
-    BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange,
-    ScalarType, Shape,
+    AggKind, BinnedRange, ByteOrder, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator,
+    PreBinnedPatchRange, ScalarType, Shape,
 };
-use num_traits::{AsPrimitive, Bounded, Zero};
+use num_traits::{AsPrimitive, Bounded, Float, Zero};
 use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize, Serializer};
+use std::fmt;
 use std::fmt::Debug;
 use std::future::Future;
 use std::marker::PhantomData;
@@ -122,8 +123,13 @@ where
         range: query.range().clone(),
         agg_kind: query.agg_kind().clone(),
     };
+    let x_bin_count = if let AggKind::DimXBinsN(n) = query.agg_kind() {
+        // TODO for n == 0 (and for wave channels under DimXBins1) the x-bin
+        // count may have to come from the channel Shape instead.
+        *n as usize
+    } else {
+        0
+    };
     let s = MergedFromRemotes::<ENP>::new(evq, perf_opts, node_config.node_config.cluster.clone());
-    let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range);
+    let s = TBinnerStream::<_, <ENP as EventsNodeProcessor>::Output>::new(s, range, x_bin_count);
     let ret = BinnedResponseStat {
         stream: Box::pin(s),
         bin_count,
@@ -770,13 +776,47 @@ pub trait NumOps:
     + Serialize
     + DeserializeOwned
 {
+    fn min_or_nan() -> Self;
+    fn max_or_nan() -> Self;
+    fn is_nan(&self) -> bool;
 }
 
-impl<T> NumOps for T where
-    T: Send + Unpin + Debug + Zero + AsPrimitive<f32> + Bounded + PartialOrd + SubFrId + Serialize + DeserializeOwned
-{
-}
+macro_rules! impl_num_ops {
+    ($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident) => {
+        impl NumOps for $ty {
+            fn min_or_nan() -> Self {
+                $ty::$min_or_nan
+            }
+            fn max_or_nan() -> Self {
+                $ty::$max_or_nan
+            }
+            fn is_nan(&self) -> bool {
+                $is_nan(self)
+            }
+        }
+    };
+}
+
+fn is_nan_int<T>(_x: &T) -> bool {
+    false
+}
+
+fn is_nan_float<T: Float>(x: &T) -> bool {
+    x.is_nan()
+}
+
+impl_num_ops!(u8, MIN, MAX, is_nan_int);
+impl_num_ops!(u16, MIN, MAX, is_nan_int);
+impl_num_ops!(u32, MIN, MAX, is_nan_int);
+impl_num_ops!(u64, MIN, MAX, is_nan_int);
+impl_num_ops!(i8, MIN, MAX, is_nan_int);
+impl_num_ops!(i16, MIN, MAX, is_nan_int);
+impl_num_ops!(i32, MIN, MAX, is_nan_int);
+impl_num_ops!(i64, MIN, MAX, is_nan_int);
+impl_num_ops!(f32, NAN, NAN, is_nan_float);
+impl_num_ops!(f64, NAN, NAN, is_nan_float);
 
 pub trait EventsDecoder {
     type Output;
     fn ingest(&mut self, event: &[u8]);
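Note: hand-expanded for f32, the macro above produces the following impl (sketch, not part of the patch):

    impl NumOps for f32 {
        fn min_or_nan() -> Self {
            f32::NAN
        }
        fn max_or_nan() -> Self {
            f32::NAN
        }
        fn is_nan(&self) -> bool {
            is_nan_float(self)
        }
    }

Integer types get their MIN/MAX constants plus a constant-false is_nan. This is why the min/max folds elsewhere in the patch seed with max_or_nan()/min_or_nan() respectively and need the `|| seed.is_nan()` escape only for the float NaN seeds.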
@@ -786,7 +826,8 @@
 pub trait EventsNodeProcessor: Send + Unpin {
     type Input;
     type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType;
-    fn process(inp: EventValues<Self::Input>) -> Self::Output;
+    fn create(shape: Shape) -> Self;
+    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output;
 }
 
 pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
@@ -799,16 +840,17 @@ pub struct MinMaxAvgBins<NTY> {
     pub ts1s: Vec<u64>,
     pub ts2s: Vec<u64>,
     pub counts: Vec<u64>,
+    // TODO get rid of Option:
     pub mins: Vec<Option<NTY>>,
     pub maxs: Vec<Option<NTY>>,
     pub avgs: Vec<Option<f32>>,
 }
 
-impl<NTY> std::fmt::Debug for MinMaxAvgBins<NTY>
+impl<NTY> fmt::Debug for MinMaxAvgBins<NTY>
 where
-    NTY: std::fmt::Debug,
+    NTY: fmt::Debug,
 {
-    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         write!(
             fmt,
             "MinMaxAvgBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
@@ -951,7 +993,7 @@ where
     type Output = MinMaxAvgBins<NTY>;
     type Aggregator = MinMaxAvgBinsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange) -> Self::Aggregator {
+    fn aggregator(range: NanoRange, _bin_count: usize) -> Self::Aggregator {
         Self::Aggregator::new(range)
     }
 }
@@ -1103,7 +1145,8 @@ impl<NTY> EventValuesAggregator<NTY> {
         Self {
             range,
             count: 0,
-            min: None,
+            // TODO get rid of Option
+            min: err::todoval(),
             max: None,
             sumc: 0,
             sum: 0f32,
@@ -1282,3 +1325,400 @@ pub enum RangeCompletableItem<T> {
     RangeComplete,
     Data(T),
 }
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct MinMaxAvgWaveBins<NTY> {
+    pub ts1s: Vec<u64>,
+    pub ts2s: Vec<u64>,
+    pub counts: Vec<u64>,
+    pub mins: Vec<Option<Vec<NTY>>>,
+    pub maxs: Vec<Option<Vec<NTY>>>,
+    pub avgs: Vec<Option<Vec<f32>>>,
+}
+
+impl<NTY> fmt::Debug for MinMaxAvgWaveBins<NTY>
+where
+    NTY: fmt::Debug,
+{
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            fmt,
+            "MinMaxAvgWaveBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
+            self.ts1s.len(),
+            self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+            self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+            self.counts,
+            self.mins,
+            self.maxs,
+            self.avgs,
+        )
+    }
+}
+
+impl<NTY> MinMaxAvgWaveBins<NTY> {
+    pub fn empty() -> Self {
+        Self {
+            ts1s: vec![],
+            ts2s: vec![],
+            counts: vec![],
+            mins: vec![],
+            maxs: vec![],
+            avgs: vec![],
+        }
+    }
+}
+
+impl<NTY> FitsInside for MinMaxAvgWaveBins<NTY> {
+    fn fits_inside(&self, range: NanoRange) -> Fits {
+        if self.ts1s.is_empty() {
+            Fits::Empty
+        } else {
+            let t1 = *self.ts1s.first().unwrap();
+            let t2 = *self.ts2s.last().unwrap();
+            if t2 <= range.beg {
+                Fits::Lower
+            } else if t1 >= range.end {
+                Fits::Greater
+            } else if t1 < range.beg && t2 > range.end {
+                Fits::PartlyLowerAndGreater
+            } else if t1 < range.beg {
+                Fits::PartlyLower
+            } else if t2 > range.end {
+                Fits::PartlyGreater
+            } else {
+                Fits::Inside
+            }
+        }
+    }
+}
+
+impl<NTY> FilterFittingInside for MinMaxAvgWaveBins<NTY> {
+    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
+        match self.fits_inside(fit_range) {
+            Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
+            _ => None,
+        }
+    }
+}
+
+impl<NTY> RangeOverlapInfo for MinMaxAvgWaveBins<NTY> {
+    fn ends_before(&self, range: NanoRange) -> bool {
+        match self.ts2s.last() {
+            Some(&ts) => ts <= range.beg,
+            None => true,
+        }
+    }
+
+    fn ends_after(&self, range: NanoRange) -> bool {
+        match self.ts2s.last() {
+            Some(&ts) => ts > range.end,
+            None => panic!(),
+        }
+    }
+
+    fn starts_after(&self, range: NanoRange) -> bool {
+        match self.ts1s.first() {
+            Some(&ts) => ts >= range.end,
+            None => panic!(),
+        }
+    }
+}
+
+impl<NTY> TimeBins for MinMaxAvgWaveBins<NTY>
+where
+    NTY: NumOps,
+{
+    fn ts1s(&self) -> &Vec<u64> {
+        &self.ts1s
+    }
+
+    fn ts2s(&self) -> &Vec<u64> {
+        &self.ts2s
+    }
+}
+
+impl<NTY> WithLen for MinMaxAvgWaveBins<NTY> {
+    fn len(&self) -> usize {
+        self.ts1s.len()
+    }
+}
+
+impl<NTY> Appendable for MinMaxAvgWaveBins<NTY>
+where
+    NTY: NumOps,
+{
+    fn empty() -> Self {
+        Self::empty()
+    }
+
+    fn append(&mut self, src: &Self) {
+        self.ts1s.extend_from_slice(&src.ts1s);
+        self.ts2s.extend_from_slice(&src.ts2s);
+        self.counts.extend_from_slice(&src.counts);
+        self.mins.extend_from_slice(&src.mins);
+        self.maxs.extend_from_slice(&src.maxs);
+        self.avgs.extend_from_slice(&src.avgs);
+    }
+}
+
+impl<NTY> ReadableFromFile for MinMaxAvgWaveBins<NTY>
+where
+    NTY: NumOps,
+{
+    // TODO this function is not needed in the trait:
+    fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error> {
+        Ok(ReadPbv::new(file))
+    }
+
+    fn from_buf(buf: &[u8]) -> Result<Self, Error> {
+        let dec = serde_cbor::from_slice(&buf)?;
+        Ok(dec)
+    }
+}
+
+impl<NTY> TimeBinnableType for MinMaxAvgWaveBins<NTY>
+where
+    NTY: NumOps,
+{
+    type Output = MinMaxAvgWaveBins<NTY>;
+    type Aggregator = MinMaxAvgWaveBinsAggregator<NTY>;
+
+    fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator {
+        Self::Aggregator::new(range, x_bin_count)
+    }
+}
+
+impl<NTY> ToJsonResult for Sitemty<MinMaxAvgWaveBins<NTY>>
+where
+    NTY: NumOps,
+{
+    fn to_json_result(&self) -> Result<Box<dyn erased_serde::Serialize>, Error> {
+        Ok(Box::new(serde_json::Value::String(format!(
+            "MinMaxAvgWaveBins/non-json-item"
+        ))))
+    }
+}
+
+pub struct MinMaxAvgWaveBinsCollected<NTY> {
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> MinMaxAvgWaveBinsCollected<NTY> {
+    pub fn new() -> Self {
+        Self { _m1: PhantomData }
+    }
+}
+
+#[derive(Serialize)]
+pub struct MinMaxAvgWaveBinsCollectedResult<NTY> {
+    ts0: u64,
+    tsoff: Vec<u64>,
+    //ts_bin_edges: Vec<IsoDateTime>,
+    counts: Vec<u64>,
+    mins: Vec<Option<Vec<NTY>>>,
+    maxs: Vec<Option<Vec<NTY>>>,
+    avgs: Vec<Option<Vec<f32>>>,
+    #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")]
+    finalised_range: bool,
+    #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
+    missing_bins: u32,
+    #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
+    //continue_at: Option<IsoDateTime>,
+    continue_at: Option<u64>,
+}
+
+pub struct MinMaxAvgWaveBinsCollector<NTY> {
+    bin_count_exp: u32,
+    timed_out: bool,
+    range_complete: bool,
+    vals: MinMaxAvgWaveBins<NTY>,
+    _m1: PhantomData<NTY>,
+}
+
+impl<NTY> MinMaxAvgWaveBinsCollector<NTY> {
+    pub fn new(bin_count_exp: u32) -> Self {
+        Self {
+            bin_count_exp,
+            timed_out: false,
+            range_complete: false,
+            vals: MinMaxAvgWaveBins::<NTY>::empty(),
+            _m1: PhantomData,
+        }
+    }
+}
+
+impl<NTY> WithLen for MinMaxAvgWaveBinsCollector<NTY>
+where
+    NTY: NumOps + Serialize,
+{
+    fn len(&self) -> usize {
+        self.vals.ts1s.len()
+    }
+}
+
+impl<NTY> Collector for MinMaxAvgWaveBinsCollector<NTY>
+where
+    NTY: NumOps + Serialize,
+{
+    type Input = MinMaxAvgWaveBins<NTY>;
+    type Output = MinMaxAvgWaveBinsCollectedResult<NTY>;
+
+    fn ingest(&mut self, src: &Self::Input) {
+        Appendable::append(&mut self.vals, src);
+    }
+
+    fn set_range_complete(&mut self) {
+        self.range_complete = true;
+    }
+
+    fn set_timed_out(&mut self) {
+        self.timed_out = true;
+    }
+
+    fn result(self) -> Result<Self::Output, Error> {
+        let ts0 = self.vals.ts1s.first().map_or(0, |k| *k / SEC);
+        let bin_count = self.vals.ts1s.len() as u32;
+        let mut tsoff: Vec<_> = self.vals.ts1s.iter().map(|k| *k - ts0 * SEC).collect();
+        if let Some(&k) = self.vals.ts2s.last() {
+            tsoff.push(k - ts0 * SEC);
+        }
+        let tsoff = tsoff;
+        let _iso: Vec<_> = tsoff
+            .iter()
+            .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
+            .collect();
+        let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
+            match tsoff.last() {
+                Some(k) => Some(k.clone()),
+                None => Err(Error::with_msg("partial_content but no bin in result"))?,
+            }
+        } else {
+            None
+        };
+        let ret = MinMaxAvgWaveBinsCollectedResult {
+            ts0,
+            tsoff,
+            counts: self.vals.counts,
+            mins: self.vals.mins,
+            maxs: self.vals.maxs,
+            avgs: self.vals.avgs,
+            finalised_range: self.range_complete,
+            missing_bins: self.bin_count_exp - bin_count,
+            continue_at,
+        };
+        Ok(ret)
+    }
+}
+
+impl<NTY> Collectable for MinMaxAvgWaveBins<NTY>
+where
+    NTY: NumOps + Serialize,
+{
+    type Collector = MinMaxAvgWaveBinsCollector<NTY>;
+
+    fn new_collector(bin_count_exp: u32) -> Self::Collector {
+        Self::Collector::new(bin_count_exp)
+    }
+}
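Note: a sketch of how the Collector contract above is meant to be driven (hypothetical driver; in the real service the input is a stream whose range-complete and timeout markers map onto the two setters):

    fn collect_all<NTY: NumOps + Serialize>(
        items: &[MinMaxAvgWaveBins<NTY>],
        bin_count_exp: u32,
    ) -> Result<MinMaxAvgWaveBinsCollectedResult<NTY>, Error> {
        let mut coll = MinMaxAvgWaveBins::<NTY>::new_collector(bin_count_exp);
        for item in items {
            coll.ingest(item);
        }
        // coll.set_range_complete() / coll.set_timed_out() would be driven
        // by the corresponding stream items here.
        coll.result()
    }

When fewer bins arrive than bin_count_exp, result() emits continueAt so a client can resume the query from the last bin edge.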
+
+pub struct MinMaxAvgWaveBinsAggregator<NTY> {
+    range: NanoRange,
+    count: u64,
+    min: Vec<NTY>,
+    max: Vec<NTY>,
+    sum: Vec<f32>,
+    sumc: u64,
+}
+
+impl<NTY> MinMaxAvgWaveBinsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    pub fn new(range: NanoRange, x_bin_count: usize) -> Self {
+        Self {
+            range,
+            count: 0,
+            // Seed the min fold with the type's maximum (or NaN) and the max fold
+            // with the minimum (or NaN) so the first ingested bin always wins.
+            min: vec![NTY::max_or_nan(); x_bin_count],
+            max: vec![NTY::min_or_nan(); x_bin_count],
+            sum: vec![0f32; x_bin_count],
+            sumc: 0,
+        }
+    }
+}
+
+impl<NTY> TimeBinnableTypeAggregator for MinMaxAvgWaveBinsAggregator<NTY>
+where
+    NTY: NumOps,
+{
+    type Input = MinMaxAvgWaveBins<NTY>;
+    type Output = MinMaxAvgWaveBins<NTY>;
+
+    fn range(&self) -> &NanoRange {
+        &self.range
+    }
+
+    fn ingest(&mut self, item: &Self::Input) {
+        for i1 in 0..item.ts1s.len() {
+            if item.ts2s[i1] <= self.range.beg {
+                continue;
+            } else if item.ts1s[i1] >= self.range.end {
+                continue;
+            } else {
+                // The input can contain bins where no events did fall into.
+                match &item.mins[i1] {
+                    None => {}
+                    Some(inp) => {
+                        for (a, b) in self.min.iter_mut().zip(inp.iter()) {
+                            if *b < *a || a.is_nan() {
+                                *a = *b;
+                            }
+                        }
+                    }
+                }
+                match &item.maxs[i1] {
+                    None => {}
+                    Some(inp) => {
+                        for (a, b) in self.max.iter_mut().zip(inp.iter()) {
+                            if *b > *a || a.is_nan() {
+                                *a = *b;
+                            }
+                        }
+                    }
+                }
+                match &item.avgs[i1] {
+                    None => {}
+                    Some(inp) => {
+                        for (a, b) in self.sum.iter_mut().zip(inp.iter()) {
+                            *a += *b;
+                        }
+                    }
+                }
+                self.sumc += 1;
+                self.count += item.counts[i1];
+            }
+        }
+    }
+
+    fn result(self) -> Self::Output {
+        if self.sumc == 0 {
+            Self::Output {
+                ts1s: vec![self.range.beg],
+                ts2s: vec![self.range.end],
+                counts: vec![self.count],
+                mins: vec![None],
+                maxs: vec![None],
+                avgs: vec![None],
+            }
+        } else {
+            let avg = self.sum.iter().map(|j| *j / self.sumc as f32).collect();
+            Self::Output {
+                ts1s: vec![self.range.beg],
+                ts2s: vec![self.range.end],
+                counts: vec![self.count],
+                mins: vec![Some(self.min)],
+                maxs: vec![Some(self.max)],
+                avgs: vec![Some(avg)],
+            }
+        }
+    }
+}
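Note one caveat on the merge above: result() divides the summed per-x-bin averages by sumc, the number of contributing time slices, so the output is an unweighted mean of means. counts is carried along but not used as a weight — a slice averaging 1.0 over 100 events and a slice averaging 3.0 over 1 event merge to 2.0 rather than the event-weighted ~1.02. A standalone numeric check:

    fn main() {
        let avgs = [1.0f32, 3.0];
        let counts = [100u64, 1];
        // as in result() above:
        let unweighted = avgs.iter().sum::<f32>() / avgs.len() as f32;
        // the event-weighted alternative:
        let weighted = avgs.iter().zip(&counts).map(|(a, &c)| a * c as f32).sum::<f32>()
            / counts.iter().sum::<u64>() as f32;
        assert!((unweighted - 2.0).abs() < 1e-6);
        assert!((weighted - 103.0 / 101.0).abs() < 1e-6);
    }

Whether that is acceptable depends on how uniform the event rate is within a time bin; it may be worth a TODO in a follow-up.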
diff --git a/disk/src/binned/dim1.rs b/disk/src/binned/dim1.rs
index fc70776..5fd0537 100644
--- a/disk/src/binned/dim1.rs
+++ b/disk/src/binned/dim1.rs
@@ -173,7 +173,7 @@ where
     type Output = MinMaxAvgDim1Bins<NTY>;
     type Aggregator = MinMaxAvgDim1BinsAggregator<NTY>;
 
-    fn aggregator(range: NanoRange) -> Self::Aggregator {
-        Self::Aggregator::new(range)
+    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
+        Self::Aggregator::new(range, bin_count)
     }
 }
@@ -317,11 +317,12 @@ pub struct MinMaxAvgDim1BinsAggregator<NTY> {
 }
 
 impl<NTY> MinMaxAvgDim1BinsAggregator<NTY> {
-    pub fn new(range: NanoRange) -> Self {
+    pub fn new(range: NanoRange, bin_count: usize) -> Self {
         Self {
             range,
             count: 0,
-            min: None,
+            // TODO get rid of Option
+            min: err::todoval(),
             max: None,
             sumc: 0,
             sum: None,
diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs
index dcf6704..992a13f 100644
--- a/disk/src/binned/query.rs
+++ b/disk/src/binned/query.rs
@@ -57,11 +57,7 @@ impl PreBinnedQuery {
             .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
         let ret = Self {
             patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix),
-            agg_kind: params
-                .get("aggKind")
-                .map_or(&format!("{}", AggKind::DimXBins1), |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
+            agg_kind: agg_kind_from_binning_scheme(&params).unwrap_or(AggKind::DimXBins1),
             channel: channel_from_params(&params)?,
             cache_usage: CacheUsage::from_params(&params)?,
             disk_stats_every: ByteSize::kb(disk_stats_every),
@@ -76,11 +72,11 @@ impl PreBinnedQuery {
 
     pub fn make_query_string(&self) -> String {
         format!(
-            "{}&channelBackend={}&channelName={}&aggKind={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}",
+            "{}&channelBackend={}&channelName={}&binningScheme={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}",
             self.patch.to_url_params_strings(),
             self.channel.backend,
             self.channel.name,
-            self.agg_kind,
+            binning_scheme_string(&self.agg_kind),
             self.cache_usage,
             self.disk_stats_every.bytes() / 1024,
             self.report_error(),
@@ -201,6 +197,7 @@ impl BinnedQuery {
             .parse()
             .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?;
         let ret = Self {
+            channel: channel_from_params(&params)?,
             range: NanoRange {
                 beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
                 end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
             },
             bin_count: params
                 .get("binCount")
                 .ok_or(Error::with_msg("missing binCount"))?
                 .parse()
                 .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?,
-            agg_kind: params
-                .get("aggKind")
-                .map_or(&format!("{}", AggKind::DimXBins1), |k| k)
-                .parse()
-                .map_err(|e| Error::with_msg(format!("can not parse aggKind {:?}", e)))?,
-            channel: channel_from_params(&params)?,
+            agg_kind: agg_kind_from_binning_scheme(&params).unwrap_or(AggKind::DimXBins1),
             cache_usage: CacheUsage::from_params(&params)?,
             disk_stats_every: ByteSize::kb(disk_stats_every),
             report_error: params
@@ -292,7 +284,7 @@ impl BinnedQuery {
     pub fn url(&self, host: &HostPort) -> String {
         let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
         format!(
-            "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&diskStatsEveryKb={}&timeout={}&abortAfterBinCount={}",
+            "http://{}:{}/api/4/binned?cacheUsage={}&channelBackend={}&channelName={}&binCount={}&begDate={}&endDate={}&binningScheme={}&diskStatsEveryKb={}&timeout={}&abortAfterBinCount={}",
             host.host,
             host.port,
             self.cache_usage,
             self.channel.backend,
             self.channel.name,
             self.bin_count,
             Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
             Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
+            binning_scheme_string(&self.agg_kind),
             self.disk_stats_every.bytes() / 1024,
             self.timeout.as_millis(),
             self.abort_after_bin_count,
         )
     }
 }
+
+fn binning_scheme_string(agg_kind: &AggKind) -> String {
+    match agg_kind {
+        AggKind::Plain => "fullValue".into(),
+        AggKind::DimXBins1 => "toScalarX".into(),
+        AggKind::DimXBinsN(n) => format!("binnedXcount{}", n),
+    }
+}
+
+fn agg_kind_from_binning_scheme(params: &BTreeMap<String, String>) -> Result<AggKind, Error> {
+    let key = "binningScheme";
+    let s = params
+        .get(key)
+        .map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?;
+    let ret = if s == "fullValue" {
+        AggKind::Plain
+    } else if s == "toScalarX" {
+        AggKind::DimXBins1
+    } else if s.starts_with("binnedXcount") {
+        AggKind::DimXBinsN(s[12..].parse()?)
+    } else {
+        return Err(Error::with_msg("can not extract binningScheme"));
+    };
+    Ok(ret)
+}
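Note: the two helpers above define the URL encoding of AggKind under the new binningScheme parameter. A round-trip sanity check (hypothetical test; assumes the helpers and std::collections::BTreeMap are reachable from a test module):

    #[test]
    fn binning_scheme_roundtrip() {
        let mut params = BTreeMap::new();
        params.insert(
            "binningScheme".to_string(),
            // encodes to "binnedXcount7"
            binning_scheme_string(&AggKind::DimXBinsN(7)),
        );
        match agg_kind_from_binning_scheme(&params) {
            Ok(AggKind::DimXBinsN(7)) => {}
            other => panic!("round trip failed: {:?}", other),
        }
    }

The `s[12..]` slice relies on "binnedXcount" being exactly 12 bytes, which it is.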
diff --git a/disk/src/decode.rs b/disk/src/decode.rs
index 16bcd69..c38968f 100644
--- a/disk/src/decode.rs
+++ b/disk/src/decode.rs
@@ -295,7 +295,7 @@ where
     type Output = MinMaxAvgBins<NTY>;
     type Aggregator = EventValuesAggregator<NTY>;
 
-    fn aggregator(range: NanoRange) -> Self::Aggregator {
+    fn aggregator(range: NanoRange, _bin_count: usize) -> Self::Aggregator {
         Self::Aggregator::new(range)
     }
 }
diff --git a/disk/src/frame/makeframe.rs b/disk/src/frame/makeframe.rs
index edac70f..fd5b1c3 100644
--- a/disk/src/frame/makeframe.rs
+++ b/disk/src/frame/makeframe.rs
@@ -1,4 +1,4 @@
-use crate::agg::enp::{WaveEvents, XBinnedScalarEvents};
+use crate::agg::enp::{WaveEvents, XBinnedScalarEvents, XBinnedWaveEvents};
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::StreamItem;
@@ -104,6 +104,13 @@ where
     const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB;
 }
 
+impl<NTY> FrameType for Sitemty<XBinnedWaveEvents<NTY>>
+where
+    NTY: SubFrId,
+{
+    // TODO this appears to collide with the frame type directly above
+    // (also 0x800 + NTY::SUB); verify and assign a distinct id range.
+    const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB;
+}
+
 pub trait ProvidesFrameType {
     fn frame_type_id(&self) -> u32;
 }
@@ -160,6 +167,15 @@ where
     }
 }
 
+impl<NTY> Framable for Sitemty<XBinnedWaveEvents<NTY>>
+where
+    NTY: NumOps + Serialize,
+{
+    fn make_frame(&self) -> Result<BytesMut, Error> {
+        make_frame(self)
+    }
+}
+
 pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
 where
     FT: FrameType + Serialize,
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 66d7a5e..d76700a 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -96,18 +96,14 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
     }
 }
 
-// returns Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>
-
 fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
     event_value_shape: EVS,
+    events_node_proc: ENP,
     event_blobs: EventBlobsComplete,
 ) -> Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>
 where
     NTY: NumOps + NumFromBytes<NTY, END> + 'static,
     END: Endianness + 'static,
-    // TODO
-    // Can this work?
     EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
     ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Output>,
     Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
@@ -118,7 +114,7 @@ where
             Ok(item) => match item {
                 StreamItem::DataItem(item) => match item {
                     RangeCompletableItem::Data(item) => {
-                        let item = <ENP as EventsNodeProcessor>::process(item);
+                        let item = events_node_proc.process(item);
                         Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
                     }
                     RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
@@ -133,30 +129,44 @@
     }
 }
 
 macro_rules! pipe4 {
-    ($nty:ident, $end:ident, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
+    ($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
         match $agg_kind {
             AggKind::DimXBins1 => make_num_pipeline_stream_evs::<
                 $nty,
                 $end,
                 $evs<$nty>,
-                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
+                //<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin,
+                _,
                 //Identity<$nty>,
-            >($evsv, $event_blobs),
+            >(
+                $evsv,
+                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape),
+                $event_blobs,
+            ),
             AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<
                 $nty,
                 $end,
                 $evs<$nty>,
-                // TODO must pass on the requested number of bins:
-                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins,
+                //<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins,
+                _,
                 //WaveXBinner<$nty>,
-            >($evsv, $event_blobs),
+            >(
+                $evsv,
+                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape),
+                $event_blobs,
+            ),
             AggKind::Plain => make_num_pipeline_stream_evs::<
                 $nty,
                 $end,
                 $evs<$nty>,
-                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain,
+                //<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain,
+                _,
                 //WaveXBinner<$nty>,
-            >($evsv, $event_blobs),
+            >(
+                $evsv,
+                <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape),
+                $event_blobs,
+            ),
         }
     };
 }
@@ -168,6 +178,7 @@ macro_rules! pipe3 {
             pipe4!(
                 $nty,
                 $end,
+                $shape,
                 EventValuesDim0Case,
                 EventValuesDim0Case::<$nty>::new(),
                 $agg_kind,
@@ -181,6 +192,7 @@ macro_rules! pipe3 {
             pipe4!(
                 $nty,
                 $end,
+                $shape,
                 EventValuesDim1Case,
                 EventValuesDim1Case::<$nty>::new(n),
                 $agg_kind,
@@ -258,6 +270,7 @@ async fn events_conn_handler_inner_try(
             return Err((Error::with_msg("json parse error"), netout))?;
         }
     };
+    info!("events_conn_handler_inner_try  evq {:?}", evq);
     match dbconn::channel_exists(&evq.channel, &node_config).await {
         Ok(_) => (),
         Err(e) => return Err((e, netout))?,
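Note, as a closing illustration of the create/process split that pipe4! wires up above: event processors are now constructed from the channel Shape before the stream runs, and then applied per event batch. A hypothetical call sequence (made-up timestamps and values; assumes EventValues exposes public tss/values fields as its use in enp.rs suggests):

    fn sketch() {
        // WaveXBinner collapses each 21-sample waveform to a single x-bin —
        // the AggKind::DimXBins1 arm of pipe4!. Under DimXBinsN,
        // WaveNBinner::create(Shape::Wave(21)) would keep N x-bins per event.
        let proc = WaveXBinner::<f64>::create(Shape::Wave(21));
        let evs = EventValues {
            tss: vec![SEC * 1210, SEC * 1220],
            values: vec![vec![0.5f64; 21], vec![1.5f64; 21]],
        };
        // Per the impl in disk/src/agg/enp.rs: XBinnedScalarEvents<f64>
        let _binned = proc.process(evs);
    }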