use crate::numops::NumOps; use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::{ ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, TimeBins, WithLen, }; use chrono::{TimeZone, Utc}; use err::Error; use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; use num_traits::Zero; use serde::{Deserialize, Serialize}; use std::fmt; use std::marker::PhantomData; use tokio::fs::File; #[derive(Clone, Serialize, Deserialize)] pub struct MinMaxAvgDim0Bins { pub ts1s: Vec, pub ts2s: Vec, pub counts: Vec, // TODO get rid of Option: pub mins: Vec>, pub maxs: Vec>, pub avgs: Vec>, } impl SitemtyFrameType for MinMaxAvgDim0Bins where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = crate::MIN_MAX_AVG_BINS + NTY::SUB; } impl fmt::Debug for MinMaxAvgDim0Bins where NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, "MinMaxAvgBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", self.ts1s.len(), self.ts1s.iter().map(|k| k / SEC).collect::>(), self.ts2s.iter().map(|k| k / SEC).collect::>(), self.counts, self.mins, self.maxs, self.avgs, ) } } impl MinMaxAvgDim0Bins { pub fn empty() -> Self { Self { ts1s: vec![], ts2s: vec![], counts: vec![], mins: vec![], maxs: vec![], avgs: vec![], } } } impl FitsInside for MinMaxAvgDim0Bins { fn fits_inside(&self, range: NanoRange) -> Fits { if self.ts1s.is_empty() { Fits::Empty } else { let t1 = *self.ts1s.first().unwrap(); let t2 = *self.ts2s.last().unwrap(); if t2 <= range.beg { Fits::Lower } else if t1 >= range.end { Fits::Greater } else if t1 < range.beg && t2 > range.end { Fits::PartlyLowerAndGreater } else if t1 < range.beg { Fits::PartlyLower } else if t2 > range.end { Fits::PartlyGreater } else { Fits::Inside } } } } impl FilterFittingInside for MinMaxAvgDim0Bins { fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { match self.fits_inside(fit_range) { Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), _ => None, } } } impl RangeOverlapInfo for MinMaxAvgDim0Bins { fn ends_before(&self, range: NanoRange) -> bool { match self.ts2s.last() { Some(&ts) => ts <= range.beg, None => true, } } fn ends_after(&self, range: NanoRange) -> bool { match self.ts2s.last() { Some(&ts) => ts > range.end, None => panic!(), } } fn starts_after(&self, range: NanoRange) -> bool { match self.ts1s.first() { Some(&ts) => ts >= range.end, None => panic!(), } } } impl TimeBins for MinMaxAvgDim0Bins where NTY: NumOps, { fn ts1s(&self) -> &Vec { &self.ts1s } fn ts2s(&self) -> &Vec { &self.ts2s } } impl WithLen for MinMaxAvgDim0Bins { fn len(&self) -> usize { self.ts1s.len() } } impl Appendable for MinMaxAvgDim0Bins where NTY: NumOps, { fn empty_like_self(&self) -> Self { Self::empty() } fn append(&mut self, src: &Self) { self.ts1s.extend_from_slice(&src.ts1s); self.ts2s.extend_from_slice(&src.ts2s); self.counts.extend_from_slice(&src.counts); self.mins.extend_from_slice(&src.mins); self.maxs.extend_from_slice(&src.maxs); self.avgs.extend_from_slice(&src.avgs); } } impl ReadableFromFile for MinMaxAvgDim0Bins where NTY: NumOps, { // TODO this function is not needed in the trait: fn read_from_file(file: File) -> Result, Error> { Ok(ReadPbv::new(file)) } fn from_buf(buf: &[u8]) -> Result { let dec = serde_cbor::from_slice(&buf)?; Ok(dec) } } impl TimeBinnableType for MinMaxAvgDim0Bins where NTY: NumOps, { type Output = MinMaxAvgDim0Bins; type Aggregator = MinMaxAvgBinsAggregator; fn aggregator(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { debug!( "TimeBinnableType for XBinnedScalarEvents aggregator() range {:?} x_bin_count {} do_time_weight {}", range, x_bin_count, do_time_weight ); Self::Aggregator::new(range, do_time_weight) } } impl ToJsonResult for Sitemty> where NTY: NumOps, { fn to_json_result(&self) -> Result, Error> { Ok(Box::new(serde_json::Value::String(format!( "MinMaxAvgBins/non-json-item" )))) } } pub struct MinMaxAvgBinsCollected { _m1: PhantomData, } impl MinMaxAvgBinsCollected { pub fn new() -> Self { Self { _m1: PhantomData } } } #[derive(Serialize)] pub struct MinMaxAvgBinsCollectedResult { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "tsMs")] ts_off_ms: Vec, #[serde(rename = "tsNs")] ts_off_ns: Vec, //ts_bin_edges: Vec, counts: Vec, mins: Vec>, maxs: Vec>, avgs: Vec>, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] finalised_range: bool, #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] missing_bins: u32, #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] continue_at: Option, } pub struct MinMaxAvgBinsCollector { bin_count_exp: u32, timed_out: bool, range_complete: bool, vals: MinMaxAvgDim0Bins, _m1: PhantomData, } impl MinMaxAvgBinsCollector { pub fn new(bin_count_exp: u32) -> Self { Self { bin_count_exp, timed_out: false, range_complete: false, vals: MinMaxAvgDim0Bins::::empty(), _m1: PhantomData, } } } impl WithLen for MinMaxAvgBinsCollector where NTY: NumOps + Serialize, { fn len(&self) -> usize { self.vals.ts1s.len() } } impl Collector for MinMaxAvgBinsCollector where NTY: NumOps + Serialize, { type Input = MinMaxAvgDim0Bins; type Output = MinMaxAvgBinsCollectedResult; fn ingest(&mut self, src: &Self::Input) { Appendable::append(&mut self.vals, src); } fn set_range_complete(&mut self) { self.range_complete = true; } fn set_timed_out(&mut self) { self.timed_out = true; } fn result(self) -> Result { let bin_count = self.vals.ts1s.len() as u32; // TODO could save the copy: let mut ts_all = self.vals.ts1s.clone(); if self.vals.ts2s.len() > 0 { ts_all.push(*self.vals.ts2s.last().unwrap()); } let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize { match ts_all.last() { Some(&k) => { let iso = IsoDateTime(Utc.timestamp_nanos(k as i64)); Some(iso) } None => Err(Error::with_msg("partial_content but no bin in result"))?, } } else { None }; let tst = ts_offs_from_abs(&ts_all); let ret = MinMaxAvgBinsCollectedResult:: { ts_anchor_sec: tst.0, ts_off_ms: tst.1, ts_off_ns: tst.2, counts: self.vals.counts, mins: self.vals.mins, maxs: self.vals.maxs, avgs: self.vals.avgs, finalised_range: self.range_complete, missing_bins: self.bin_count_exp - bin_count, continue_at, }; Ok(ret) } } impl Collectable for MinMaxAvgDim0Bins where NTY: NumOps + Serialize, { type Collector = MinMaxAvgBinsCollector; fn new_collector(bin_count_exp: u32) -> Self::Collector { Self::Collector::new(bin_count_exp) } } pub struct MinMaxAvgBinsAggregator { range: NanoRange, count: u64, min: Option, max: Option, sumc: u64, sum: f32, } impl MinMaxAvgBinsAggregator { pub fn new(range: NanoRange, _do_time_weight: bool) -> Self { Self { range, count: 0, min: None, max: None, sumc: 0, sum: 0f32, } } } impl TimeBinnableTypeAggregator for MinMaxAvgBinsAggregator where NTY: NumOps, { type Input = MinMaxAvgDim0Bins; type Output = MinMaxAvgDim0Bins; fn range(&self) -> &NanoRange { &self.range } fn ingest(&mut self, item: &Self::Input) { for i1 in 0..item.ts1s.len() { if item.ts2s[i1] <= self.range.beg { } else if item.ts1s[i1] >= self.range.end { } else { self.min = match &self.min { None => item.mins[i1].clone(), Some(min) => match &item.mins[i1] { None => Some(min.clone()), Some(v) => { if v < &min { Some(v.clone()) } else { Some(min.clone()) } } }, }; self.max = match &self.max { None => item.maxs[i1].clone(), Some(max) => match &item.maxs[i1] { None => Some(max.clone()), Some(v) => { if v > &max { Some(v.clone()) } else { Some(max.clone()) } } }, }; match item.avgs[i1] { None => {} Some(v) => { if v.is_nan() { } else { self.sum += v; self.sumc += 1; } } } self.count += item.counts[i1]; } } } fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output { let avg = if self.sumc == 0 { None } else { Some(self.sum / self.sumc as f32) }; let ret = Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![self.min.clone()], maxs: vec![self.max.clone()], avgs: vec![avg], }; self.count = 0; self.min = None; self.max = None; self.range = range; self.sum = 0f32; self.sumc = 0; ret } }