Switch other endpoints to new timestamp format

This commit is contained in:
Dominik Werder
2021-06-16 15:02:20 +02:00
parent 6b27185af9
commit 7077d6b09a
5 changed files with 50 additions and 75 deletions

View File

@@ -63,7 +63,7 @@ async fn get_binned_json_2_inner() -> Result<(), Error> {
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:20.000Z",
2,
AggKind::DimXBinsN(2),
AggKind::DimXBinsN(3),
cluster,
2,
true,

View File

@@ -42,7 +42,6 @@ pub struct XBinnedScalarEvents<NTY> {
mins: Vec<NTY>,
maxs: Vec<NTY>,
avgs: Vec<f32>,
xbincount: Vec<u32>,
}
impl<NTY> XBinnedScalarEvents<NTY> {
@@ -52,7 +51,6 @@ impl<NTY> XBinnedScalarEvents<NTY> {
mins: vec![],
maxs: vec![],
avgs: vec![],
xbincount: vec![],
}
}
}
@@ -131,7 +129,6 @@ where
{
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.xbincount.push(src.xbincount[ix]);
self.mins.push(src.mins[ix]);
self.maxs.push(src.maxs[ix]);
self.avgs.push(src.avgs[ix]);
@@ -148,7 +145,6 @@ where
fn append(&mut self, src: &Self) {
self.tss.extend_from_slice(&src.tss);
self.xbincount.extend_from_slice(&src.xbincount);
self.mins.extend_from_slice(&src.mins);
self.maxs.extend_from_slice(&src.maxs);
self.avgs.extend_from_slice(&src.avgs);
@@ -982,56 +978,36 @@ where
let nev = inp.tss.len();
let mut ret = Self::Output {
tss: inp.tss,
xbincount: Vec::with_capacity(nev),
mins: Vec::with_capacity(nev),
maxs: Vec::with_capacity(nev),
avgs: Vec::with_capacity(nev),
};
for i1 in 0..nev {
// TODO why do I work here with Option?
err::todo();
let mut min = None;
let mut max = None;
let mut min = NTY::max_or_nan();
let mut max = NTY::min_or_nan();
let mut sum = 0f32;
let mut count = 0;
let mut sumc = 0;
let vals = &inp.values[i1];
for i2 in 0..vals.len() {
let v = vals[i2];
min = match min {
None => Some(v),
Some(min) => {
if v < min {
Some(v)
} else {
Some(min)
}
}
};
max = match max {
None => Some(v),
Some(max) => {
if v > max {
Some(v)
} else {
Some(max)
}
}
};
for &v in vals {
if v < min || min.is_nan() {
min = v;
}
if v > max || max.is_nan() {
max = v;
}
let vf = v.as_();
if vf.is_nan() {
} else {
sum += vf;
count += 1;
sumc += 1;
}
}
// TODO while X-binning I expect values, otherwise it is illegal input.
ret.xbincount.push(nev as u32);
ret.mins.push(min.unwrap());
ret.maxs.push(max.unwrap());
if count == 0 {
ret.mins.push(min);
ret.maxs.push(max);
if sumc == 0 {
ret.avgs.push(f32::NAN);
} else {
ret.avgs.push(sum / count as f32);
ret.avgs.push(sum / sumc as f32);
}
}
ret

View File

@@ -1246,8 +1246,7 @@ pub struct MinMaxAvgBinsCollectedResult<NTY> {
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
missing_bins: u32,
#[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
//continue_at: Option<IsoDateTime>,
continue_at: Option<u64>,
continue_at: Option<IsoDateTime>,
}
pub struct MinMaxAvgBinsCollector<NTY> {
@@ -1299,30 +1298,23 @@ where
}
fn result(self) -> Result<Self::Output, Error> {
let ts0 = self.vals.ts1s.first().map_or(0, |k| *k / SEC);
let bin_count = self.vals.ts1s.len() as u32;
let mut tsoff: Vec<_> = self.vals.ts1s.iter().map(|k| *k - ts0 * SEC).collect();
if let Some(&k) = self.vals.ts2s.last() {
tsoff.push(k - ts0 * SEC);
}
let tsoff = tsoff;
let _iso: Vec<_> = tsoff
.iter()
.map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
.collect();
let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
match tsoff.last() {
Some(k) => Some(k.clone()),
None => Err(Error::with_msg("partial_content but no bin in result"))?,
}
} else {
None
};
// TODO could save the copy:
let mut ts_all = self.vals.ts1s.clone();
if self.vals.ts2s.len() > 0 {
ts_all.push(*self.vals.ts2s.last().unwrap());
}
let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
match ts_all.last() {
Some(&k) => {
let iso = IsoDateTime(Utc.timestamp_nanos(k as i64));
Some(iso)
}
None => Err(Error::with_msg("partial_content but no bin in result"))?,
}
} else {
None
};
let tst = ts_offs_from_abs(&ts_all);
let ret = MinMaxAvgBinsCollectedResult::<NTY> {
ts_anchor_sec: tst.0,

View File

@@ -1,5 +1,5 @@
use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator};
use crate::agg::enp::WaveEvents;
use crate::agg::enp::{ts_offs_from_abs, WaveEvents};
use crate::agg::streams::{Appendable, Collectable, Collector, ToJsonBytes, ToJsonResult};
use crate::agg::{Fits, FitsInside};
use crate::binned::{
@@ -423,8 +423,12 @@ where
#[derive(Serialize)]
pub struct WaveEventsCollectedResult<NTY> {
ts0: u64,
tsoff: Vec<u64>,
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
values: Vec<Vec<NTY>>,
#[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")]
range_complete: bool,
@@ -475,11 +479,11 @@ where
}
fn result(self) -> Result<Self::Output, Error> {
let ts0 = self.vals.tss.first().map_or(0, |k| *k / SEC);
let tsoff = self.vals.tss.into_iter().map(|k| k - ts0 * SEC).collect();
let tst = ts_offs_from_abs(&self.vals.tss);
let ret = Self::Output {
ts0,
tsoff,
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
values: self.vals.vals,
range_complete: self.range_complete,
timed_out: self.timed_out,

View File

@@ -1,5 +1,5 @@
use crate::agg::binnedt::TimeBinnableType;
use crate::agg::enp::{Identity, WaveNBinner, WavePlainProc, WaveXBinner};
use crate::agg::enp::{ts_offs_from_abs, Identity, WaveNBinner, WavePlainProc, WaveXBinner};
use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem};
use crate::agg::{Fits, FitsInside};
use crate::binned::{
@@ -11,7 +11,6 @@ use crate::eventchunker::EventFull;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
@@ -324,8 +323,12 @@ impl<NTY> WithLen for EventValuesCollector<NTY> {
#[derive(Serialize)]
pub struct EventValuesCollectorOutput<NTY> {
ts0: u64,
tsoff: Vec<u64>,
#[serde(rename = "tsAnchor")]
ts_anchor_sec: u64,
#[serde(rename = "tsMs")]
ts_off_ms: Vec<u64>,
#[serde(rename = "tsNs")]
ts_off_ns: Vec<u64>,
values: Vec<NTY>,
#[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")]
range_complete: bool,
@@ -353,11 +356,11 @@ where
}
fn result(self) -> Result<Self::Output, Error> {
let ts0 = self.vals.tss.first().map_or(0, |k| *k / SEC);
let tsoff = self.vals.tss.into_iter().map(|k| k - ts0 * SEC).collect();
let tst = ts_offs_from_abs(&self.vals.tss);
let ret = Self::Output {
ts0,
tsoff,
ts_anchor_sec: tst.0,
ts_off_ms: tst.1,
ts_off_ns: tst.2,
values: self.vals.values,
range_complete: self.range_complete,
timed_out: self.timed_out,