use crate::minmaxavgbins::MinMaxAvgBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::timeunits::*; use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::fmt; use tokio::fs::File; // TODO add pulse. Is this even used?? // TODO change name, it's not only about values, but more like batch of whole events. #[derive(Serialize, Deserialize)] pub struct EventValues { pub tss: Vec, pub values: Vec, } impl SitemtyFrameType for EventValues where NTY: NumOps, { const FRAME_TYPE_ID: u32 = crate::EVENT_VALUES_FRAME_TYPE_ID + NTY::SUB; } impl EventValues { pub fn empty() -> Self { Self { tss: vec![], values: vec![], } } } impl fmt::Debug for EventValues where NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, "count {} ts {:?} .. {:?} vals {:?} .. {:?}", self.tss.len(), self.tss.first(), self.tss.last(), self.values.first(), self.values.last(), ) } } impl WithLen for EventValues where NTY: NumOps, { fn len(&self) -> usize { self.tss.len() } } impl WithTimestamps for EventValues where NTY: NumOps, { fn ts(&self, ix: usize) -> u64 { self.tss[ix] } } impl ByteEstimate for EventValues where NTY: NumOps, { fn byte_estimate(&self) -> u64 { if self.tss.len() == 0 { 0 } else { // TODO improve via a const fn on NTY self.tss.len() as u64 * 16 } } } impl RangeOverlapInfo for EventValues { fn ends_before(&self, range: NanoRange) -> bool { match self.tss.last() { Some(&ts) => ts < range.beg, None => true, } } fn ends_after(&self, range: NanoRange) -> bool { match self.tss.last() { Some(&ts) => ts >= range.end, None => panic!(), } } fn starts_after(&self, range: NanoRange) -> bool { match self.tss.first() { Some(&ts) => ts >= range.end, None => panic!(), } } } impl FitsInside for EventValues { fn fits_inside(&self, range: NanoRange) -> Fits { if self.tss.is_empty() { Fits::Empty } else { let t1 = *self.tss.first().unwrap(); let t2 = *self.tss.last().unwrap(); if t2 < range.beg { Fits::Lower } else if t1 > range.end { Fits::Greater } else if t1 < range.beg && t2 > range.end { Fits::PartlyLowerAndGreater } else if t1 < range.beg { Fits::PartlyLower } else if t2 > range.end { Fits::PartlyGreater } else { Fits::Inside } } } } impl FilterFittingInside for EventValues { fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { match self.fits_inside(fit_range) { Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), _ => None, } } } impl PushableIndex for EventValues where NTY: NumOps, { fn push_index(&mut self, src: &Self, ix: usize) { self.tss.push(src.tss[ix]); self.values.push(src.values[ix]); } } impl Appendable for EventValues where NTY: NumOps, { fn empty_like_self(&self) -> Self { Self::empty() } fn append(&mut self, src: &Self) { self.tss.extend_from_slice(&src.tss); self.values.extend_from_slice(&src.values); } } impl Clearable for EventValues { fn clear(&mut self) { self.tss.clear(); self.values.clear(); } } impl ReadableFromFile for EventValues where NTY: NumOps, { fn read_from_file(_file: File) -> Result, Error> { // TODO refactor types such that this can be removed. panic!() } fn from_buf(_buf: &[u8]) -> Result { panic!() } } impl TimeBinnableType for EventValues where NTY: NumOps, { type Output = MinMaxAvgBins; type Aggregator = EventValuesAggregator; fn aggregator(range: NanoRange, _bin_count: usize, do_time_weight: bool) -> Self::Aggregator { // TODO remove output if range.delta() > SEC * 5000 { netpod::log::info!("TimeBinnableType for EventValues aggregator() range {:?}", range); } Self::Aggregator::new(range, do_time_weight) } } pub struct EventValuesCollector { vals: EventValues, range_complete: bool, timed_out: bool, } impl EventValuesCollector { pub fn new() -> Self { Self { vals: EventValues::empty(), range_complete: false, timed_out: false, } } } impl WithLen for EventValuesCollector { fn len(&self) -> usize { self.vals.tss.len() } } #[derive(Serialize)] pub struct EventValuesCollectorOutput { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "tsMs")] ts_off_ms: Vec, #[serde(rename = "tsNs")] ts_off_ns: Vec, values: Vec, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] range_complete: bool, #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] timed_out: bool, } impl Collector for EventValuesCollector where NTY: NumOps, { type Input = EventValues; type Output = EventValuesCollectorOutput; fn ingest(&mut self, src: &Self::Input) { self.vals.append(src); } fn set_range_complete(&mut self) { self.range_complete = true; } fn set_timed_out(&mut self) { self.timed_out = true; } fn result(self) -> Result { let tst = ts_offs_from_abs(&self.vals.tss); let ret = Self::Output { ts_anchor_sec: tst.0, ts_off_ms: tst.1, ts_off_ns: tst.2, values: self.vals.values, range_complete: self.range_complete, timed_out: self.timed_out, }; Ok(ret) } } impl Collectable for EventValues where NTY: NumOps, { type Collector = EventValuesCollector; fn new_collector(_bin_count_exp: u32) -> Self::Collector { Self::Collector::new() } } pub struct EventValuesAggregator { range: NanoRange, count: u64, min: Option, max: Option, sumc: u64, sum: f32, int_ts: u64, last_ts: u64, last_val: Option, do_time_weight: bool, } impl EventValuesAggregator where NTY: NumOps, { pub fn new(range: NanoRange, do_time_weight: bool) -> Self { let int_ts = range.beg; Self { range, count: 0, min: None, max: None, sum: 0f32, sumc: 0, int_ts, last_ts: 0, last_val: None, do_time_weight, } } fn apply_min_max(&mut self, val: NTY) { self.min = match self.min { None => Some(val), Some(min) => { if val < min { Some(val) } else { Some(min) } } }; self.max = match self.max { None => Some(val), Some(max) => { if val > max { Some(val) } else { Some(max) } } }; } fn apply_event_unweight(&mut self, val: NTY) { self.apply_min_max(val); let vf = val.as_(); if vf.is_nan() { } else { self.sum += vf; self.sumc += 1; } } fn apply_event_time_weight(&mut self, ts: u64, val: Option) { if let Some(v) = self.last_val { self.apply_min_max(v); let w = if self.do_time_weight { (ts - self.int_ts) as f32 * 1e-9 } else { 1. }; let vf = v.as_(); if vf.is_nan() { } else { self.sum += vf * w; self.sumc += 1; } self.int_ts = ts; } self.last_ts = ts; self.last_val = val; } fn ingest_unweight(&mut self, item: &::Input) { for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let val = item.values[i1]; if ts < self.range.beg { } else if ts >= self.range.end { } else { self.count += 1; self.apply_event_unweight(val); } } } fn ingest_time_weight(&mut self, item: &::Input) { for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let val = item.values[i1]; if ts < self.int_ts { self.last_ts = ts; self.last_val = Some(val); } else if ts >= self.range.end { return; } else { self.count += 1; self.apply_event_time_weight(ts, Some(val)); } } } fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgBins { let avg = if self.sumc == 0 { None } else { Some(self.sum / self.sumc as f32) }; let ret = MinMaxAvgBins { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg], }; self.range = range; self.count = 0; self.min = None; self.max = None; self.sum = 0f32; self.sumc = 0; ret } fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgBins { if expand { self.apply_event_time_weight(self.range.end, self.last_val); } let avg = { let sc = self.range.delta() as f32 * 1e-9; Some(self.sum / sc) }; let ret = MinMaxAvgBins { ts1s: vec![self.range.beg], ts2s: vec![self.range.end], counts: vec![self.count], mins: vec![self.min], maxs: vec![self.max], avgs: vec![avg], }; self.range = range; self.count = 0; self.min = None; self.max = None; self.sum = 0f32; self.sumc = 0; ret } } impl TimeBinnableTypeAggregator for EventValuesAggregator where NTY: NumOps, { type Input = EventValues; type Output = MinMaxAvgBins; fn range(&self) -> &NanoRange { &self.range } fn ingest(&mut self, item: &Self::Input) { if self.do_time_weight { self.ingest_time_weight(item) } else { self.ingest_unweight(item) } } fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { if self.do_time_weight { self.result_reset_time_weight(range, expand) } else { self.result_reset_unweight(range, expand) } } } impl EventAppendable for EventValues where NTY: NumOps, { type Value = NTY; fn append_event(ret: Option, ts: u64, value: Self::Value) -> Self { let mut ret = if let Some(ret) = ret { ret } else { Self::empty() }; ret.tss.push(ts); ret.values.push(value); ret } }