diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs index c34f433..7daf557 100644 --- a/daqbuffer/src/test.rs +++ b/daqbuffer/src/test.rs @@ -84,6 +84,18 @@ fn get_binned_binary() { async fn get_binned_binary_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; + if true { + get_binned_channel::( + "scalar-i32-be", + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:50.000Z", + 3, + cluster, + true, + 4, + ) + .await?; + } if true { get_binned_channel::( "wave-f64-be-n21", diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index 98c1d12..9ca4f45 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -22,8 +22,8 @@ where type Input = NTY; type Output = EventValues; - fn process(_inp: EventValues) -> Self::Output { - todo!() + fn process(inp: EventValues) -> Self::Output { + inp } } @@ -150,9 +150,10 @@ where NTY: NumOps, { range: NanoRange, + count: u64, min: Option, max: Option, - sumc: u32, + sumc: u64, sum: f32, } @@ -163,6 +164,7 @@ where pub fn new(range: NanoRange) -> Self { Self { range, + count: 0, min: None, max: None, sumc: 0, @@ -216,6 +218,7 @@ where self.sum += x; self.sumc += 1; } + self.count += 1; } } } @@ -229,8 +232,7 @@ where Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], - // TODO - counts: vec![0], + counts: vec![self.count], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg], diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 7aa3233..903b187 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -999,10 +999,11 @@ where pub struct EventValuesAggregator { range: NanoRange, - count: u32, + count: u64, min: Option, max: Option, - avg: Option, + sumc: u64, + sum: f32, } impl EventValuesAggregator { @@ -1012,7 +1013,8 @@ impl EventValuesAggregator { count: 0, min: None, max: None, - avg: None, + sumc: 0, + sum: 0f32, } } } @@ -1028,13 +1030,60 @@ where &self.range } - fn ingest(&mut self, _item: &Self::Input) { - // TODO construct test case to hit this: - todo!() + fn ingest(&mut self, item: &Self::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + if ts < self.range.beg { + continue; + } else if ts >= self.range.end { + continue; + } else { + let v = item.values[i1]; + let vf = v.as_(); + self.min = match self.min { + None => Some(v), + Some(min) => { + if v < min { + Some(v) + } else { + Some(min) + } + } + }; + self.max = match self.max { + None => Some(v), + Some(max) => { + if v > max { + Some(v) + } else { + Some(max) + } + } + }; + if vf.is_nan() { + } else { + self.sum += vf; + self.sumc += 1; + } + self.count += 1; + } + } } fn result(self) -> Self::Output { - todo!() + let avg = if self.sumc == 0 { + None + } else { + Some(self.sum / self.sumc as f32) + }; + Self::Output { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![self.min], + maxs: vec![self.max], + avgs: vec![avg], + } } } @@ -1043,20 +1092,19 @@ pub struct MinMaxAvgBinsAggregator { count: u64, min: Option, max: Option, + sumc: u64, sum: f32, - sumc: u32, } impl MinMaxAvgBinsAggregator { pub fn new(range: NanoRange) -> Self { Self { range, - // TODO: actually count events through the whole pipeline. count: 0, min: None, max: None, - sum: 0f32, sumc: 0, + sum: 0f32, } } } @@ -1115,6 +1163,7 @@ where } } } + self.count += item.counts[i1]; } } } @@ -1128,7 +1177,7 @@ where Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], - counts: vec![self.count as u64], + counts: vec![self.count], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg],