From 018b44c9f6f542de0a34b60912b6b34e2af69b80 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 8 Apr 2021 13:45:20 +0200 Subject: [PATCH] Result of test agg_x_dim_1 agrees with ui-data-api --- Cargo.toml | 4 +- disk/src/agg.rs | 326 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 289 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9df41a8..55dd421 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = ["retrieval", "httpret", "err", "disk"] [profile.release] -debug = 1 -opt-level = 1 +#debug = 1 +#opt-level = 0 #overflow-checks = true #debug-assertions = true diff --git a/disk/src/agg.rs b/disk/src/agg.rs index f9c2835..b75f784 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -11,7 +11,10 @@ use netpod::ScalarType; pub trait AggregatorTdim { type InputValue; type OutputValue: AggregatableXdim1Bin + AggregatableTdim; - fn ingest(&mut self, v: &Self::InputValue); + fn ends_before(&self, inp: &Self::InputValue) -> bool; + fn ends_after(&self, inp: &Self::InputValue) -> bool; + fn starts_after(&self, inp: &Self::InputValue) -> bool; + fn ingest(&mut self, inp: &Self::InputValue); fn result(self) -> Self::OutputValue; } @@ -23,7 +26,7 @@ pub trait AggregatableXdim1Bin { pub trait AggregatableTdim { type Output: AggregatableXdim1Bin + AggregatableTdim; type Aggregator: AggregatorTdim; - fn aggregator_new(&self) -> Self::Aggregator; + fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator; } @@ -35,13 +38,26 @@ impl AggregatableXdim1Bin for () { impl AggregatableTdim for () { type Output = (); type Aggregator = (); - fn aggregator_new(&self) -> Self::Aggregator { + fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { todo!() } } impl AggregatorTdim for () { type InputValue = (); type OutputValue = (); + + fn ends_before(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ends_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + fn ingest(&mut self, v: &Self::InputValue) { todo!() } fn result(self) -> Self::OutputValue { todo!() } } @@ -74,16 +90,20 @@ impl AggregatableXdim1Bin for ValuesDim1 { let mut max = f32::MIN; let mut sum = 0f32; let vals = &self.values[i1]; + assert!(vals.len() > 0); for i2 in 0..vals.len() { let v = vals[i2]; + //info!("value {} {} {}", i1, i2, v); min = min.min(v); max = max.max(v); sum += v; } + if min == f32::MAX { min = f32::NAN; } + if max == f32::MIN { max = f32::NAN; } ret.tss.push(ts); ret.mins.push(min); ret.maxs.push(max); - ret.avgs.push(sum / ret.tss.len() as f32); + ret.avgs.push(sum / vals.len() as f32); } ret } @@ -115,19 +135,31 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch { type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarEventBatchAggregator; - fn aggregator_new(&self) -> Self::Aggregator { - MinMaxAvgScalarEventBatchAggregator::new() + fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { + MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2) } } pub struct MinMaxAvgScalarEventBatchAggregator { + ts1: u64, + ts2: u64, + min: f32, + max: f32, + sum: f32, + count: u64, } impl MinMaxAvgScalarEventBatchAggregator { - pub fn new() -> Self { + pub fn new(ts1: u64, ts2: u64) -> Self { Self { + ts1, + ts2, + min: f32::MAX, + max: f32::MIN, + sum: 0f32, + count: 0, } } @@ -135,13 +167,68 @@ impl MinMaxAvgScalarEventBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { type InputValue = MinMaxAvgScalarEventBatch; - type OutputValue = MinMaxAvgScalarBinBatch; + type OutputValue = MinMaxAvgScalarBinSingle; - fn ingest(&mut self, v: &Self::InputValue) { - todo!() + fn ends_before(&self, inp: &Self::InputValue) -> bool { + match inp.tss.last() { + Some(ts) => { + *ts < self.ts1 + } + _ => panic!() + } } - fn result(self) -> Self::OutputValue { todo!() } + fn ends_after(&self, inp: &Self::InputValue) -> bool { + match inp.tss.last() { + Some(ts) => { + *ts >= self.ts2 + } + _ => panic!() + } + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + match inp.tss.first() { + Some(ts) => { + *ts >= self.ts2 + } + _ => panic!() + } + } + + fn ingest(&mut self, v: &Self::InputValue) { + for i1 in 0..v.tss.len() { + let ts = v.tss[i1]; + if ts < self.ts1 { + //info!("EventBatchAgg {} {} {} {} IS BEFORE", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); + continue; + } + else if ts >= self.ts2 { + //info!("EventBatchAgg {} {} {} {} IS AFTER", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); + continue; + } + else { + //info!("EventBatchAgg {} {} {} {}", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); + self.min = self.min.min(v.mins[i1]); + self.max = self.max.max(v.maxs[i1]); + self.sum += v.avgs[i1]; + self.count += 1; + } + } + } + + fn result(self) -> Self::OutputValue { + let min = if self.min == f32::MAX { f32::NAN } else { self.min }; + let max = if self.max == f32::MIN { f32::NAN } else { self.max }; + let avg = if self.count == 0 { f32::NAN } else { self.sum / self.count as f32 }; + MinMaxAvgScalarBinSingle { + ts1: self.ts1, + ts2: self.ts2, + min, + max, + avg, + } + } } @@ -170,7 +257,7 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { impl AggregatableTdim for MinMaxAvgScalarBinBatch { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinBatchAggregator; - fn aggregator_new(&self) -> Self::Aggregator { + fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { todo!() } } @@ -181,6 +268,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { type InputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinSingle; + fn ends_before(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ends_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ingest(&mut self, v: &Self::InputValue) { todo!() } @@ -198,10 +298,16 @@ pub struct MinMaxAvgScalarBinSingle { avg: f32, } +impl std::fmt::Debug for MinMaxAvgScalarBinSingle { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.min, self.max, self.avg) + } +} + impl AggregatableTdim for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinSingleAggregator; - fn aggregator_new(&self) -> Self::Aggregator { + fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { todo!() } } @@ -219,6 +325,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator { type InputValue = MinMaxAvgScalarBinSingle; type OutputValue = MinMaxAvgScalarBinSingle; + fn ends_before(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ends_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + todo!() + } + + fn ingest(&mut self, v: &Self::InputValue) { todo!() } @@ -256,6 +375,7 @@ where S: Stream> + Unpin let decomp = k.decomps[i1].as_ref().unwrap(); match ty { F64 => { + const BY: usize = 8; // do the conversion let n1 = decomp.len(); assert!(n1 % ty.bytes() as usize == 0); @@ -265,10 +385,14 @@ where S: Stream> + Unpin unsafe { j.set_len(ele_count); } let mut p1 = 0; for i1 in 0..ele_count { - unsafe { - j[i1] = std::mem::transmute_copy::<_, f64>(&decomp[p1]) as f32; - p1 += 8; - } + let u = unsafe { + let mut r = [0u8; BY]; + std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY); + f64::from_be_bytes(r) + //f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1])) + }; + j[i1] = u as f32; + p1 += BY; } ret.tss.push(k.tss[i1]); ret.values.push(j); @@ -345,14 +469,14 @@ impl Stream for IntoBinnedXBins1DefaultStream where S: Stream Self::StreamOut; + fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut; } impl IntoBinnedT for T where I: AggregatableTdim + Unpin, T: Stream> + Unpin, I::Aggregator: Unpin { type StreamOut = IntoBinnedTDefaultStream; - fn into_binned_t(self) -> Self::StreamOut { - IntoBinnedTDefaultStream::new(self) + fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut { + IntoBinnedTDefaultStream::new(self, spec) } } @@ -360,44 +484,165 @@ impl IntoBinnedT for T where I: AggregatableTdim + Unpin, T: Stream where I: AggregatableTdim, S: Stream> { inp: S, aggtor: Option, + spec: BinSpecDimT, + curbin: u32, + left: Option>>>, } impl IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream> { - pub fn new(inp: S) -> Self { + pub fn new(inp: S, spec: BinSpecDimT) -> Self { + //info!("spec ts {} {}", spec.ts1, spec.ts2); Self { - inp: inp, + inp, aggtor: None, + spec, + curbin: 0, + left: None, } } } impl Stream for IntoBinnedTDefaultStream -where I: AggregatableTdim, T: Stream> + Unpin, I::Aggregator: Unpin +where I: AggregatableTdim + Unpin, T: Stream> + Unpin, I::Aggregator: Unpin { type Item = Result<::OutputValue, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - if self.aggtor.is_none() { - self.aggtor = Some(k.aggregator_new()); - } - let ag = self.aggtor.as_mut().unwrap(); - ag.ingest(&k); - Ready(Some(Ok(self.aggtor.take().unwrap().result()))) + 'outer: loop { + let cur = if self.curbin as u64 >= self.spec.count { + Ready(None) } - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), - Pending => Pending, + else if let Some(k) = self.left.take() { + k + } + else { + self.inp.poll_next_unpin(cx) + }; + break match cur { + Ready(Some(Ok(k))) => { + if self.aggtor.is_none() { + let range = self.spec.get_range(self.curbin); + //info!("range: {} {}", range.ts1, range.ts2); + self.aggtor = Some(k.aggregator_new(range.ts1, range.ts2)); + } + let ag = self.aggtor.as_mut().unwrap(); + if ag.ends_before(&k) { + //info!("ENDS BEFORE"); + continue 'outer; + } + else if ag.starts_after(&k) { + //info!("STARTS AFTER"); + self.left = Some(Ready(Some(Ok(k)))); + self.curbin += 1; + Ready(Some(Ok(self.aggtor.take().unwrap().result()))) + } + else { + //info!("INGEST"); + ag.ingest(&k); + // if this input contains also data after the current bin, then I need to keep + // it for the next round. + if ag.ends_after(&k) { + //info!("ENDS AFTER"); + self.left = Some(Ready(Some(Ok(k)))); + self.curbin += 1; + Ready(Some(Ok(self.aggtor.take().unwrap().result()))) + } + else { + //info!("ENDS WITHIN"); + continue 'outer; + } + } + } + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => { + match self.aggtor.take() { + Some(ag) => { + Ready(Some(Ok(ag.result()))) + } + None => { + warn!("TODO add trailing bins"); + Ready(None) + } + } + }, + Pending => Pending, + }; } } } +pub struct BinSpecDimT { + count: u64, + ts1: u64, + ts2: u64, + bs: u64, +} +impl BinSpecDimT { + + pub fn over_range(count: u64, ts1: u64, ts2: u64) -> Self { + assert!(count >= 1); + assert!(count <= 2000); + assert!(ts2 > ts1); + let dt = ts2 - ts1; + assert!(dt <= DAY * 14); + let bs = dt / count; + let thresholds = [ + 2, 10, 100, + 1000, 10_000, 100_000, + MU, MU * 10, MU * 100, + MS, MS * 10, MS * 100, + SEC, SEC * 5, SEC * 10, SEC * 20, + MIN, MIN * 5, MIN * 10, MIN * 20, + HOUR, HOUR * 2, HOUR * 4, HOUR * 12, + DAY, DAY * 2, DAY * 4, DAY * 8, DAY * 16, + WEEK, WEEK * 2, WEEK * 10, WEEK * 60, + ]; + let mut i1 = 0; + let bs = loop { + if i1 >= thresholds.len() { break *thresholds.last().unwrap(); } + let t = thresholds[i1]; + if bs < t { break t; } + i1 += 1; + }; + //info!("INPUT TS {} {}", ts1, ts2); + //info!("chosen binsize: {} {}", i1, bs); + let ts1 = ts1 / bs * bs; + let ts2 = (ts2 + bs - 1) / bs * bs; + //info!("ADJUSTED TS {} {}", ts1, ts2); + BinSpecDimT { + count, + ts1, + ts2, + bs, + } + } + + pub fn get_range(&self, ix: u32) -> TimeRange { + TimeRange { + ts1: self.ts1 + ix as u64 * self.bs, + ts2: self.ts1 + (ix as u64 + 1) * self.bs, + } + } + +} + +pub struct TimeRange { + ts1: u64, + ts2: u64, +} + +const MU: u64 = 1000; +const MS: u64 = MU * 1000; +const SEC: u64 = MS * 1000; +const MIN: u64 = SEC * 60; +const HOUR: u64 = MIN * 60; +const DAY: u64 = HOUR * 24; +const WEEK: u64 = DAY * 7; @@ -443,27 +688,30 @@ async fn agg_x_dim_1_inner() { name: "S10BC01-DBAM070:BAM_CH1_NORM".into(), backend: "sf-databuffer".into(), }, - timebin: 18721, + timebin: 18722, tb_file_count: 1, split: 12, tbsize: 1000 * 60 * 60 * 24, buffer_size: 1024 * 4, }; + let bin_count = 100; + let ts1 = query.timebin as u64 * query.tbsize as u64 * MS; + let ts2 = ts1 + HOUR * 24; let fut1 = crate::EventBlobsComplete::new(&query) .into_dim_1_f32_stream() - .take(10) + //.take(1000) .map(|q| { if let Ok(ref k) = q { - info!("vals: {:?}", k); + //info!("vals: {:?}", k); } q }) .into_binned_x_bins_1() .map(|k| { - info!("after X binning {:?}", k.as_ref().unwrap()); + //info!("after X binning {:?}", k.as_ref().unwrap()); k }) - .into_binned_t() + .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) .map(|k| { info!("after T binning {:?}", k.as_ref().unwrap()); k