diff --git a/daqbuffer/src/test/binnedjson.rs b/daqbuffer/src/test/binnedjson.rs index 7d95053..b7269f5 100644 --- a/daqbuffer/src/test/binnedjson.rs +++ b/daqbuffer/src/test/binnedjson.rs @@ -63,7 +63,7 @@ async fn get_binned_json_2_inner() -> Result<(), Error> { "1970-01-01T00:20:10.000Z", "1970-01-01T00:20:20.000Z", 2, - AggKind::DimXBinsN(2), + AggKind::DimXBinsN(3), cluster, 2, true, diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index 1d6e177..1b24a6f 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -42,7 +42,6 @@ pub struct XBinnedScalarEvents { mins: Vec, maxs: Vec, avgs: Vec, - xbincount: Vec, } impl XBinnedScalarEvents { @@ -52,7 +51,6 @@ impl XBinnedScalarEvents { mins: vec![], maxs: vec![], avgs: vec![], - xbincount: vec![], } } } @@ -131,7 +129,6 @@ where { fn push_index(&mut self, src: &Self, ix: usize) { self.tss.push(src.tss[ix]); - self.xbincount.push(src.xbincount[ix]); self.mins.push(src.mins[ix]); self.maxs.push(src.maxs[ix]); self.avgs.push(src.avgs[ix]); @@ -148,7 +145,6 @@ where fn append(&mut self, src: &Self) { self.tss.extend_from_slice(&src.tss); - self.xbincount.extend_from_slice(&src.xbincount); self.mins.extend_from_slice(&src.mins); self.maxs.extend_from_slice(&src.maxs); self.avgs.extend_from_slice(&src.avgs); @@ -982,56 +978,36 @@ where let nev = inp.tss.len(); let mut ret = Self::Output { tss: inp.tss, - xbincount: Vec::with_capacity(nev), mins: Vec::with_capacity(nev), maxs: Vec::with_capacity(nev), avgs: Vec::with_capacity(nev), }; for i1 in 0..nev { - // TODO why do I work here with Option? - err::todo(); - let mut min = None; - let mut max = None; + let mut min = NTY::max_or_nan(); + let mut max = NTY::min_or_nan(); let mut sum = 0f32; - let mut count = 0; + let mut sumc = 0; let vals = &inp.values[i1]; - for i2 in 0..vals.len() { - let v = vals[i2]; - min = match min { - None => Some(v), - Some(min) => { - if v < min { - Some(v) - } else { - Some(min) - } - } - }; - max = match max { - None => Some(v), - Some(max) => { - if v > max { - Some(v) - } else { - Some(max) - } - } - }; + for &v in vals { + if v < min || min.is_nan() { + min = v; + } + if v > max || max.is_nan() { + max = v; + } let vf = v.as_(); if vf.is_nan() { } else { sum += vf; - count += 1; + sumc += 1; } } - // TODO while X-binning I expect values, otherwise it is illegal input. - ret.xbincount.push(nev as u32); - ret.mins.push(min.unwrap()); - ret.maxs.push(max.unwrap()); - if count == 0 { + ret.mins.push(min); + ret.maxs.push(max); + if sumc == 0 { ret.avgs.push(f32::NAN); } else { - ret.avgs.push(sum / count as f32); + ret.avgs.push(sum / sumc as f32); } } ret diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 321a01d..c9b553e 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1246,8 +1246,7 @@ pub struct MinMaxAvgBinsCollectedResult { #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] missing_bins: u32, #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] - //continue_at: Option, - continue_at: Option, + continue_at: Option, } pub struct MinMaxAvgBinsCollector { @@ -1299,30 +1298,23 @@ where } fn result(self) -> Result { - let ts0 = self.vals.ts1s.first().map_or(0, |k| *k / SEC); let bin_count = self.vals.ts1s.len() as u32; - let mut tsoff: Vec<_> = self.vals.ts1s.iter().map(|k| *k - ts0 * SEC).collect(); - if let Some(&k) = self.vals.ts2s.last() { - tsoff.push(k - ts0 * SEC); - } - let tsoff = tsoff; - let _iso: Vec<_> = tsoff - .iter() - .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) - .collect(); - let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize { - match tsoff.last() { - Some(k) => Some(k.clone()), - None => Err(Error::with_msg("partial_content but no bin in result"))?, - } - } else { - None - }; // TODO could save the copy: let mut ts_all = self.vals.ts1s.clone(); if self.vals.ts2s.len() > 0 { ts_all.push(*self.vals.ts2s.last().unwrap()); } + let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize { + match ts_all.last() { + Some(&k) => { + let iso = IsoDateTime(Utc.timestamp_nanos(k as i64)); + Some(iso) + } + None => Err(Error::with_msg("partial_content but no bin in result"))?, + } + } else { + None + }; let tst = ts_offs_from_abs(&ts_all); let ret = MinMaxAvgBinsCollectedResult:: { ts_anchor_sec: tst.0, diff --git a/disk/src/binned/dim1.rs b/disk/src/binned/dim1.rs index 701648e..6fef183 100644 --- a/disk/src/binned/dim1.rs +++ b/disk/src/binned/dim1.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator}; -use crate::agg::enp::WaveEvents; +use crate::agg::enp::{ts_offs_from_abs, WaveEvents}; use crate::agg::streams::{Appendable, Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::agg::{Fits, FitsInside}; use crate::binned::{ @@ -423,8 +423,12 @@ where #[derive(Serialize)] pub struct WaveEventsCollectedResult { - ts0: u64, - tsoff: Vec, + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, values: Vec>, #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] range_complete: bool, @@ -475,11 +479,11 @@ where } fn result(self) -> Result { - let ts0 = self.vals.tss.first().map_or(0, |k| *k / SEC); - let tsoff = self.vals.tss.into_iter().map(|k| k - ts0 * SEC).collect(); + let tst = ts_offs_from_abs(&self.vals.tss); let ret = Self::Output { - ts0, - tsoff, + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, values: self.vals.vals, range_complete: self.range_complete, timed_out: self.timed_out, diff --git a/disk/src/decode.rs b/disk/src/decode.rs index c38968f..d5983fc 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -1,5 +1,5 @@ use crate::agg::binnedt::TimeBinnableType; -use crate::agg::enp::{Identity, WaveNBinner, WavePlainProc, WaveXBinner}; +use crate::agg::enp::{ts_offs_from_abs, Identity, WaveNBinner, WavePlainProc, WaveXBinner}; use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem}; use crate::agg::{Fits, FitsInside}; use crate::binned::{ @@ -11,7 +11,6 @@ use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::timeunits::SEC; use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::marker::PhantomData; @@ -324,8 +323,12 @@ impl WithLen for EventValuesCollector { #[derive(Serialize)] pub struct EventValuesCollectorOutput { - ts0: u64, - tsoff: Vec, + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, values: Vec, #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] range_complete: bool, @@ -353,11 +356,11 @@ where } fn result(self) -> Result { - let ts0 = self.vals.tss.first().map_or(0, |k| *k / SEC); - let tsoff = self.vals.tss.into_iter().map(|k| k - ts0 * SEC).collect(); + let tst = ts_offs_from_abs(&self.vals.tss); let ret = Self::Output { - ts0, - tsoff, + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, values: self.vals.values, range_complete: self.range_complete, timed_out: self.timed_out,