use crate::agg::binnedt::{TimeBinnableType, TimeBinnableTypeAggregator}; use crate::agg::enp::{ts_offs_from_abs, WaveEvents}; use crate::agg::streams::{Appendable, Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::agg::{Fits, FitsInside}; use crate::binned::{ Bool, FilterFittingInside, IsoDateTime, NumOps, RangeOverlapInfo, ReadPbv, ReadableFromFile, TimeBins, WithLen, }; use chrono::{TimeZone, Utc}; use err::Error; use items::{Sitemty, SitemtyFrameType, SubFrId}; use netpod::log::*; use netpod::timeunits::SEC; use netpod::NanoRange; use num_traits::Zero; use serde::{Deserialize, Serialize}; use std::fmt; use std::marker::PhantomData; use tokio::fs::File; #[derive(Serialize, Deserialize)] pub struct MinMaxAvgDim1Bins { pub ts1s: Vec, pub ts2s: Vec, pub counts: Vec, pub mins: Vec>>, pub maxs: Vec>>, pub avgs: Vec>>, } impl SitemtyFrameType for MinMaxAvgDim1Bins where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = 0xb00 + NTY::SUB; } impl fmt::Debug for MinMaxAvgDim1Bins where NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, "MinMaxAvgDim1Bins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", self.ts1s.len(), self.ts1s.iter().map(|k| k / SEC).collect::>(), self.ts2s.iter().map(|k| k / SEC).collect::>(), self.counts, self.mins.first(), self.maxs.first(), self.avgs.first(), ) } } impl MinMaxAvgDim1Bins { pub fn empty() -> Self { Self { ts1s: vec![], ts2s: vec![], counts: vec![], mins: vec![], maxs: vec![], avgs: vec![], } } } impl FitsInside for MinMaxAvgDim1Bins { fn fits_inside(&self, range: NanoRange) -> Fits { if self.ts1s.is_empty() { Fits::Empty } else { let t1 = *self.ts1s.first().unwrap(); let t2 = *self.ts2s.last().unwrap(); if t2 <= range.beg { Fits::Lower } else if t1 >= range.end { Fits::Greater } else if t1 < range.beg && t2 > range.end { Fits::PartlyLowerAndGreater } else if t1 < range.beg { Fits::PartlyLower } else if t2 > range.end { Fits::PartlyGreater } else { Fits::Inside } } } } impl FilterFittingInside for MinMaxAvgDim1Bins { fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { match self.fits_inside(fit_range) { Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), _ => None, } } } impl RangeOverlapInfo for MinMaxAvgDim1Bins { fn ends_before(&self, range: NanoRange) -> bool { match self.ts2s.last() { Some(&ts) => ts <= range.beg, None => true, } } fn ends_after(&self, range: NanoRange) -> bool { match self.ts2s.last() { Some(&ts) => ts > range.end, None => panic!(), } } fn starts_after(&self, range: NanoRange) -> bool { match self.ts1s.first() { Some(&ts) => ts >= range.end, None => panic!(), } } } impl TimeBins for MinMaxAvgDim1Bins where NTY: NumOps, { fn ts1s(&self) -> &Vec { &self.ts1s } fn ts2s(&self) -> &Vec { &self.ts2s } } impl WithLen for MinMaxAvgDim1Bins { fn len(&self) -> usize { self.ts1s.len() } } impl Appendable for MinMaxAvgDim1Bins where NTY: NumOps, { fn empty() -> Self { Self::empty() } fn append(&mut self, src: &Self) { self.ts1s.extend_from_slice(&src.ts1s); self.ts2s.extend_from_slice(&src.ts2s); self.counts.extend_from_slice(&src.counts); self.mins.extend_from_slice(&src.mins); self.maxs.extend_from_slice(&src.maxs); self.avgs.extend_from_slice(&src.avgs); } } impl ReadableFromFile for MinMaxAvgDim1Bins where NTY: NumOps, { // TODO this function is not needed in the trait: fn read_from_file(file: File) -> Result, Error> { Ok(ReadPbv::new(file)) } fn from_buf(buf: &[u8]) -> Result { let dec = serde_cbor::from_slice(&buf)?; Ok(dec) } } impl TimeBinnableType for MinMaxAvgDim1Bins where NTY: NumOps, { type Output = MinMaxAvgDim1Bins; type Aggregator = MinMaxAvgDim1BinsAggregator; fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator { Self::Aggregator::new(range, x_bin_count) } } impl ToJsonResult for Sitemty> where NTY: NumOps, { fn to_json_result(&self) -> Result, Error> { Ok(Box::new(serde_json::Value::String(format!( "MinMaxAvgDim1Bins/non-json-item" )))) } } pub struct MinMaxAvgDim1BinsCollected { _m1: PhantomData, } impl MinMaxAvgDim1BinsCollected { pub fn new() -> Self { Self { _m1: PhantomData } } } #[derive(Serialize)] pub struct MinMaxAvgDim1BinsCollectedResult { ts_bin_edges: Vec, counts: Vec, mins: Vec>>, maxs: Vec>>, avgs: Vec>>, #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] finalised_range: bool, #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")] missing_bins: u32, #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] continue_at: Option, } pub struct MinMaxAvgDim1BinsCollector { bin_count_exp: u32, timed_out: bool, range_complete: bool, vals: MinMaxAvgDim1Bins, _m1: PhantomData, } impl MinMaxAvgDim1BinsCollector { pub fn new(bin_count_exp: u32) -> Self { Self { bin_count_exp, timed_out: false, range_complete: false, vals: MinMaxAvgDim1Bins::::empty(), _m1: PhantomData, } } } impl WithLen for MinMaxAvgDim1BinsCollector where NTY: NumOps + Serialize, { fn len(&self) -> usize { self.vals.ts1s.len() } } impl Collector for MinMaxAvgDim1BinsCollector where NTY: NumOps + Serialize, { type Input = MinMaxAvgDim1Bins; type Output = MinMaxAvgDim1BinsCollectedResult; fn ingest(&mut self, src: &Self::Input) { Appendable::append(&mut self.vals, src); } fn set_range_complete(&mut self) { self.range_complete = true; } fn set_timed_out(&mut self) { self.timed_out = true; } fn result(self) -> Result { let bin_count = self.vals.ts1s.len() as u32; let mut tsa: Vec<_> = self .vals .ts1s .iter() .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) .collect(); if let Some(&z) = self.vals.ts2s.last() { tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64))); } let tsa = tsa; let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize { match tsa.last() { Some(k) => Some(k.clone()), None => Err(Error::with_msg("partial_content but no bin in result"))?, } } else { None }; let ret = MinMaxAvgDim1BinsCollectedResult:: { ts_bin_edges: tsa, counts: self.vals.counts, mins: self.vals.mins, maxs: self.vals.maxs, avgs: self.vals.avgs, finalised_range: self.range_complete, missing_bins: self.bin_count_exp - bin_count, continue_at, }; Ok(ret) } } impl Collectable for MinMaxAvgDim1Bins where NTY: NumOps + Serialize, { type Collector = MinMaxAvgDim1BinsCollector; fn new_collector(bin_count_exp: u32) -> Self::Collector { Self::Collector::new(bin_count_exp) } } pub struct MinMaxAvgDim1BinsAggregator { range: NanoRange, count: u64, min: Option>, max: Option>, sumc: u64, sum: Option>, } impl MinMaxAvgDim1BinsAggregator { pub fn new(range: NanoRange, _x_bin_count: usize) -> Self { Self { range, count: 0, // TODO get rid of Option min: err::todoval(), max: None, sumc: 0, sum: None, } } } impl TimeBinnableTypeAggregator for MinMaxAvgDim1BinsAggregator where NTY: NumOps, { type Input = MinMaxAvgDim1Bins; type Output = MinMaxAvgDim1Bins; fn range(&self) -> &NanoRange { &self.range } fn ingest(&mut self, item: &Self::Input) { for i1 in 0..item.ts1s.len() { if item.ts2s[i1] <= self.range.beg { continue; } else if item.ts1s[i1] >= self.range.end { continue; } else { match self.min.as_mut() { None => self.min = item.mins[i1].clone(), Some(min) => match item.mins[i1].as_ref() { None => {} Some(v) => { for (a, b) in min.iter_mut().zip(v.iter()) { if *b < *a { *a = *b; } } } }, }; match self.max.as_mut() { None => self.max = item.maxs[i1].clone(), Some(max) => match item.maxs[i1].as_ref() { None => {} Some(v) => { for (a, b) in max.iter_mut().zip(v.iter()) { if *b > *a { *a = *b; } } } }, }; match self.sum.as_mut() { None => { self.sum = item.avgs[i1].clone(); } Some(sum) => match item.avgs[i1].as_ref() { None => {} Some(v) => { for (a, b) in sum.iter_mut().zip(v.iter()) { if (*b).is_nan() { } else { *a += *b; } } self.sumc += 1; } }, } self.count += item.counts[i1]; } } } fn result(self) -> Self::Output { let avg = if self.sumc == 0 { None } else { let avg = self .sum .as_ref() .unwrap() .iter() .map(|k| k / self.sumc as f32) .collect(); Some(avg) }; Self::Output { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg], } } } #[derive(Serialize)] pub struct WaveEventsCollectedResult { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "tsMs")] ts_off_ms: Vec, #[serde(rename = "tsNs")] ts_off_ns: Vec, values: Vec>, #[serde(skip_serializing_if = "Bool::is_false", rename = "finalisedRange")] range_complete: bool, #[serde(skip_serializing_if = "Bool::is_false", rename = "timedOut")] timed_out: bool, } pub struct WaveEventsCollector { vals: WaveEvents, range_complete: bool, timed_out: bool, } impl WaveEventsCollector { pub fn new(_bin_count_exp: u32) -> Self { info!("\n\nWaveEventsCollector\n\n"); Self { vals: WaveEvents::empty(), range_complete: false, timed_out: false, } } } impl WithLen for WaveEventsCollector { fn len(&self) -> usize { self.vals.tss.len() } } impl Collector for WaveEventsCollector where NTY: NumOps, { type Input = WaveEvents; type Output = WaveEventsCollectedResult; fn ingest(&mut self, src: &Self::Input) { self.vals.append(src); } fn set_range_complete(&mut self) { self.range_complete = true; } fn set_timed_out(&mut self) { self.timed_out = true; } fn result(self) -> Result { let tst = ts_offs_from_abs(&self.vals.tss); let ret = Self::Output { ts_anchor_sec: tst.0, ts_off_ms: tst.1, ts_off_ns: tst.2, values: self.vals.vals, range_complete: self.range_complete, timed_out: self.timed_out, }; Ok(ret) } } impl Collectable for WaveEvents where NTY: NumOps, { type Collector = WaveEventsCollector; fn new_collector(bin_count_exp: u32) -> Self::Collector { Self::Collector::new(bin_count_exp) } }