Move types into separate module

This commit is contained in:
Dominik Werder
2021-07-07 17:12:22 +02:00
parent a8f15da101
commit c92e266662
41 changed files with 3516 additions and 3546 deletions

341
items/src/eventvalues.rs Normal file
View File

@@ -0,0 +1,341 @@
use crate::minmaxavgbins::MinMaxAvgBins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv,
ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::fmt;
use tokio::fs::File;
// TODO add pulse.
// TODO change name, it's not only about values, but more like batch of whole events.
/// One batch of whole events: parallel vectors of timestamps and values.
/// See the TODOs above: a pulse-id column and a rename are planned.
#[derive(Serialize, Deserialize)]
pub struct EventValues<VT> {
    // Event timestamps in nanoseconds; presumably ascending — TODO confirm with producers.
    pub tss: Vec<u64>,
    // One value per entry in `tss`; same length as `tss`.
    pub values: Vec<VT>,
}
// Wire frame type id: base 0x500 for EventValues plus the per-scalar-type
// sub-id supplied by NumOps.
impl<NTY> SitemtyFrameType for EventValues<NTY>
where
    NTY: NumOps,
{
    const FRAME_TYPE_ID: u32 = 0x500 + NTY::SUB;
}
impl<VT> EventValues<VT> {
    /// Constructs a batch containing no events.
    pub fn empty() -> Self {
        Self {
            tss: Vec::new(),
            values: Vec::new(),
        }
    }
}
// Compact summary for logs: length plus first/last timestamp and value.
impl<VT> fmt::Debug for EventValues<VT>
where
    VT: fmt::Debug,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            fmt,
            "count {} ts {:?} .. {:?} vals {:?} .. {:?}",
            self.tss.len(),
            self.tss.first(),
            self.tss.last(),
            self.values.first(),
            self.values.last(),
        )
    }
}
// Number of events in the batch (length of the timestamp column).
impl<VT> WithLen for EventValues<VT> {
    fn len(&self) -> usize {
        self.tss.len()
    }
}
// Indexed timestamp access; panics when `ix >= self.tss.len()`.
impl<VT> WithTimestamps for EventValues<VT> {
    fn ts(&self, ix: usize) -> u64 {
        self.tss[ix]
    }
}
// Overlap queries against a time range, based on the first/last timestamp.
// NOTE(review): `ends_after` and `starts_after` panic on an empty batch while
// `ends_before` treats empty as true — looks intentional (callers presumably
// check emptiness first) but worth confirming.
impl<VT> RangeOverlapInfo for EventValues<VT> {
    fn ends_before(&self, range: NanoRange) -> bool {
        match self.tss.last() {
            Some(&ts) => ts < range.beg,
            None => true,
        }
    }
    fn ends_after(&self, range: NanoRange) -> bool {
        match self.tss.last() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
    fn starts_after(&self, range: NanoRange) -> bool {
        match self.tss.first() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
}
// Classifies how the batch's [first, last] timestamp span relates to `range`.
// Events are points, so the comparisons are strict (`<`/`>`) unlike the
// bin-based types which use closed/open edges.
impl<VT> FitsInside for EventValues<VT> {
    fn fits_inside(&self, range: NanoRange) -> Fits {
        if self.tss.is_empty() {
            Fits::Empty
        } else {
            let t1 = *self.tss.first().unwrap();
            let t2 = *self.tss.last().unwrap();
            if t2 < range.beg {
                Fits::Lower
            } else if t1 > range.end {
                Fits::Greater
            } else if t1 < range.beg && t2 > range.end {
                Fits::PartlyLowerAndGreater
            } else if t1 < range.beg {
                Fits::PartlyLower
            } else if t2 > range.end {
                Fits::PartlyGreater
            } else {
                Fits::Inside
            }
        }
    }
}
impl<VT> FilterFittingInside for EventValues<VT> {
    /// Keeps `self` only when it overlaps `fit_range` at least partly;
    /// drops batches that are empty or lie fully outside.
    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
        match self.fits_inside(fit_range) {
            Fits::Empty | Fits::Lower | Fits::Greater => None,
            _ => Some(self),
        }
    }
}
// Copies the single event at index `ix` from `src` onto the end of `self`.
impl<NTY> PushableIndex for EventValues<NTY>
where
    NTY: NumOps,
{
    fn push_index(&mut self, src: &Self, ix: usize) {
        self.tss.push(src.tss[ix]);
        self.values.push(src.values[ix]);
    }
}
// Concatenation of event batches; both columns are extended in lockstep.
impl<NTY> Appendable for EventValues<NTY>
where
    NTY: NumOps,
{
    fn empty() -> Self {
        Self::empty()
    }
    fn append(&mut self, src: &Self) {
        self.tss.extend_from_slice(&src.tss);
        self.values.extend_from_slice(&src.values);
    }
}
// Deliberately unimplemented: events are never read back from pre-binned
// files. Both methods panic; the trait bound exists only to satisfy
// TimeBinnableType (see TODO below).
impl<NTY> ReadableFromFile for EventValues<NTY>
where
    NTY: NumOps,
{
    fn read_from_file(_file: File) -> Result<ReadPbv<Self>, Error> {
        // TODO refactor types such that this can be removed.
        panic!()
    }
    fn from_buf(_buf: &[u8]) -> Result<Self, Error> {
        panic!()
    }
}
// Time-binning events produces scalar min/max/avg bins; one aggregator
// instance covers exactly one output bin (`_bin_count` unused here).
impl<NTY> TimeBinnableType for EventValues<NTY>
where
    NTY: NumOps,
{
    type Output = MinMaxAvgBins<NTY>;
    type Aggregator = EventValuesAggregator<NTY>;
    fn aggregator(range: NanoRange, _bin_count: usize) -> Self::Aggregator {
        Self::Aggregator::new(range)
    }
}
/// Accumulates streamed EventValues batches plus stream-status flags
/// until `result()` turns them into a JSON-serializable output.
pub struct EventValuesCollector<NTY> {
    vals: EventValues<NTY>,
    range_complete: bool,
    timed_out: bool,
}
impl<NTY> EventValuesCollector<NTY> {
    /// New collector with no data and both status flags cleared.
    pub fn new() -> Self {
        Self {
            vals: EventValues::empty(),
            range_complete: false,
            timed_out: false,
        }
    }
}
// Number of events collected so far.
impl<NTY> WithLen for EventValuesCollector<NTY> {
    fn len(&self) -> usize {
        self.vals.tss.len()
    }
}
/// JSON shape of a collected event query: timestamps split into an anchor
/// (seconds) plus per-event ms and ns offsets, alongside the raw values.
#[derive(Serialize)]
pub struct EventValuesCollectorOutput<NTY> {
    #[serde(rename = "tsAnchor")]
    ts_anchor_sec: u64,
    #[serde(rename = "tsMs")]
    ts_off_ms: Vec<u64>,
    #[serde(rename = "tsNs")]
    ts_off_ns: Vec<u64>,
    values: Vec<NTY>,
    // Status flags are omitted from JSON when false.
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
    range_complete: bool,
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
    timed_out: bool,
}
impl<NTY> Collector for EventValuesCollector<NTY>
where
    NTY: NumOps,
{
    type Input = EventValues<NTY>;
    type Output = EventValuesCollectorOutput<NTY>;
    // Appends one incoming batch to the accumulated events.
    fn ingest(&mut self, src: &Self::Input) {
        self.vals.append(src);
    }
    fn set_range_complete(&mut self) {
        self.range_complete = true;
    }
    fn set_timed_out(&mut self) {
        self.timed_out = true;
    }
    // Converts absolute timestamps into anchor + offsets and moves the
    // collected values into the output struct.
    fn result(self) -> Result<Self::Output, Error> {
        let tst = ts_offs_from_abs(&self.vals.tss);
        let ret = Self::Output {
            ts_anchor_sec: tst.0,
            ts_off_ms: tst.1,
            ts_off_ns: tst.2,
            values: self.vals.values,
            range_complete: self.range_complete,
            timed_out: self.timed_out,
        };
        Ok(ret)
    }
}
// Events ignore the expected-bin-count hint; it only matters for binned data.
impl<NTY> Collectable for EventValues<NTY>
where
    NTY: NumOps,
{
    type Collector = EventValuesCollector<NTY>;
    fn new_collector(_bin_count_exp: u32) -> Self::Collector {
        Self::Collector::new()
    }
}
/// Running min/max/avg state for a single output time bin.
pub struct EventValuesAggregator<NTY> {
    range: NanoRange,
    // Events that fell into `range` (counts NaN values too).
    count: u64,
    min: Option<NTY>,
    max: Option<NTY>,
    // Count and sum of the non-NaN values only, for the average.
    sumc: u64,
    sum: f32,
}
impl<NTY> EventValuesAggregator<NTY> {
    /// Fresh aggregator for one bin covering `range`.
    pub fn new(range: NanoRange) -> Self {
        Self {
            range,
            count: 0,
            min: None,
            max: None,
            sum: 0f32,
            sumc: 0,
        }
    }
}
impl<NTY> TimeBinnableTypeAggregator for EventValuesAggregator<NTY>
where
    NTY: NumOps,
{
    type Input = EventValues<NTY>;
    type Output = MinMaxAvgBins<NTY>;

    fn range(&self) -> &NanoRange {
        &self.range
    }

    /// Folds every event of `item` whose timestamp falls into `[range.beg, range.end)`
    /// into the running min/max/sum state. NaN values still count as events but
    /// are excluded from the average.
    fn ingest(&mut self, item: &Self::Input) {
        for (ix, &ts) in item.tss.iter().enumerate() {
            // Skip events outside the half-open bin range.
            if ts < self.range.beg || ts >= self.range.end {
                continue;
            }
            let v = item.values[ix];
            self.min = Some(match self.min {
                None => v,
                Some(min) => {
                    if v < min {
                        v
                    } else {
                        min
                    }
                }
            });
            self.max = Some(match self.max {
                None => v,
                Some(max) => {
                    if v > max {
                        v
                    } else {
                        max
                    }
                }
            });
            let vf = v.as_();
            if !vf.is_nan() {
                self.sum += vf;
                self.sumc += 1;
            }
            self.count += 1;
        }
    }

    /// Emits exactly one bin spanning the aggregator's range.
    fn result(self) -> Self::Output {
        let avg = if self.sumc == 0 {
            None
        } else {
            Some(self.sum / self.sumc as f32)
        };
        Self::Output {
            ts1s: vec![self.range.beg],
            ts2s: vec![self.range.end],
            counts: vec![self.count],
            mins: vec![self.min],
            maxs: vec![self.max],
            avgs: vec![avg],
        }
    }
}

96
items/src/frame.rs Normal file
View File

@@ -0,0 +1,96 @@
use crate::inmem::InMemoryFrame;
use crate::{FrameType, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC};
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
use serde::{de::DeserializeOwned, Serialize};
/// Converts an item into a ready-to-send byte frame.
/// Default implementation is a placeholder (`todoval`) — see TODO: the trait
/// is only really implemented for one type and slated for removal.
pub trait MakeBytesFrame {
    fn make_bytes_frame(&self) -> Result<Bytes, Error> {
        // TODO only implemented for one type, remove
        err::todoval()
    }
}
/// Serializes `item` with bincode and wraps it in the in-memory frame format:
/// magic, encoder id, frame type id, payload length, payload CRC32, payload,
/// then a CRC32 over everything written so far.
pub fn make_frame<FT>(item: &FT) -> Result<BytesMut, Error>
where
    FT: FrameType + Serialize,
{
    let enc = bincode::serialize(item)?;
    // Payload length is stored as u32 on the wire.
    if enc.len() > u32::MAX as usize {
        return Err(Error::with_msg(format!("too long payload {}", enc.len())));
    }
    let payload_crc = {
        let mut h = crc32fast::Hasher::new();
        h.update(&enc);
        h.finalize()
    };
    let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD);
    buf.put_u32_le(INMEM_FRAME_MAGIC);
    buf.put_u32_le(INMEM_FRAME_ENCID);
    buf.put_u32_le(FT::FRAME_TYPE_ID);
    buf.put_u32_le(enc.len() as u32);
    buf.put_u32_le(payload_crc);
    buf.put(enc.as_ref());
    // Frame CRC covers header and payload together.
    let frame_crc = {
        let mut h = crc32fast::Hasher::new();
        h.update(&buf);
        h.finalize()
    };
    buf.put_u32_le(frame_crc);
    Ok(buf)
}
/// Builds the stream-terminating frame: type id 0x01, zero-length payload,
/// with the same header and CRC layout as `make_frame`.
pub fn make_term_frame() -> BytesMut {
    let payload_crc = {
        // CRC of the empty payload.
        let mut h = crc32fast::Hasher::new();
        h.update(&[]);
        h.finalize()
    };
    let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD);
    buf.put_u32_le(INMEM_FRAME_MAGIC);
    buf.put_u32_le(INMEM_FRAME_ENCID);
    buf.put_u32_le(0x01);
    buf.put_u32_le(0);
    buf.put_u32_le(payload_crc);
    let frame_crc = {
        let mut h = crc32fast::Hasher::new();
        h.update(&buf);
        h.finalize()
    };
    buf.put_u32_le(frame_crc);
    buf
}
/// Validates an in-memory frame (encoder id, frame type id, declared vs actual
/// payload length) and bincode-deserializes the payload into `T`.
pub fn decode_frame<T>(frame: &InMemoryFrame) -> Result<T, Error>
where
    T: FrameType + DeserializeOwned,
{
    if frame.encid() != INMEM_FRAME_ENCID {
        return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));
    }
    let expected = <T as FrameType>::FRAME_TYPE_ID;
    if frame.tyid() != expected {
        return Err(Error::with_msg(format!(
            "type id mismatch expect {:x} found {:?}",
            expected, frame
        )));
    }
    if frame.len() as usize != frame.buf().len() {
        return Err(Error::with_msg(format!(
            "buf mismatch {} vs {} in {:?}",
            frame.len(),
            frame.buf().len(),
            frame
        )));
    }
    Ok(bincode::deserialize(frame.buf())?)
}
/// CRC32 of `t` rendered as 8 lowercase hex digits (zero-padded).
pub fn crchex<T>(t: T) -> String
where
    T: AsRef<[u8]>,
{
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(t.as_ref());
    format!("{:08x}", hasher.finalize())
}

34
items/src/inmem.rs Normal file
View File

@@ -0,0 +1,34 @@
use bytes::Bytes;
use std::fmt;
/// A decoded wire frame held in memory: header fields plus the raw payload.
pub struct InMemoryFrame {
    // Encoder id (expected to match INMEM_FRAME_ENCID).
    pub encid: u32,
    // Frame type id identifying the payload's Rust type.
    pub tyid: u32,
    // Declared payload length in bytes.
    pub len: u32,
    // Payload bytes.
    pub buf: Bytes,
}
// Plain read accessors mirroring the public fields.
impl InMemoryFrame {
    pub fn encid(&self) -> u32 {
        self.encid
    }
    pub fn tyid(&self) -> u32 {
        self.tyid
    }
    // Declared payload length; callers compare it against `buf().len()`.
    pub fn len(&self) -> u32 {
        self.len
    }
    pub fn buf(&self) -> &Bytes {
        &self.buf
    }
}
// Debug output shows the header only (ids in hex), never the payload bytes.
impl fmt::Debug for InMemoryFrame {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            fmt,
            "InMemoryFrame {{ encid: {:x} tyid: {:x} len {} }}",
            self.encid, self.tyid, self.len
        )
    }
}

View File

@@ -1,9 +1,35 @@
use crate::eventvalues::EventValues;
use crate::numops::BoolNum;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::{log::Level, BoolNum, EventDataReadStats, EventQueryJsonStringFrame};
use serde::de::{self, Visitor};
use serde::{Deserialize, Serialize};
use netpod::timeunits::{MS, SEC};
use netpod::{log::Level, AggKind, EventDataReadStats, EventQueryJsonStringFrame, NanoRange, Shape};
use serde::de::{self, DeserializeOwned, Visitor};
use serde::{Deserialize, Serialize, Serializer};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};
pub mod eventvalues;
pub mod frame;
pub mod inmem;
pub mod minmaxavgbins;
pub mod minmaxavgdim1bins;
pub mod minmaxavgwavebins;
pub mod numops;
pub mod streams;
pub mod waveevents;
pub mod xbinnedscalarevents;
pub mod xbinnedwaveevents;
/// Serde `skip_serializing_if` helper: returns true when the flag is false,
/// so false-valued flags are omitted from serialized output.
pub fn bool_is_false(j: &bool) -> bool {
    // Direct negation instead of `*j == false` (clippy: bool_comparison).
    !*j
}
#[derive(Debug, Serialize, Deserialize)]
pub enum RangeCompletableItem<T> {
@@ -313,3 +339,175 @@ where
}
}
*/
/// Per-node transform applied to raw event batches before binning/transfer,
/// configured from the channel shape and aggregation kind.
pub trait EventsNodeProcessor: Send + Unpin {
    type Input;
    type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType;
    fn create(shape: Shape, agg_kind: AggKind) -> Self;
    // Consumes one batch of events and produces the processed output.
    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output;
}
/// Newtype over a UTC datetime; serialized as an ISO-8601 string (see the
/// custom Serialize impl below) but deserialized with chrono's derive.
#[derive(Clone, Debug, Deserialize)]
pub struct IsoDateTime(chrono::DateTime<Utc>);
// Serializes as e.g. "2021-07-07T15:12:22.123Z" (millisecond precision, UTC).
impl Serialize for IsoDateTime {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&self.0.format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string())
    }
}
/// Converts nanosecond timestamps into ISO datetime wrappers.
/// Note the `as i64` cast: timestamps beyond i64::MAX ns would wrap.
pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
    tss.iter()
        .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
        .collect()
}
/// How a container's time span relates to a query range:
/// fully outside (Lower/Greater), fully inside, or overlapping one/both edges.
pub enum Fits {
    Empty,
    Lower,
    Greater,
    Inside,
    PartlyLower,
    PartlyGreater,
    PartlyLowerAndGreater,
}
/// Number of items (events or bins) in a container.
pub trait WithLen {
    fn len(&self) -> usize;
}
/// Indexed access to per-item timestamps (nanoseconds).
pub trait WithTimestamps {
    fn ts(&self, ix: usize) -> u64;
}
/// Coarse position of a container relative to a time range, used to
/// skip or stop streaming early.
pub trait RangeOverlapInfo {
    fn ends_before(&self, range: NanoRange) -> bool;
    fn ends_after(&self, range: NanoRange) -> bool;
    fn starts_after(&self, range: NanoRange) -> bool;
}
/// Fine-grained classification of a container's span against a range.
pub trait FitsInside {
    fn fits_inside(&self, range: NanoRange) -> Fits;
}
/// Consumes `self`, keeping it only when it overlaps `fit_range`.
pub trait FilterFittingInside: Sized {
    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self>;
}
/// Copies a single item (by index) from another container of the same type.
pub trait PushableIndex {
    // TODO check whether it makes sense to allow a move out of src. Or use a deque for src type and pop?
    fn push_index(&mut self, src: &Self, ix: usize);
}
/// Containers that can be created empty and concatenated.
pub trait Appendable: WithLen {
    fn empty() -> Self;
    fn append(&mut self, src: &Self);
}
/// Binned containers exposing their bin edge columns (begin/end timestamps).
pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside {
    fn ts1s(&self) -> &Vec<u64>;
    fn ts2s(&self) -> &Vec<u64>;
}
/// Data that can be aggregated over time into `Output` bins; `aggregator`
/// creates the per-bin state machine for one output range.
pub trait TimeBinnableType:
    Send + Unpin + RangeOverlapInfo + FilterFittingInside + Appendable + Serialize + ReadableFromFile
{
    type Output: TimeBinnableType;
    type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator;
}
// TODO should get I/O and tokio dependence out of this crate
/// Types that can be restored from a (cbor-encoded) cache file.
pub trait ReadableFromFile: Sized {
    fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error>;
    // TODO should not need this:
    fn from_buf(buf: &[u8]) -> Result<Self, Error>;
}
// TODO should get I/O and tokio dependence out of this crate
/// Future that reads a whole file to EOF and then decodes it as `T`.
pub struct ReadPbv<T>
where
    T: ReadableFromFile,
{
    // Scratch buffer for each poll_read call.
    buf: Vec<u8>,
    // Accumulated file contents.
    all: Vec<u8>,
    file: Option<File>,
    _m1: PhantomData<T>,
}
impl<T> ReadPbv<T>
where
    T: ReadableFromFile,
{
    /// Wraps an open file; reading starts on first poll.
    fn new(file: File) -> Self {
        Self {
            // TODO make buffer size a parameter:
            buf: vec![0; 1024 * 32],
            all: vec![],
            file: Some(file),
            _m1: PhantomData,
        }
    }
}
// Poll loop: repeatedly reads into the scratch buffer, appending to `all`,
// until a zero-length read signals EOF, then decodes `all` via T::from_buf.
// The scratch buffer is temporarily moved out of `self` to avoid a double
// mutable borrow, and restored before returning.
impl<T> Future for ReadPbv<T>
where
    T: ReadableFromFile + Unpin,
{
    type Output = Result<StreamItem<RangeCompletableItem<T>>, Error>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        let mut buf = std::mem::replace(&mut self.buf, Vec::new());
        let ret = 'outer: loop {
            let mut dst = ReadBuf::new(&mut buf);
            if dst.remaining() == 0 || dst.capacity() == 0 {
                break Ready(Err(Error::with_msg("bad read buffer")));
            }
            let fp = self.file.as_mut().unwrap();
            let f = Pin::new(fp);
            break match File::poll_read(f, cx, &mut dst) {
                Ready(res) => match res {
                    Ok(_) => {
                        if dst.filled().len() > 0 {
                            // Got data: stash it and immediately try to read more.
                            self.all.extend_from_slice(dst.filled());
                            continue 'outer;
                        } else {
                            // EOF: decode everything accumulated so far.
                            match T::from_buf(&mut self.all) {
                                Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))),
                                Err(e) => Ready(Err(e)),
                            }
                        }
                    }
                    Err(e) => Ready(Err(e.into())),
                },
                Pending => Pending,
            };
        };
        self.buf = buf;
        ret
    }
}
/// Splits absolute ns timestamps into (anchor seconds, ms offsets, ns remainders).
/// The anchor is the first timestamp truncated to seconds (0 for empty input);
/// assumes timestamps are >= the anchor (i.e. sorted ascending) — otherwise the
/// unsigned subtraction would underflow.
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec<u64>, Vec<u64>) {
    let ts_anchor_sec = tss.first().copied().unwrap_or(0) / SEC;
    let ts_anchor_ns = ts_anchor_sec * SEC;
    let ts_off_ms: Vec<u64> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
    let ts_off_ns: Vec<u64> = tss
        .iter()
        .zip(&ts_off_ms)
        .map(|(&abs, &ms)| abs - ts_anchor_ns - ms * MS)
        .collect();
    (ts_anchor_sec, ts_off_ms, ts_off_ns)
}
/// Per-bin aggregation state machine: ingest any number of input containers,
/// then consume self to produce the aggregated output for `range`.
pub trait TimeBinnableTypeAggregator: Send {
    type Input: TimeBinnableType;
    type Output: TimeBinnableType;
    fn range(&self) -> &NanoRange;
    fn ingest(&mut self, item: &Self::Input);
    fn result(self) -> Self::Output;
}

420
items/src/minmaxavgbins.rs Normal file
View File

@@ -0,0 +1,420 @@
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult};
use crate::{
ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, RangeOverlapInfo, ReadPbv,
ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, TimeBins,
WithLen,
};
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::marker::PhantomData;
use tokio::fs::File;
/// Time-binned scalar statistics: parallel columns of bin edges, event counts
/// and per-bin min/max/avg. Empty bins carry None in the stat columns.
#[derive(Clone, Serialize, Deserialize)]
pub struct MinMaxAvgBins<NTY> {
    pub ts1s: Vec<u64>,
    pub ts2s: Vec<u64>,
    pub counts: Vec<u64>,
    // TODO get rid of Option:
    pub mins: Vec<Option<NTY>>,
    pub maxs: Vec<Option<NTY>>,
    pub avgs: Vec<Option<f32>>,
}
// Wire frame type id: base 0x700 plus the scalar sub-type id.
impl<NTY> SitemtyFrameType for MinMaxAvgBins<NTY>
where
    NTY: SubFrId,
{
    const FRAME_TYPE_ID: u32 = 0x700 + NTY::SUB;
}
// Debug summary; bin edges are shown truncated to whole seconds.
impl<NTY> fmt::Debug for MinMaxAvgBins<NTY>
where
    NTY: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(
            fmt,
            "MinMaxAvgBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
            self.ts1s.len(),
            self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.counts,
            self.mins,
            self.maxs,
            self.avgs,
        )
    }
}
impl<NTY> MinMaxAvgBins<NTY> {
    /// Constructs a bin container with all columns empty.
    pub fn empty() -> Self {
        Self {
            ts1s: Vec::new(),
            ts2s: Vec::new(),
            counts: Vec::new(),
            mins: Vec::new(),
            maxs: Vec::new(),
            avgs: Vec::new(),
        }
    }
}
// Span classification using the first bin start and last bin end.
// Bins use closed/open edge comparisons (`<=`/`>=`), unlike point events.
impl<NTY> FitsInside for MinMaxAvgBins<NTY> {
    fn fits_inside(&self, range: NanoRange) -> Fits {
        if self.ts1s.is_empty() {
            Fits::Empty
        } else {
            let t1 = *self.ts1s.first().unwrap();
            let t2 = *self.ts2s.last().unwrap();
            if t2 <= range.beg {
                Fits::Lower
            } else if t1 >= range.end {
                Fits::Greater
            } else if t1 < range.beg && t2 > range.end {
                Fits::PartlyLowerAndGreater
            } else if t1 < range.beg {
                Fits::PartlyLower
            } else if t2 > range.end {
                Fits::PartlyGreater
            } else {
                Fits::Inside
            }
        }
    }
}
impl<NTY> FilterFittingInside for MinMaxAvgBins<NTY> {
    /// Keeps `self` only when it overlaps `fit_range` at least partly.
    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
        match self.fits_inside(fit_range) {
            Fits::Empty | Fits::Lower | Fits::Greater => None,
            _ => Some(self),
        }
    }
}
// Overlap queries on bin edges. NOTE(review): `ends_after`/`starts_after`
// panic on empty containers, mirroring the event-type impls.
impl<NTY> RangeOverlapInfo for MinMaxAvgBins<NTY> {
    fn ends_before(&self, range: NanoRange) -> bool {
        match self.ts2s.last() {
            Some(&ts) => ts <= range.beg,
            None => true,
        }
    }
    fn ends_after(&self, range: NanoRange) -> bool {
        match self.ts2s.last() {
            Some(&ts) => ts > range.end,
            None => panic!(),
        }
    }
    fn starts_after(&self, range: NanoRange) -> bool {
        match self.ts1s.first() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
}
// Exposes the bin edge columns for generic binned-data handling.
impl<NTY> TimeBins for MinMaxAvgBins<NTY>
where
    NTY: NumOps,
{
    fn ts1s(&self) -> &Vec<u64> {
        &self.ts1s
    }
    fn ts2s(&self) -> &Vec<u64> {
        &self.ts2s
    }
}
// Number of bins.
impl<NTY> WithLen for MinMaxAvgBins<NTY> {
    fn len(&self) -> usize {
        self.ts1s.len()
    }
}
// Column-wise concatenation of bin containers.
impl<NTY> Appendable for MinMaxAvgBins<NTY>
where
    NTY: NumOps,
{
    fn empty() -> Self {
        Self::empty()
    }
    fn append(&mut self, src: &Self) {
        self.ts1s.extend_from_slice(&src.ts1s);
        self.ts2s.extend_from_slice(&src.ts2s);
        self.counts.extend_from_slice(&src.counts);
        self.mins.extend_from_slice(&src.mins);
        self.maxs.extend_from_slice(&src.maxs);
        self.avgs.extend_from_slice(&src.avgs);
    }
}
// Pre-binned cache files are stored as CBOR; `from_buf` decodes one container.
impl<NTY> ReadableFromFile for MinMaxAvgBins<NTY>
where
    NTY: NumOps,
{
    // TODO this function is not needed in the trait:
    fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error> {
        Ok(ReadPbv::new(file))
    }
    fn from_buf(buf: &[u8]) -> Result<Self, Error> {
        let dec = serde_cbor::from_slice(&buf)?;
        Ok(dec)
    }
}
// Re-binning bins yields bins of the same shape; one aggregator per output bin.
impl<NTY> TimeBinnableType for MinMaxAvgBins<NTY>
where
    NTY: NumOps,
{
    type Output = MinMaxAvgBins<NTY>;
    type Aggregator = MinMaxAvgBinsAggregator<NTY>;
    fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator {
        Self::Aggregator::new(range)
    }
}
// Placeholder JSON conversion: individual stream items are not rendered as
// JSON; only the collected result is (see MinMaxAvgBinsCollector).
impl<NTY> ToJsonResult for Sitemty<MinMaxAvgBins<NTY>>
where
    NTY: NumOps,
{
    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
        Ok(Box::new(serde_json::Value::String(format!(
            "MinMaxAvgBins/non-json-item"
        ))))
    }
}
/// Zero-sized marker type; holds no data.
/// NOTE(review): appears unused in this file — possibly a leftover.
pub struct MinMaxAvgBinsCollected<NTY> {
    _m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgBinsCollected<NTY> {
    pub fn new() -> Self {
        Self { _m1: PhantomData }
    }
}
/// JSON shape of a collected binned query: anchored bin-edge timestamps plus
/// the stat columns, and optional partial-result metadata.
#[derive(Serialize)]
pub struct MinMaxAvgBinsCollectedResult<NTY> {
    #[serde(rename = "tsAnchor")]
    ts_anchor_sec: u64,
    #[serde(rename = "tsMs")]
    ts_off_ms: Vec<u64>,
    #[serde(rename = "tsNs")]
    ts_off_ns: Vec<u64>,
    //ts_bin_edges: Vec<IsoDateTime>,
    counts: Vec<u64>,
    mins: Vec<Option<NTY>>,
    maxs: Vec<Option<NTY>>,
    avgs: Vec<Option<f32>>,
    // Omitted from JSON when false / zero / None respectively.
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
    finalised_range: bool,
    #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
    missing_bins: u32,
    #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
    continue_at: Option<IsoDateTime>,
}
/// Accumulates streamed bin batches; `bin_count_exp` is the number of bins
/// the query is expected to deliver (used to detect partial results).
pub struct MinMaxAvgBinsCollector<NTY> {
    bin_count_exp: u32,
    timed_out: bool,
    range_complete: bool,
    vals: MinMaxAvgBins<NTY>,
    _m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgBinsCollector<NTY> {
    /// New collector expecting `bin_count_exp` bins in total.
    pub fn new(bin_count_exp: u32) -> Self {
        Self {
            bin_count_exp,
            timed_out: false,
            range_complete: false,
            vals: MinMaxAvgBins::<NTY>::empty(),
            _m1: PhantomData,
        }
    }
}
// Number of bins collected so far.
impl<NTY> WithLen for MinMaxAvgBinsCollector<NTY>
where
    NTY: NumOps + Serialize,
{
    fn len(&self) -> usize {
        self.vals.ts1s.len()
    }
}
impl<NTY> Collector for MinMaxAvgBinsCollector<NTY>
where
    NTY: NumOps + Serialize,
{
    type Input = MinMaxAvgBins<NTY>;
    type Output = MinMaxAvgBinsCollectedResult<NTY>;
    // Appends one incoming batch of bins to the accumulated columns.
    fn ingest(&mut self, src: &Self::Input) {
        Appendable::append(&mut self.vals, src);
    }
    fn set_range_complete(&mut self) {
        self.range_complete = true;
    }
    fn set_timed_out(&mut self) {
        self.timed_out = true;
    }
    /// Finalizes into the JSON output form: bin starts plus the final bin end
    /// become the anchored timestamp list. When fewer bins than expected were
    /// received, `continue_at` points a follow-up query at the last edge;
    /// a partial result with zero bins is an error.
    fn result(self) -> Result<Self::Output, Error> {
        let bin_count = self.vals.ts1s.len() as u32;
        // TODO could save the copy:
        let mut ts_all = self.vals.ts1s.clone();
        if self.vals.ts2s.len() > 0 {
            ts_all.push(*self.vals.ts2s.last().unwrap());
        }
        let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
            match ts_all.last() {
                Some(&k) => {
                    let iso = IsoDateTime(Utc.timestamp_nanos(k as i64));
                    Some(iso)
                }
                None => Err(Error::with_msg("partial_content but no bin in result"))?,
            }
        } else {
            None
        };
        let tst = ts_offs_from_abs(&ts_all);
        let ret = MinMaxAvgBinsCollectedResult::<NTY> {
            ts_anchor_sec: tst.0,
            ts_off_ms: tst.1,
            ts_off_ns: tst.2,
            counts: self.vals.counts,
            mins: self.vals.mins,
            maxs: self.vals.maxs,
            avgs: self.vals.avgs,
            finalised_range: self.range_complete,
            // saturating_sub: if more bins than expected arrive, the plain
            // subtraction would underflow (panic in debug, wrap in release).
            missing_bins: self.bin_count_exp.saturating_sub(bin_count),
            continue_at,
        };
        Ok(ret)
    }
}
// Binned data forwards the expected-bin-count to its collector.
impl<NTY> Collectable for MinMaxAvgBins<NTY>
where
    NTY: NumOps + Serialize,
{
    type Collector = MinMaxAvgBinsCollector<NTY>;
    fn new_collector(bin_count_exp: u32) -> Self::Collector {
        Self::Collector::new(bin_count_exp)
    }
}
/// Running state for re-binning existing bins into one coarser output bin.
pub struct MinMaxAvgBinsAggregator<NTY> {
    range: NanoRange,
    // Sum of input bin counts that overlap `range`.
    count: u64,
    min: Option<NTY>,
    max: Option<NTY>,
    // Number of contributing non-NaN input averages, and their running sum.
    sumc: u64,
    sum: f32,
}
impl<NTY> MinMaxAvgBinsAggregator<NTY> {
    /// Fresh aggregator for one output bin covering `range`.
    pub fn new(range: NanoRange) -> Self {
        Self {
            range,
            count: 0,
            min: None,
            max: None,
            sumc: 0,
            sum: 0f32,
        }
    }
}
// Folds overlapping input bins into the single output bin. The output average
// is the unweighted mean of contributing input averages (not weighted by
// per-bin counts) — presumably acceptable for display; TODO confirm.
impl<NTY> TimeBinnableTypeAggregator for MinMaxAvgBinsAggregator<NTY>
where
    NTY: NumOps,
{
    type Input = MinMaxAvgBins<NTY>;
    type Output = MinMaxAvgBins<NTY>;
    fn range(&self) -> &NanoRange {
        &self.range
    }
    fn ingest(&mut self, item: &Self::Input) {
        for i1 in 0..item.ts1s.len() {
            // Skip input bins that do not overlap the output range.
            if item.ts2s[i1] <= self.range.beg {
                continue;
            } else if item.ts1s[i1] >= self.range.end {
                continue;
            } else {
                // Empty input bins carry None and leave the running min/max unchanged.
                self.min = match self.min {
                    None => item.mins[i1],
                    Some(min) => match item.mins[i1] {
                        None => Some(min),
                        Some(v) => {
                            if v < min {
                                Some(v)
                            } else {
                                Some(min)
                            }
                        }
                    },
                };
                self.max = match self.max {
                    None => item.maxs[i1],
                    Some(max) => match item.maxs[i1] {
                        None => Some(max),
                        Some(v) => {
                            if v > max {
                                Some(v)
                            } else {
                                Some(max)
                            }
                        }
                    },
                };
                match item.avgs[i1] {
                    None => {}
                    Some(v) => {
                        // NaN averages are skipped entirely.
                        if v.is_nan() {
                        } else {
                            self.sum += v;
                            self.sumc += 1;
                        }
                    }
                }
                self.count += item.counts[i1];
            }
        }
    }
    fn result(self) -> Self::Output {
        let avg = if self.sumc == 0 {
            None
        } else {
            Some(self.sum / self.sumc as f32)
        };
        Self::Output {
            ts1s: vec![self.range.beg],
            ts2s: vec![self.range.end],
            counts: vec![self.count],
            mins: vec![self.min],
            maxs: vec![self.max],
            avgs: vec![avg],
        }
    }
}

View File

@@ -0,0 +1,511 @@
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult};
use crate::waveevents::WaveEvents;
use crate::{
ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, RangeOverlapInfo, ReadPbv,
ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, TimeBins,
WithLen,
};
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::marker::PhantomData;
use tokio::fs::File;
/// Time-binned statistics for 1-dimensional (waveform) channels: per bin a
/// vector of element-wise min/max/avg. Empty bins carry None.
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgDim1Bins<NTY> {
    pub ts1s: Vec<u64>,
    pub ts2s: Vec<u64>,
    pub counts: Vec<u64>,
    pub mins: Vec<Option<Vec<NTY>>>,
    pub maxs: Vec<Option<Vec<NTY>>>,
    pub avgs: Vec<Option<Vec<f32>>>,
}
// Wire frame type id: base 0xb00 plus the scalar sub-type id.
impl<NTY> SitemtyFrameType for MinMaxAvgDim1Bins<NTY>
where
    NTY: SubFrId,
{
    const FRAME_TYPE_ID: u32 = 0xb00 + NTY::SUB;
}
// Debug summary; only the first stat entry is shown to keep output bounded.
impl<NTY> fmt::Debug for MinMaxAvgDim1Bins<NTY>
where
    NTY: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(
            fmt,
            "MinMaxAvgDim1Bins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
            self.ts1s.len(),
            self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.counts,
            self.mins.first(),
            self.maxs.first(),
            self.avgs.first(),
        )
    }
}
impl<NTY> MinMaxAvgDim1Bins<NTY> {
    /// Constructs a bin container with all columns empty.
    pub fn empty() -> Self {
        Self {
            ts1s: Vec::new(),
            ts2s: Vec::new(),
            counts: Vec::new(),
            mins: Vec::new(),
            maxs: Vec::new(),
            avgs: Vec::new(),
        }
    }
}
// Same span classification as MinMaxAvgBins: closed/open bin edge comparisons.
impl<NTY> FitsInside for MinMaxAvgDim1Bins<NTY> {
    fn fits_inside(&self, range: NanoRange) -> Fits {
        if self.ts1s.is_empty() {
            Fits::Empty
        } else {
            let t1 = *self.ts1s.first().unwrap();
            let t2 = *self.ts2s.last().unwrap();
            if t2 <= range.beg {
                Fits::Lower
            } else if t1 >= range.end {
                Fits::Greater
            } else if t1 < range.beg && t2 > range.end {
                Fits::PartlyLowerAndGreater
            } else if t1 < range.beg {
                Fits::PartlyLower
            } else if t2 > range.end {
                Fits::PartlyGreater
            } else {
                Fits::Inside
            }
        }
    }
}
impl<NTY> FilterFittingInside for MinMaxAvgDim1Bins<NTY> {
    /// Keeps `self` only when it overlaps `fit_range` at least partly.
    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
        match self.fits_inside(fit_range) {
            Fits::Empty | Fits::Lower | Fits::Greater => None,
            _ => Some(self),
        }
    }
}
// Overlap queries on bin edges; panics on empty input for the `after` checks,
// consistent with the other containers in this crate.
impl<NTY> RangeOverlapInfo for MinMaxAvgDim1Bins<NTY> {
    fn ends_before(&self, range: NanoRange) -> bool {
        match self.ts2s.last() {
            Some(&ts) => ts <= range.beg,
            None => true,
        }
    }
    fn ends_after(&self, range: NanoRange) -> bool {
        match self.ts2s.last() {
            Some(&ts) => ts > range.end,
            None => panic!(),
        }
    }
    fn starts_after(&self, range: NanoRange) -> bool {
        match self.ts1s.first() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
}
// Exposes the bin edge columns.
impl<NTY> TimeBins for MinMaxAvgDim1Bins<NTY>
where
    NTY: NumOps,
{
    fn ts1s(&self) -> &Vec<u64> {
        &self.ts1s
    }
    fn ts2s(&self) -> &Vec<u64> {
        &self.ts2s
    }
}
// Number of bins.
impl<NTY> WithLen for MinMaxAvgDim1Bins<NTY> {
    fn len(&self) -> usize {
        self.ts1s.len()
    }
}
// Column-wise concatenation.
impl<NTY> Appendable for MinMaxAvgDim1Bins<NTY>
where
    NTY: NumOps,
{
    fn empty() -> Self {
        Self::empty()
    }
    fn append(&mut self, src: &Self) {
        self.ts1s.extend_from_slice(&src.ts1s);
        self.ts2s.extend_from_slice(&src.ts2s);
        self.counts.extend_from_slice(&src.counts);
        self.mins.extend_from_slice(&src.mins);
        self.maxs.extend_from_slice(&src.maxs);
        self.avgs.extend_from_slice(&src.avgs);
    }
}
// CBOR-backed cache file decoding, mirroring MinMaxAvgBins.
impl<NTY> ReadableFromFile for MinMaxAvgDim1Bins<NTY>
where
    NTY: NumOps,
{
    // TODO this function is not needed in the trait:
    fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error> {
        Ok(ReadPbv::new(file))
    }
    fn from_buf(buf: &[u8]) -> Result<Self, Error> {
        let dec = serde_cbor::from_slice(&buf)?;
        Ok(dec)
    }
}
// Re-binning keeps the waveform shape; the x-bin count is forwarded to the
// aggregator (though currently unused there).
impl<NTY> TimeBinnableType for MinMaxAvgDim1Bins<NTY>
where
    NTY: NumOps,
{
    type Output = MinMaxAvgDim1Bins<NTY>;
    type Aggregator = MinMaxAvgDim1BinsAggregator<NTY>;
    fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator {
        Self::Aggregator::new(range, x_bin_count)
    }
}
// Placeholder JSON conversion, same pattern as the scalar bins.
impl<NTY> ToJsonResult for Sitemty<MinMaxAvgDim1Bins<NTY>>
where
    NTY: NumOps,
{
    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
        Ok(Box::new(serde_json::Value::String(format!(
            "MinMaxAvgDim1Bins/non-json-item"
        ))))
    }
}
/// Zero-sized marker type; holds no data.
/// NOTE(review): appears unused in this file — possibly a leftover.
pub struct MinMaxAvgDim1BinsCollected<NTY> {
    _m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgDim1BinsCollected<NTY> {
    pub fn new() -> Self {
        Self { _m1: PhantomData }
    }
}
/// JSON shape of a collected waveform-bin query. Unlike the scalar result this
/// still uses full ISO bin edges rather than anchor + offsets.
#[derive(Serialize)]
pub struct MinMaxAvgDim1BinsCollectedResult<NTY> {
    ts_bin_edges: Vec<IsoDateTime>,
    counts: Vec<u64>,
    mins: Vec<Option<Vec<NTY>>>,
    maxs: Vec<Option<Vec<NTY>>>,
    avgs: Vec<Option<Vec<f32>>>,
    // Omitted from JSON when false / zero / None respectively.
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
    finalised_range: bool,
    #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
    missing_bins: u32,
    #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
    continue_at: Option<IsoDateTime>,
}
/// Accumulates streamed waveform-bin batches; `bin_count_exp` is the expected
/// total number of bins, used to detect partial results.
pub struct MinMaxAvgDim1BinsCollector<NTY> {
    bin_count_exp: u32,
    timed_out: bool,
    range_complete: bool,
    vals: MinMaxAvgDim1Bins<NTY>,
    _m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgDim1BinsCollector<NTY> {
    /// New collector expecting `bin_count_exp` bins in total.
    pub fn new(bin_count_exp: u32) -> Self {
        Self {
            bin_count_exp,
            timed_out: false,
            range_complete: false,
            vals: MinMaxAvgDim1Bins::<NTY>::empty(),
            _m1: PhantomData,
        }
    }
}
// Number of bins collected so far.
impl<NTY> WithLen for MinMaxAvgDim1BinsCollector<NTY>
where
    NTY: NumOps + Serialize,
{
    fn len(&self) -> usize {
        self.vals.ts1s.len()
    }
}
impl<NTY> Collector for MinMaxAvgDim1BinsCollector<NTY>
where
    NTY: NumOps + Serialize,
{
    type Input = MinMaxAvgDim1Bins<NTY>;
    type Output = MinMaxAvgDim1BinsCollectedResult<NTY>;
    // Appends one incoming batch of bins to the accumulated columns.
    fn ingest(&mut self, src: &Self::Input) {
        Appendable::append(&mut self.vals, src);
    }
    fn set_range_complete(&mut self) {
        self.range_complete = true;
    }
    fn set_timed_out(&mut self) {
        self.timed_out = true;
    }
    /// Finalizes into the JSON output form: bin starts plus the final bin end
    /// become ISO bin edges. When fewer bins than expected were received,
    /// `continue_at` points a follow-up query at the last edge; a partial
    /// result with zero bins is an error.
    fn result(self) -> Result<Self::Output, Error> {
        let bin_count = self.vals.ts1s.len() as u32;
        let mut tsa: Vec<_> = self
            .vals
            .ts1s
            .iter()
            .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64)))
            .collect();
        if let Some(&z) = self.vals.ts2s.last() {
            tsa.push(IsoDateTime(Utc.timestamp_nanos(z as i64)));
        }
        let tsa = tsa;
        let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
            match tsa.last() {
                Some(k) => Some(k.clone()),
                None => Err(Error::with_msg("partial_content but no bin in result"))?,
            }
        } else {
            None
        };
        let ret = MinMaxAvgDim1BinsCollectedResult::<NTY> {
            ts_bin_edges: tsa,
            counts: self.vals.counts,
            mins: self.vals.mins,
            maxs: self.vals.maxs,
            avgs: self.vals.avgs,
            finalised_range: self.range_complete,
            // saturating_sub: if more bins than expected arrive, the plain
            // subtraction would underflow (panic in debug, wrap in release).
            missing_bins: self.bin_count_exp.saturating_sub(bin_count),
            continue_at,
        };
        Ok(ret)
    }
}
// Binned waveform data forwards the expected-bin-count to its collector.
impl<NTY> Collectable for MinMaxAvgDim1Bins<NTY>
where
    NTY: NumOps + Serialize,
{
    type Collector = MinMaxAvgDim1BinsCollector<NTY>;
    fn new_collector(bin_count_exp: u32) -> Self::Collector {
        Self::Collector::new(bin_count_exp)
    }
}
/// Running state for re-binning waveform bins into one coarser output bin;
/// min/max/sum are element-wise vectors over the waveform.
pub struct MinMaxAvgDim1BinsAggregator<NTY> {
    range: NanoRange,
    count: u64,
    min: Option<Vec<NTY>>,
    max: Option<Vec<NTY>>,
    // Number of contributing input bins (not elements) for the average.
    sumc: u64,
    sum: Option<Vec<f32>>,
}
impl<NTY> MinMaxAvgDim1BinsAggregator<NTY> {
    /// Fresh aggregator for one output bin covering `range`.
    /// WARNING(review): `min` is initialized with `err::todoval()` — a known
    /// placeholder that presumably panics at runtime; this constructor is not
    /// safe to call until that is replaced (the ingest path expects `None`).
    pub fn new(range: NanoRange, _x_bin_count: usize) -> Self {
        Self {
            range,
            count: 0,
            // TODO get rid of Option
            min: err::todoval(),
            max: None,
            sumc: 0,
            sum: None,
        }
    }
}
// Element-wise fold of overlapping waveform bins into one output bin.
// NOTE(review): `sumc` increments once per contributing bin (and only when the
// bin's avg vector is non-None and self.sum was already set), while NaN
// elements are skipped per-element without per-element counts — the resulting
// average weights may be subtly off for bins containing NaNs; worth confirming.
impl<NTY> TimeBinnableTypeAggregator for MinMaxAvgDim1BinsAggregator<NTY>
where
    NTY: NumOps,
{
    type Input = MinMaxAvgDim1Bins<NTY>;
    type Output = MinMaxAvgDim1Bins<NTY>;
    fn range(&self) -> &NanoRange {
        &self.range
    }
    fn ingest(&mut self, item: &Self::Input) {
        for i1 in 0..item.ts1s.len() {
            // Skip input bins that do not overlap the output range.
            if item.ts2s[i1] <= self.range.beg {
                continue;
            } else if item.ts1s[i1] >= self.range.end {
                continue;
            } else {
                match self.min.as_mut() {
                    None => self.min = item.mins[i1].clone(),
                    Some(min) => match item.mins[i1].as_ref() {
                        None => {}
                        Some(v) => {
                            // Element-wise minimum.
                            for (a, b) in min.iter_mut().zip(v.iter()) {
                                if *b < *a {
                                    *a = *b;
                                }
                            }
                        }
                    },
                };
                match self.max.as_mut() {
                    None => self.max = item.maxs[i1].clone(),
                    Some(max) => match item.maxs[i1].as_ref() {
                        None => {}
                        Some(v) => {
                            // Element-wise maximum.
                            for (a, b) in max.iter_mut().zip(v.iter()) {
                                if *b > *a {
                                    *a = *b;
                                }
                            }
                        }
                    },
                };
                match self.sum.as_mut() {
                    None => {
                        self.sum = item.avgs[i1].clone();
                    }
                    Some(sum) => match item.avgs[i1].as_ref() {
                        None => {}
                        Some(v) => {
                            // Element-wise sum, skipping NaN elements.
                            for (a, b) in sum.iter_mut().zip(v.iter()) {
                                if (*b).is_nan() {
                                } else {
                                    *a += *b;
                                }
                            }
                            self.sumc += 1;
                        }
                    },
                }
                self.count += item.counts[i1];
            }
        }
    }
    fn result(self) -> Self::Output {
        let avg = if self.sumc == 0 {
            None
        } else {
            let avg = self
                .sum
                .as_ref()
                .unwrap()
                .iter()
                .map(|k| k / self.sumc as f32)
                .collect();
            Some(avg)
        };
        Self::Output {
            ts1s: vec![self.range.beg],
            ts2s: vec![self.range.end],
            counts: vec![self.count],
            mins: vec![self.min],
            maxs: vec![self.max],
            avgs: vec![avg],
        }
    }
}
/// JSON wire format for a collected batch of waveform events.
#[derive(Serialize)]
pub struct WaveEventsCollectedResult<NTY> {
    /// Anchor (seconds); `ts_off_ms`/`ts_off_ns` are offsets from it.
    #[serde(rename = "tsAnchor")]
    ts_anchor_sec: u64,
    #[serde(rename = "tsMs")]
    ts_off_ms: Vec<u64>,
    #[serde(rename = "tsNs")]
    ts_off_ns: Vec<u64>,
    /// One waveform (vector of samples) per event.
    values: Vec<Vec<NTY>>,
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
    range_complete: bool,
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
    timed_out: bool,
}

/// Accumulates `WaveEvents` batches until the stream ends or times out.
pub struct WaveEventsCollector<NTY> {
    vals: WaveEvents<NTY>,
    range_complete: bool,
    timed_out: bool,
}

impl<NTY> WaveEventsCollector<NTY> {
    pub fn new(_bin_count_exp: u32) -> Self {
        // Note: removed leftover scratch logging
        // (`info!("\n\nWaveEventsCollector\n\n")`) that spammed every
        // construction at info level.
        Self {
            vals: WaveEvents::empty(),
            range_complete: false,
            timed_out: false,
        }
    }
}
impl<NTY> WithLen for WaveEventsCollector<NTY> {
    // Number of events collected so far.
    fn len(&self) -> usize {
        self.vals.tss.len()
    }
}
// Stream-collection protocol: append batches, then convert to wire format.
impl<NTY> Collector for WaveEventsCollector<NTY>
where
    NTY: NumOps,
{
    type Input = WaveEvents<NTY>;
    type Output = WaveEventsCollectedResult<NTY>;
    fn ingest(&mut self, src: &Self::Input) {
        self.vals.append(src);
    }
    fn set_range_complete(&mut self) {
        self.range_complete = true;
    }
    fn set_timed_out(&mut self) {
        self.timed_out = true;
    }
    fn result(self) -> Result<Self::Output, Error> {
        // Split absolute nanosecond timestamps into anchor + offsets.
        let tst = ts_offs_from_abs(&self.vals.tss);
        let ret = Self::Output {
            ts_anchor_sec: tst.0,
            ts_off_ms: tst.1,
            ts_off_ns: tst.2,
            values: self.vals.vals,
            range_complete: self.range_complete,
            timed_out: self.timed_out,
        };
        Ok(ret)
    }
}
impl<NTY> Collectable for WaveEvents<NTY>
where
NTY: NumOps,
{
type Collector = WaveEventsCollector<NTY>;
fn new_collector(bin_count_exp: u32) -> Self::Collector {
Self::Collector::new(bin_count_exp)
}
}

View File

@@ -0,0 +1,422 @@
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult};
use crate::{
ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, RangeOverlapInfo, ReadPbv,
ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, TimeBins,
WithLen,
};
use chrono::{TimeZone, Utc};
use err::Error;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use num_traits::Zero;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::marker::PhantomData;
use tokio::fs::File;
/// Time-binned statistics over waveform channels: per time bin, an optional
/// element-wise min/max/avg vector (None for bins with no events).
/// All vectors run parallel, indexed by time bin.
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgWaveBins<NTY> {
    /// Bin start timestamps (ns).
    pub ts1s: Vec<u64>,
    /// Bin end timestamps (ns).
    pub ts2s: Vec<u64>,
    /// Event count per bin.
    pub counts: Vec<u64>,
    pub mins: Vec<Option<Vec<NTY>>>,
    pub maxs: Vec<Option<Vec<NTY>>>,
    pub avgs: Vec<Option<Vec<f32>>>,
}
// Frame type id for serialized stream items; 0xa00 block + scalar-type sub id.
impl<NTY> SitemtyFrameType for MinMaxAvgWaveBins<NTY>
where
    NTY: SubFrId,
{
    const FRAME_TYPE_ID: u32 = 0xa00 + NTY::SUB;
}
impl<NTY> fmt::Debug for MinMaxAvgWaveBins<NTY>
where
    NTY: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Timestamps printed in seconds for readability.
        write!(
            fmt,
            "MinMaxAvgWaveBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
            self.ts1s.len(),
            self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.counts,
            self.mins,
            self.maxs,
            self.avgs,
        )
    }
}
impl<NTY> MinMaxAvgWaveBins<NTY> {
    /// Container with zero bins.
    pub fn empty() -> Self {
        Self {
            ts1s: vec![],
            ts2s: vec![],
            counts: vec![],
            mins: vec![],
            maxs: vec![],
            avgs: vec![],
        }
    }
}
impl<NTY> FitsInside for MinMaxAvgWaveBins<NTY> {
    // Classify where this container's covered time span lies relative to
    // `range`. Bin edges are half-open: a bin ending at `range.beg` is Lower.
    fn fits_inside(&self, range: NanoRange) -> Fits {
        if self.ts1s.is_empty() {
            Fits::Empty
        } else {
            let t1 = *self.ts1s.first().unwrap();
            let t2 = *self.ts2s.last().unwrap();
            if t2 <= range.beg {
                Fits::Lower
            } else if t1 >= range.end {
                Fits::Greater
            } else if t1 < range.beg && t2 > range.end {
                Fits::PartlyLowerAndGreater
            } else if t1 < range.beg {
                Fits::PartlyLower
            } else if t2 > range.end {
                Fits::PartlyGreater
            } else {
                Fits::Inside
            }
        }
    }
}
impl<NTY> FilterFittingInside for MinMaxAvgWaveBins<NTY> {
    // Keep the container only if it overlaps `fit_range` at all.
    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
        match self.fits_inside(fit_range) {
            Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
            _ => None,
        }
    }
}
impl<NTY> RangeOverlapInfo for MinMaxAvgWaveBins<NTY> {
    fn ends_before(&self, range: NanoRange) -> bool {
        match self.ts2s.last() {
            Some(&ts) => ts <= range.beg,
            None => true,
        }
    }
    // NOTE(review): ends_after/starts_after panic on empty containers —
    // presumably callers guarantee non-empty here; confirm before relying on it.
    fn ends_after(&self, range: NanoRange) -> bool {
        match self.ts2s.last() {
            Some(&ts) => ts > range.end,
            None => panic!(),
        }
    }
    fn starts_after(&self, range: NanoRange) -> bool {
        match self.ts1s.first() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
}
// Expose bin edge arrays to generic time-bin processing.
impl<NTY> TimeBins for MinMaxAvgWaveBins<NTY>
where
    NTY: NumOps,
{
    fn ts1s(&self) -> &Vec<u64> {
        &self.ts1s
    }
    fn ts2s(&self) -> &Vec<u64> {
        &self.ts2s
    }
}
impl<NTY> WithLen for MinMaxAvgWaveBins<NTY> {
    fn len(&self) -> usize {
        self.ts1s.len()
    }
}
// Concatenate another container's bins onto this one (parallel arrays).
impl<NTY> Appendable for MinMaxAvgWaveBins<NTY>
where
    NTY: NumOps,
{
    fn empty() -> Self {
        Self::empty()
    }
    fn append(&mut self, src: &Self) {
        self.ts1s.extend_from_slice(&src.ts1s);
        self.ts2s.extend_from_slice(&src.ts2s);
        self.counts.extend_from_slice(&src.counts);
        self.mins.extend_from_slice(&src.mins);
        self.maxs.extend_from_slice(&src.maxs);
        self.avgs.extend_from_slice(&src.avgs);
    }
}
// Deserialize from pre-binned files (CBOR-encoded).
impl<NTY> ReadableFromFile for MinMaxAvgWaveBins<NTY>
where
    NTY: NumOps,
{
    // TODO this function is not needed in the trait:
    fn read_from_file(file: File) -> Result<ReadPbv<Self>, Error> {
        Ok(ReadPbv::new(file))
    }
    fn from_buf(buf: &[u8]) -> Result<Self, Error> {
        let dec = serde_cbor::from_slice(&buf)?;
        Ok(dec)
    }
}
// Re-binning these bins into coarser bins keeps the same container type.
impl<NTY> TimeBinnableType for MinMaxAvgWaveBins<NTY>
where
    NTY: NumOps,
{
    type Output = MinMaxAvgWaveBins<NTY>;
    type Aggregator = MinMaxAvgWaveBinsAggregator<NTY>;
    fn aggregator(range: NanoRange, x_bin_count: usize) -> Self::Aggregator {
        Self::Aggregator::new(range, x_bin_count)
    }
}
// Placeholder: time-binned wave items have no proper JSON representation yet;
// emit a marker string instead.
impl<NTY> ToJsonResult for Sitemty<MinMaxAvgWaveBins<NTY>>
where
    NTY: NumOps,
{
    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
        // Fix: the marker previously said "MinMaxAvgBins" (copy-paste from
        // the dim-0 type); name the actual type so logs are not misleading.
        Ok(Box::new(serde_json::Value::String(String::from(
            "MinMaxAvgWaveBins/non-json-item",
        ))))
    }
}
// NOTE(review): this type is currently unused within this file — possibly a
// leftover; confirm against other modules before removing.
pub struct MinMaxAvgWaveBinsCollected<NTY> {
    _m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgWaveBinsCollected<NTY> {
    pub fn new() -> Self {
        Self { _m1: PhantomData }
    }
}
/// JSON wire format for collected wave bins: anchored timestamps plus
/// parallel per-bin statistics and completeness bookkeeping.
#[derive(Serialize)]
pub struct MinMaxAvgWaveBinsCollectedResult<NTY> {
    #[serde(rename = "tsAnchor")]
    ts_anchor_sec: u64,
    #[serde(rename = "tsMs")]
    ts_off_ms: Vec<u64>,
    #[serde(rename = "tsNs")]
    ts_off_ns: Vec<u64>,
    counts: Vec<u64>,
    mins: Vec<Option<Vec<NTY>>>,
    maxs: Vec<Option<Vec<NTY>>>,
    avgs: Vec<Option<Vec<f32>>>,
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
    finalised_range: bool,
    /// How many requested bins were not delivered (0 is omitted from JSON).
    #[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
    missing_bins: u32,
    /// Timestamp where a follow-up request should resume, if incomplete.
    #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
    continue_at: Option<IsoDateTime>,
}
/// Accumulates streamed `MinMaxAvgWaveBins` until the stream ends or times out.
pub struct MinMaxAvgWaveBinsCollector<NTY> {
    // Number of bins the client requested; used to detect partial results.
    bin_count_exp: u32,
    timed_out: bool,
    range_complete: bool,
    vals: MinMaxAvgWaveBins<NTY>,
    // NOTE(review): redundant — `vals` already carries NTY; confirm and drop.
    _m1: PhantomData<NTY>,
}
impl<NTY> MinMaxAvgWaveBinsCollector<NTY> {
    pub fn new(bin_count_exp: u32) -> Self {
        Self {
            bin_count_exp,
            timed_out: false,
            range_complete: false,
            vals: MinMaxAvgWaveBins::<NTY>::empty(),
            _m1: PhantomData,
        }
    }
}
impl<NTY> WithLen for MinMaxAvgWaveBinsCollector<NTY>
where
    NTY: NumOps + Serialize,
{
    // Number of bins collected so far.
    fn len(&self) -> usize {
        self.vals.ts1s.len()
    }
}
// Stream-collection protocol: append incoming bin batches, then convert to
// the JSON wire format with anchored timestamps.
impl<NTY> Collector for MinMaxAvgWaveBinsCollector<NTY>
where
    NTY: NumOps + Serialize,
{
    type Input = MinMaxAvgWaveBins<NTY>;
    type Output = MinMaxAvgWaveBinsCollectedResult<NTY>;
    fn ingest(&mut self, src: &Self::Input) {
        Appendable::append(&mut self.vals, src);
    }
    fn set_range_complete(&mut self) {
        self.range_complete = true;
    }
    fn set_timed_out(&mut self) {
        self.timed_out = true;
    }
    /// Finalize: build the edge-timestamp list (all bin starts plus the last
    /// bin end), compute anchor/offsets, and report missing-bin bookkeeping.
    fn result(self) -> Result<Self::Output, Error> {
        let t_bin_count = self.vals.counts.len();
        // TODO could save the copy:
        let mut ts_all = self.vals.ts1s.clone();
        if self.vals.ts2s.len() > 0 {
            ts_all.push(*self.vals.ts2s.last().unwrap());
        }
        // Fewer bins than requested: tell the client where to resume.
        let continue_at = if self.vals.ts1s.len() < self.bin_count_exp as usize {
            match ts_all.last() {
                Some(&k) => {
                    let iso = IsoDateTime(Utc.timestamp_nanos(k as i64));
                    Some(iso)
                }
                None => Err(Error::with_msg("partial_content but no bin in result"))?,
            }
        } else {
            None
        };
        let tst = ts_offs_from_abs(&ts_all);
        let ret = MinMaxAvgWaveBinsCollectedResult {
            ts_anchor_sec: tst.0,
            ts_off_ms: tst.1,
            ts_off_ns: tst.2,
            counts: self.vals.counts,
            mins: self.vals.mins,
            maxs: self.vals.maxs,
            avgs: self.vals.avgs,
            finalised_range: self.range_complete,
            // Fix: plain subtraction panics (debug) or wraps (release) when
            // more bins than expected arrive; saturate to 0 instead.
            missing_bins: self.bin_count_exp.saturating_sub(t_bin_count as u32),
            continue_at,
        };
        Ok(ret)
    }
}
impl<NTY> Collectable for MinMaxAvgWaveBins<NTY>
where
NTY: NumOps + Serialize,
{
type Collector = MinMaxAvgWaveBinsCollector<NTY>;
fn new_collector(bin_count_exp: u32) -> Self::Collector {
Self::Collector::new(bin_count_exp)
}
}
/// Re-bins `MinMaxAvgWaveBins` in time: folds input bins overlapping `range`
/// into one output bin with element-wise statistics.
pub struct MinMaxAvgWaveBinsAggregator<NTY> {
    range: NanoRange,
    // Total event count of all contributing input bins.
    count: u64,
    // Element-wise running min/max; seeded with max_or_nan/min_or_nan so the
    // first real value always wins (NaN seeds are replaced explicitly).
    min: Vec<NTY>,
    max: Vec<NTY>,
    // Element-wise sum of input averages; divided by `sumc` at the end.
    sum: Vec<f32>,
    // Number of input bins folded in.
    sumc: u64,
}
impl<NTY> MinMaxAvgWaveBinsAggregator<NTY>
where
    NTY: NumOps,
{
    pub fn new(range: NanoRange, x_bin_count: usize) -> Self {
        Self {
            range,
            count: 0,
            min: vec![NTY::max_or_nan(); x_bin_count],
            max: vec![NTY::min_or_nan(); x_bin_count],
            sum: vec![0f32; x_bin_count],
            sumc: 0,
        }
    }
}
impl<NTY> TimeBinnableTypeAggregator for MinMaxAvgWaveBinsAggregator<NTY>
where
    NTY: NumOps,
{
    type Input = MinMaxAvgWaveBins<NTY>;
    type Output = MinMaxAvgWaveBins<NTY>;
    fn range(&self) -> &NanoRange {
        &self.range
    }
    // Fold every input bin overlapping the target range into the running
    // element-wise statistics.
    fn ingest(&mut self, item: &Self::Input) {
        for i1 in 0..item.ts1s.len() {
            if item.ts2s[i1] <= self.range.beg {
                continue;
            } else if item.ts1s[i1] >= self.range.end {
                continue;
            } else {
                // the input can contain bins where no events did fall into.
                match &item.mins[i1] {
                    None => {}
                    Some(inp) => {
                        for (a, b) in self.min.iter_mut().zip(inp.iter()) {
                            // `is_nan` replaces the NaN seed of float types.
                            if *b < *a || a.is_nan() {
                                *a = *b;
                            }
                        }
                    }
                }
                match &item.maxs[i1] {
                    None => {}
                    Some(inp) => {
                        for (a, b) in self.max.iter_mut().zip(inp.iter()) {
                            if *b > *a || a.is_nan() {
                                *a = *b;
                            }
                        }
                    }
                }
                match &item.avgs[i1] {
                    None => {}
                    Some(inp) => {
                        for (a, b) in self.sum.iter_mut().zip(inp.iter()) {
                            *a += *b;
                        }
                    }
                }
                // NOTE(review): `sumc` is incremented even when avgs[i1] is
                // None, unlike the Dim1 aggregator above which only counts
                // contributing bins — confirm which behavior is intended.
                self.sumc += 1;
                self.count += item.counts[i1];
            }
        }
    }
    // Emit one output bin for the whole target range; an all-None bin when
    // nothing contributed.
    fn result(self) -> Self::Output {
        if self.sumc == 0 {
            Self::Output {
                ts1s: vec![self.range.beg],
                ts2s: vec![self.range.end],
                counts: vec![self.count],
                mins: vec![None],
                maxs: vec![None],
                avgs: vec![None],
            }
        } else {
            let avg = self.sum.iter().map(|j| *j / self.sumc as f32).collect();
            Self::Output {
                ts1s: vec![self.range.beg],
                ts2s: vec![self.range.end],
                counts: vec![self.count],
                mins: vec![Some(self.min)],
                maxs: vec![Some(self.max)],
                avgs: vec![Some(avg)],
            }
        }
    }
}

116
items/src/numops.rs Normal file
View File

@@ -0,0 +1,116 @@
use crate::SubFrId;
use num_traits::{AsPrimitive, Bounded, Float, Zero};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::Debug;
use std::ops::Add;
/// Numeric wrapper representing a boolean channel as 0/1 so it can flow
/// through the generic numeric aggregation code.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct BoolNum(pub u8);
impl BoolNum {
    pub const MIN: Self = Self(0);
    pub const MAX: Self = Self(1);
}
impl Add<BoolNum> for BoolNum {
    type Output = BoolNum;
    // NOTE(review): 1 + 1 yields BoolNum(2), escaping the 0/1 domain (and u8
    // addition can overflow in longer sums) — confirm callers never sum these.
    fn add(self, rhs: BoolNum) -> Self::Output {
        Self(self.0 + rhs.0)
    }
}
impl num_traits::Zero for BoolNum {
    fn zero() -> Self {
        Self(0)
    }
    fn is_zero(&self) -> bool {
        self.0 == 0
    }
}
// Conversion to f32 for averaging.
impl num_traits::AsPrimitive<f32> for BoolNum {
    fn as_(self) -> f32 {
        self.0 as f32
    }
}
impl num_traits::Bounded for BoolNum {
    fn min_value() -> Self {
        Self(0)
    }
    fn max_value() -> Self {
        Self(1)
    }
}
// Comparisons delegate to the underlying u8.
impl PartialEq for BoolNum {
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.0, &other.0)
    }
}
impl PartialOrd for BoolNum {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        PartialOrd::partial_cmp(&self.0, &other.0)
    }
}
/// Bundle of capabilities every scalar sample type must provide so it can be
/// aggregated (min/max/avg), framed, and (de)serialized generically.
pub trait NumOps:
    Sized
    + Copy
    + Send
    + Unpin
    + Debug
    + Zero
    + AsPrimitive<f32>
    + Bounded
    + PartialOrd
    + SubFrId
    + Serialize
    + DeserializeOwned
{
    /// Seed for a running minimum: the type's MAX, or NaN for floats so the
    /// first real sample always replaces it.
    fn min_or_nan() -> Self;
    /// Seed for a running maximum: the type's MIN, or NaN for floats.
    fn max_or_nan() -> Self;
    /// Always false for integer types; IEEE NaN test for floats.
    fn is_nan(&self) -> bool;
}
// Implements NumOps for a concrete type: `$min_or_nan`/`$max_or_nan` are
// associated consts on the type (MIN/MAX, or NAN for floats) and `$is_nan`
// is one of the helper fns below.
macro_rules! impl_num_ops {
    ($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident) => {
        impl NumOps for $ty {
            fn min_or_nan() -> Self {
                $ty::$min_or_nan
            }
            fn max_or_nan() -> Self {
                $ty::$max_or_nan
            }
            fn is_nan(&self) -> bool {
                $is_nan(self)
            }
        }
    };
}
// Integers have no NaN representation.
fn is_nan_int<T>(_x: &T) -> bool {
    false
}
fn is_nan_float<T: Float>(x: &T) -> bool {
    x.is_nan()
}
impl_num_ops!(u8, MIN, MAX, is_nan_int);
impl_num_ops!(u16, MIN, MAX, is_nan_int);
impl_num_ops!(u32, MIN, MAX, is_nan_int);
impl_num_ops!(u64, MIN, MAX, is_nan_int);
impl_num_ops!(i8, MIN, MAX, is_nan_int);
impl_num_ops!(i16, MIN, MAX, is_nan_int);
impl_num_ops!(i32, MIN, MAX, is_nan_int);
impl_num_ops!(i64, MIN, MAX, is_nan_int);
// Floats seed min/max searches with NAN (see NumOps docs).
impl_num_ops!(f32, NAN, NAN, is_nan_float);
impl_num_ops!(f64, NAN, NAN, is_nan_float);
impl_num_ops!(BoolNum, MIN, MAX, is_nan_int);

47
items/src/streams.rs Normal file
View File

@@ -0,0 +1,47 @@
use crate::{RangeCompletableItem, Sitemty, StreamItem, WithLen};
use err::Error;
use serde::Serialize;
/// Accumulates stream items of type `Input` and produces one serializable
/// `Output` when the stream finishes (or is cut short).
pub trait Collector: Send + Unpin + WithLen {
    type Input: Collectable;
    type Output: Serialize;
    /// Fold one incoming batch into the collector.
    fn ingest(&mut self, src: &Self::Input);
    /// Mark that the requested time range was fully covered.
    fn set_range_complete(&mut self);
    /// Mark that collection stopped due to a timeout.
    fn set_timed_out(&mut self);
    /// Consume the collector and build the final result.
    fn result(self) -> Result<Self::Output, Error>;
}
/// Item types that know how to create their matching `Collector`.
pub trait Collectable {
    type Collector: Collector<Input = Self>;
    /// `bin_count_exp` is the number of bins the client requested.
    fn new_collector(bin_count_exp: u32) -> Self::Collector;
}
/// Anything that can serialize itself to a JSON byte buffer.
pub trait ToJsonBytes {
    fn to_json_bytes(&self) -> Result<Vec<u8>, Error>;
}
/// Converts an item into a boxed JSON-serializable value, or an error for
/// items that have no JSON form.
pub trait ToJsonResult {
    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error>;
}
impl ToJsonBytes for serde_json::Value {
    fn to_json_bytes(&self) -> Result<Vec<u8>, Error> {
        Ok(serde_json::to_vec(self)?)
    }
}
// Only the Data payload of a stream item converts to JSON; every other
// variant (log, stats, range-complete marker, error) is reported as an error.
impl ToJsonResult for Sitemty<serde_json::Value> {
    fn to_json_result(&self) -> Result<Box<dyn ToJsonBytes>, Error> {
        match self {
            Ok(item) => match item {
                StreamItem::DataItem(item) => match item {
                    RangeCompletableItem::Data(item) => Ok(Box::new(item.clone())),
                    RangeCompletableItem::RangeComplete => Err(Error::with_msg("RangeComplete")),
                },
                StreamItem::Log(item) => Err(Error::with_msg(format!("Log {:?}", item))),
                StreamItem::Stats(item) => Err(Error::with_msg(format!("Stats {:?}", item))),
            },
            Err(e) => Err(Error::with_msg(format!("Error {:?}", e))),
        }
    }
}

426
items/src/waveevents.rs Normal file
View File

@@ -0,0 +1,426 @@
use crate::eventvalues::EventValues;
use crate::minmaxavgdim1bins::MinMaxAvgDim1Bins;
use crate::numops::NumOps;
use crate::xbinnedscalarevents::XBinnedScalarEvents;
use crate::xbinnedwaveevents::XBinnedWaveEvents;
use crate::{
Appendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv,
ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::log::*;
use netpod::{x_bin_count, AggKind, NanoRange, Shape};
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use tokio::fs::File;
/// Batch of waveform events: one timestamp and one full sample vector per
/// event. `tss` and `vals` run parallel.
#[derive(Debug, Serialize, Deserialize)]
pub struct WaveEvents<NTY> {
    /// Event timestamps (ns).
    pub tss: Vec<u64>,
    /// One waveform per event.
    pub vals: Vec<Vec<NTY>>,
}
// Frame type id for serialized stream items; 0x800 block + scalar-type sub id.
impl<NTY> SitemtyFrameType for WaveEvents<NTY>
where
    NTY: SubFrId,
{
    const FRAME_TYPE_ID: u32 = 0x800 + NTY::SUB;
}
impl<NTY> WaveEvents<NTY> {
    /// Batch with zero events.
    pub fn empty() -> Self {
        Self {
            tss: vec![],
            vals: vec![],
        }
    }
}
impl<NTY> WithLen for WaveEvents<NTY> {
    fn len(&self) -> usize {
        self.tss.len()
    }
}
impl<NTY> WithTimestamps for WaveEvents<NTY> {
    fn ts(&self, ix: usize) -> u64 {
        self.tss[ix]
    }
}
impl<NTY> RangeOverlapInfo for WaveEvents<NTY> {
    fn ends_before(&self, range: NanoRange) -> bool {
        match self.tss.last() {
            Some(&ts) => ts < range.beg,
            None => true,
        }
    }
    // NOTE(review): panics on empty batches — presumably callers guarantee
    // non-empty here; confirm.
    fn ends_after(&self, range: NanoRange) -> bool {
        match self.tss.last() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
    fn starts_after(&self, range: NanoRange) -> bool {
        match self.tss.first() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
}
impl<NTY> FitsInside for WaveEvents<NTY> {
    // Classify where this batch's event span lies relative to `range`.
    fn fits_inside(&self, range: NanoRange) -> Fits {
        if self.tss.is_empty() {
            Fits::Empty
        } else {
            let t1 = *self.tss.first().unwrap();
            let t2 = *self.tss.last().unwrap();
            if t2 < range.beg {
                Fits::Lower
            } else if t1 > range.end {
                Fits::Greater
            } else if t1 < range.beg && t2 > range.end {
                Fits::PartlyLowerAndGreater
            } else if t1 < range.beg {
                Fits::PartlyLower
            } else if t2 > range.end {
                Fits::PartlyGreater
            } else {
                Fits::Inside
            }
        }
    }
}
impl<NTY> FilterFittingInside for WaveEvents<NTY> {
    // Keep the batch only if it overlaps `fit_range` at all.
    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
        match self.fits_inside(fit_range) {
            Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
            _ => None,
        }
    }
}
impl<NTY> PushableIndex for WaveEvents<NTY>
where
    NTY: NumOps,
{
    // Copy event `ix` from `src` onto the end of this batch.
    fn push_index(&mut self, src: &Self, ix: usize) {
        self.tss.push(src.tss[ix]);
        // TODO trait should allow to move from source.
        self.vals.push(src.vals[ix].clone());
    }
}
impl<NTY> Appendable for WaveEvents<NTY>
where
    NTY: NumOps,
{
    fn empty() -> Self {
        Self::empty()
    }
    fn append(&mut self, src: &Self) {
        self.tss.extend_from_slice(&src.tss);
        self.vals.extend_from_slice(&src.vals);
    }
}
// Not readable from pre-binned files; only required to satisfy trait bounds.
impl<NTY> ReadableFromFile for WaveEvents<NTY>
where
    NTY: NumOps,
{
    fn read_from_file(_file: File) -> Result<ReadPbv<Self>, Error> {
        // TODO refactor types such that this impl is not needed.
        panic!()
    }
    fn from_buf(_buf: &[u8]) -> Result<Self, Error> {
        panic!()
    }
}
// Time-binning wave events yields dim-1 min/max/avg bins.
impl<NTY> TimeBinnableType for WaveEvents<NTY>
where
    NTY: NumOps,
{
    type Output = MinMaxAvgDim1Bins<NTY>;
    type Aggregator = WaveEventsAggregator<NTY>;
    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
        Self::Aggregator::new(range, bin_count)
    }
}
/// Aggregates raw waveform events that fall into `range` into one dim-1 bin
/// with element-wise min/max/avg.
pub struct WaveEventsAggregator<NTY>
where
    NTY: NumOps,
{
    range: NanoRange,
    // Number of events that fell into the range.
    count: u64,
    // Element-wise minimum; `None` until the first event.
    min: Option<Vec<NTY>>,
    // Element-wise maximum; `None` until the first event.
    max: Option<Vec<NTY>>,
    // Number of events folded into `sum`.
    sumc: u64,
    // Element-wise sum; `None` until the first event.
    sum: Option<Vec<f32>>,
}
impl<NTY> WaveEventsAggregator<NTY>
where
    NTY: NumOps,
{
    pub fn new(range: NanoRange, _x_bin_count: usize) -> Self {
        Self {
            range,
            count: 0,
            // TODO create the right number of bins right here:
            // Fix: was `err::todoval()`, which panics at runtime on
            // construction. Start at `None` like `max` and `sum`; `ingest`
            // fills it on the first event.
            min: None,
            max: None,
            sumc: 0,
            sum: None,
        }
    }
}
// Folds raw waveform events into a single dim-1 bin for `self.range`.
impl<NTY> TimeBinnableTypeAggregator for WaveEventsAggregator<NTY>
where
    NTY: NumOps,
{
    type Input = WaveEvents<NTY>;
    type Output = MinMaxAvgDim1Bins<NTY>;
    fn range(&self) -> &NanoRange {
        &self.range
    }
    // Fold every event inside [range.beg, range.end) into the running
    // element-wise statistics.
    fn ingest(&mut self, item: &Self::Input) {
        for i1 in 0..item.tss.len() {
            let ts = item.tss[i1];
            if ts < self.range.beg {
                continue;
            } else if ts >= self.range.end {
                continue;
            } else {
                match &mut self.min {
                    None => self.min = Some(item.vals[i1].clone()),
                    Some(min) => {
                        for (a, b) in min.iter_mut().zip(item.vals[i1].iter()) {
                            if b < a {
                                *a = *b;
                            }
                        }
                    }
                };
                match &mut self.max {
                    None => self.max = Some(item.vals[i1].clone()),
                    Some(max) => {
                        for (a, b) in max.iter_mut().zip(item.vals[i1].iter()) {
                            // Fix: was `b < a` (copy-paste from the min
                            // branch), so maxima were never updated after the
                            // first event; a maximum must take larger values.
                            if b > a {
                                *a = *b;
                            }
                        }
                    }
                };
                match self.sum.as_mut() {
                    None => {
                        self.sum = Some(item.vals[i1].iter().map(|k| k.as_()).collect());
                    }
                    Some(sum) => {
                        for (a, b) in sum.iter_mut().zip(item.vals[i1].iter()) {
                            let vf = b.as_();
                            // NaN samples are dropped from the sum.
                            if vf.is_nan() {
                            } else {
                                *a += vf;
                            }
                        }
                    }
                }
                self.sumc += 1;
                self.count += 1;
            }
        }
    }
    // Emit exactly one output bin spanning the whole target range.
    fn result(self) -> Self::Output {
        let avg = if self.sumc == 0 {
            None
        } else {
            let avg = self
                .sum
                .as_ref()
                .unwrap()
                .iter()
                .map(|item| item / self.sumc as f32)
                .collect();
            Some(avg)
        };
        Self::Output {
            ts1s: vec![self.range.beg],
            ts2s: vec![self.range.end],
            counts: vec![self.count],
            mins: vec![self.min],
            maxs: vec![self.max],
            avgs: vec![avg],
        }
    }
}
/// X-bins each waveform down to a single min/max/avg per event
/// (i.e. collapses the whole waveform into one x-bin).
pub struct WaveXBinner<NTY> {
    _m1: PhantomData<NTY>,
}
impl<NTY> EventsNodeProcessor for WaveXBinner<NTY>
where
    NTY: NumOps,
{
    type Input = Vec<NTY>;
    type Output = XBinnedScalarEvents<NTY>;
    fn create(_shape: Shape, _agg_kind: AggKind) -> Self {
        Self { _m1: PhantomData }
    }
    // For each event, scan its samples once to find min, max and NaN-aware
    // average; timestamps pass through unchanged.
    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
        let nev = inp.tss.len();
        let mut ret = Self::Output {
            tss: inp.tss,
            mins: Vec::with_capacity(nev),
            maxs: Vec::with_capacity(nev),
            avgs: Vec::with_capacity(nev),
        };
        for i1 in 0..nev {
            // Seeds: MAX/MIN (or NaN for floats) so the first sample wins.
            let mut min = NTY::max_or_nan();
            let mut max = NTY::min_or_nan();
            let mut sum = 0f32;
            let mut sumc = 0;
            let vals = &inp.values[i1];
            for &v in vals {
                if v < min || min.is_nan() {
                    min = v;
                }
                if v > max || max.is_nan() {
                    max = v;
                }
                let vf = v.as_();
                // NaN samples do not contribute to the average.
                if vf.is_nan() {
                } else {
                    sum += vf;
                    sumc += 1;
                }
            }
            ret.mins.push(min);
            ret.maxs.push(max);
            if sumc == 0 {
                ret.avgs.push(f32::NAN);
            } else {
                ret.avgs.push(sum / sumc as f32);
            }
        }
        ret
    }
}
/// X-bins each waveform from `shape_bin_count` samples down to `x_bin_count`
/// bins, emitting per-event min/max/avg vectors.
pub struct WaveNBinner<NTY> {
    // Number of samples in the incoming waveform (from the channel shape).
    shape_bin_count: usize,
    // Number of x-bins requested for the output.
    x_bin_count: usize,
    _m1: PhantomData<NTY>,
}
impl<NTY> EventsNodeProcessor for WaveNBinner<NTY>
where
    NTY: NumOps,
{
    type Input = Vec<NTY>;
    type Output = XBinnedWaveEvents<NTY>;
    fn create(shape: Shape, agg_kind: AggKind) -> Self {
        info!("WaveNBinner::create");
        // TODO get rid of panic potential
        let shape_bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize;
        let x_bin_count = x_bin_count(&shape, &agg_kind);
        info!("shape_bin_count {} x_bin_count {}", shape_bin_count, x_bin_count);
        Self {
            shape_bin_count,
            x_bin_count,
            _m1: PhantomData,
        }
    }
    // For each event, map sample index -> x-bin index proportionally and
    // accumulate NaN-aware min/max/avg per x-bin.
    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
        let nev = inp.tss.len();
        let mut ret = Self::Output {
            // TODO get rid of this clone:
            tss: inp.tss.clone(),
            mins: Vec::with_capacity(nev),
            maxs: Vec::with_capacity(nev),
            avgs: Vec::with_capacity(nev),
        };
        for i1 in 0..nev {
            // Seeds: MAX/MIN (or NaN for floats) so the first sample wins.
            let mut min = vec![NTY::max_or_nan(); self.x_bin_count];
            let mut max = vec![NTY::min_or_nan(); self.x_bin_count];
            let mut sum = vec![0f32; self.x_bin_count];
            let mut sumc = vec![0u64; self.x_bin_count];
            for (i2, &v) in inp.values[i1].iter().enumerate() {
                // Proportional mapping of sample index to x-bin.
                let i3 = i2 * self.x_bin_count / self.shape_bin_count;
                if v < min[i3] || min[i3].is_nan() {
                    min[i3] = v;
                }
                if v > max[i3] || max[i3].is_nan() {
                    max[i3] = v;
                }
                // NaN samples do not contribute to the average.
                if v.is_nan() {
                } else {
                    sum[i3] += v.as_();
                    sumc[i3] += 1;
                }
            }
            // Removed dead debug code: `if false && inp.tss[0] < 1300 { info!(...) }`
            // could never execute.
            ret.mins.push(min);
            ret.maxs.push(max);
            // Empty x-bins get NaN averages.
            let avg = sum
                .into_iter()
                .zip(sumc.into_iter())
                .map(|(j, k)| if k > 0 { j / k as f32 } else { f32::NAN })
                .collect();
            ret.avgs.push(avg);
        }
        ret
    }
}
/// Pass-through processor: forwards waveform events unchanged (no x-binning).
pub struct WavePlainProc<NTY> {
    _m1: PhantomData<NTY>,
}
impl<NTY> EventsNodeProcessor for WavePlainProc<NTY>
where
    NTY: NumOps,
{
    type Input = Vec<NTY>;
    type Output = WaveEvents<NTY>;
    fn create(_shape: Shape, _agg_kind: AggKind) -> Self {
        Self { _m1: PhantomData }
    }
    fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
        // Removed dead debug branch (`if false { ... }`) that truncated each
        // waveform to its first 5 samples; only the pass-through path was
        // ever reachable.
        WaveEvents {
            tss: inp.tss,
            vals: inp.values,
        }
    }
}

View File

@@ -0,0 +1,343 @@
use crate::minmaxavgbins::MinMaxAvgBins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv,
ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
// TODO rename Scalar -> Dim0
/// Events whose waveform has been x-binned down to one scalar min/max/avg
/// per event. All vectors run parallel, indexed by event.
#[derive(Debug, Serialize, Deserialize)]
pub struct XBinnedScalarEvents<NTY> {
    /// Event timestamps (ns).
    pub tss: Vec<u64>,
    pub mins: Vec<NTY>,
    pub maxs: Vec<NTY>,
    pub avgs: Vec<f32>,
}
// Frame type id for serialized stream items; 0x600 block + scalar-type sub id.
impl<NTY> SitemtyFrameType for XBinnedScalarEvents<NTY>
where
    NTY: SubFrId,
{
    const FRAME_TYPE_ID: u32 = 0x600 + NTY::SUB;
}
impl<NTY> XBinnedScalarEvents<NTY> {
    /// Batch with zero events.
    pub fn empty() -> Self {
        Self {
            tss: vec![],
            mins: vec![],
            maxs: vec![],
            avgs: vec![],
        }
    }
}
impl<NTY> WithLen for XBinnedScalarEvents<NTY> {
    fn len(&self) -> usize {
        self.tss.len()
    }
}
impl<NTY> WithTimestamps for XBinnedScalarEvents<NTY> {
    fn ts(&self, ix: usize) -> u64 {
        self.tss[ix]
    }
}
impl<NTY> RangeOverlapInfo for XBinnedScalarEvents<NTY> {
    fn ends_before(&self, range: NanoRange) -> bool {
        match self.tss.last() {
            Some(&ts) => ts < range.beg,
            None => true,
        }
    }
    // NOTE(review): panics on empty batches — presumably callers guarantee
    // non-empty here; confirm.
    fn ends_after(&self, range: NanoRange) -> bool {
        match self.tss.last() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
    fn starts_after(&self, range: NanoRange) -> bool {
        match self.tss.first() {
            Some(&ts) => ts >= range.end,
            None => panic!(),
        }
    }
}
impl<NTY> FitsInside for XBinnedScalarEvents<NTY> {
    // Classify where this batch's event span lies relative to `range`.
    fn fits_inside(&self, range: NanoRange) -> Fits {
        if self.tss.is_empty() {
            Fits::Empty
        } else {
            let t1 = *self.tss.first().unwrap();
            let t2 = *self.tss.last().unwrap();
            if t2 < range.beg {
                Fits::Lower
            } else if t1 > range.end {
                Fits::Greater
            } else if t1 < range.beg && t2 > range.end {
                Fits::PartlyLowerAndGreater
            } else if t1 < range.beg {
                Fits::PartlyLower
            } else if t2 > range.end {
                Fits::PartlyGreater
            } else {
                Fits::Inside
            }
        }
    }
}
impl<NTY> FilterFittingInside for XBinnedScalarEvents<NTY> {
    // Keep the batch only if it overlaps `fit_range` at all.
    fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
        match self.fits_inside(fit_range) {
            Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
            _ => None,
        }
    }
}
impl<NTY> PushableIndex for XBinnedScalarEvents<NTY>
where
    NTY: NumOps,
{
    // Copy event `ix` from `src` onto the end of this batch.
    fn push_index(&mut self, src: &Self, ix: usize) {
        self.tss.push(src.tss[ix]);
        self.mins.push(src.mins[ix]);
        self.maxs.push(src.maxs[ix]);
        self.avgs.push(src.avgs[ix]);
    }
}
impl<NTY> Appendable for XBinnedScalarEvents<NTY>
where
    NTY: NumOps,
{
    fn empty() -> Self {
        Self::empty()
    }
    fn append(&mut self, src: &Self) {
        self.tss.extend_from_slice(&src.tss);
        self.mins.extend_from_slice(&src.mins);
        self.maxs.extend_from_slice(&src.maxs);
        self.avgs.extend_from_slice(&src.avgs);
    }
}
// Not readable from pre-binned files; only required to satisfy trait bounds.
impl<NTY> ReadableFromFile for XBinnedScalarEvents<NTY>
where
    NTY: NumOps,
{
    fn read_from_file(_file: File) -> Result<ReadPbv<Self>, Error> {
        // TODO refactor types such that this impl is not needed.
        panic!()
    }
    fn from_buf(_buf: &[u8]) -> Result<Self, Error> {
        panic!()
    }
}
// Time-binning x-binned scalar events yields dim-0 min/max/avg bins.
impl<NTY> TimeBinnableType for XBinnedScalarEvents<NTY>
where
    NTY: NumOps,
{
    type Output = MinMaxAvgBins<NTY>;
    type Aggregator = XBinnedScalarEventsAggregator<NTY>;
    fn aggregator(range: NanoRange, _x_bin_count: usize) -> Self::Aggregator {
        Self::Aggregator::new(range)
    }
}
/// Aggregates x-binned scalar events falling into `range` into one bin.
pub struct XBinnedScalarEventsAggregator<NTY>
where
    NTY: NumOps,
{
    range: NanoRange,
    // Number of events in the range.
    count: u64,
    // Running scalar min/max; `None` until the first event.
    min: Option<NTY>,
    max: Option<NTY>,
    // Count and sum of non-NaN averages.
    sumc: u64,
    sum: f32,
}
impl<NTY> XBinnedScalarEventsAggregator<NTY>
where
    NTY: NumOps,
{
    pub fn new(range: NanoRange) -> Self {
        Self {
            range,
            count: 0,
            min: None,
            max: None,
            sumc: 0,
            sum: 0f32,
        }
    }
}
impl<NTY> TimeBinnableTypeAggregator for XBinnedScalarEventsAggregator<NTY>
where
    NTY: NumOps,
{
    type Input = XBinnedScalarEvents<NTY>;
    type Output = MinMaxAvgBins<NTY>;
    fn range(&self) -> &NanoRange {
        &self.range
    }
    // Fold every event inside [range.beg, range.end) into the running stats.
    fn ingest(&mut self, item: &Self::Input) {
        for i1 in 0..item.tss.len() {
            let ts = item.tss[i1];
            if ts < self.range.beg {
                continue;
            } else if ts >= self.range.end {
                continue;
            } else {
                self.min = match self.min {
                    None => Some(item.mins[i1]),
                    Some(min) => {
                        if item.mins[i1] < min {
                            Some(item.mins[i1])
                        } else {
                            Some(min)
                        }
                    }
                };
                self.max = match self.max {
                    None => Some(item.maxs[i1]),
                    Some(max) => {
                        if item.maxs[i1] > max {
                            Some(item.maxs[i1])
                        } else {
                            Some(max)
                        }
                    }
                };
                // NaN averages are dropped from the sum.
                let x = item.avgs[i1];
                if x.is_nan() {
                } else {
                    self.sum += x;
                    self.sumc += 1;
                }
                self.count += 1;
            }
        }
    }
    // Emit exactly one output bin spanning the whole target range.
    fn result(self) -> Self::Output {
        let avg = if self.sumc == 0 {
            None
        } else {
            Some(self.sum / self.sumc as f32)
        };
        Self::Output {
            ts1s: vec![self.range.beg],
            ts2s: vec![self.range.end],
            counts: vec![self.count],
            mins: vec![self.min],
            maxs: vec![self.max],
            avgs: vec![avg],
        }
    }
}
/// JSON wire format for collected x-binned scalar events.
#[derive(Serialize, Deserialize)]
pub struct XBinnedScalarEventsCollectedResult<NTY> {
    /// Anchor (seconds); `ts_off_ms`/`ts_off_ns` are offsets from it.
    #[serde(rename = "tsAnchor")]
    ts_anchor_sec: u64,
    #[serde(rename = "tsMs")]
    ts_off_ms: Vec<u64>,
    #[serde(rename = "tsNs")]
    ts_off_ns: Vec<u64>,
    mins: Vec<NTY>,
    maxs: Vec<NTY>,
    avgs: Vec<f32>,
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
    finalised_range: bool,
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
    timed_out: bool,
}
/// Accumulates `XBinnedScalarEvents` batches until the stream finishes.
pub struct XBinnedScalarEventsCollector<NTY> {
    vals: XBinnedScalarEvents<NTY>,
    finalised_range: bool,
    timed_out: bool,
    #[allow(dead_code)]
    bin_count_exp: u32,
}
impl<NTY> XBinnedScalarEventsCollector<NTY> {
    pub fn new(bin_count_exp: u32) -> Self {
        Self {
            finalised_range: false,
            timed_out: false,
            vals: XBinnedScalarEvents::empty(),
            bin_count_exp,
        }
    }
}
impl<NTY> WithLen for XBinnedScalarEventsCollector<NTY> {
    // Number of events collected so far.
    fn len(&self) -> usize {
        self.vals.tss.len()
    }
}
// Stream-collection protocol: append batches, then convert to wire format.
impl<NTY> Collector for XBinnedScalarEventsCollector<NTY>
where
    NTY: NumOps,
{
    type Input = XBinnedScalarEvents<NTY>;
    type Output = XBinnedScalarEventsCollectedResult<NTY>;
    fn ingest(&mut self, src: &Self::Input) {
        self.vals.append(src);
    }
    fn set_range_complete(&mut self) {
        self.finalised_range = true;
    }
    fn set_timed_out(&mut self) {
        self.timed_out = true;
    }
    fn result(self) -> Result<Self::Output, Error> {
        // Split absolute nanosecond timestamps into anchor + offsets.
        let tst = ts_offs_from_abs(&self.vals.tss);
        let ret = Self::Output {
            ts_anchor_sec: tst.0,
            ts_off_ms: tst.1,
            ts_off_ns: tst.2,
            mins: self.vals.mins,
            maxs: self.vals.maxs,
            avgs: self.vals.avgs,
            finalised_range: self.finalised_range,
            timed_out: self.timed_out,
        };
        Ok(ret)
    }
}
impl<NTY> Collectable for XBinnedScalarEvents<NTY>
where
NTY: NumOps,
{
type Collector = XBinnedScalarEventsCollector<NTY>;
fn new_collector(bin_count_exp: u32) -> Self::Collector {
Self::Collector::new(bin_count_exp)
}
}

View File

@@ -0,0 +1,361 @@
use crate::minmaxavgwavebins::MinMaxAvgWaveBins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
Appendable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile,
SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::log::*;
use netpod::timeunits::{MS, SEC};
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
// TODO rename Wave -> Dim1
/// Events whose waveform has been x-binned to a smaller number of bins:
/// per event a min/max/avg vector. All outer vectors run parallel by event.
#[derive(Debug, Serialize, Deserialize)]
pub struct XBinnedWaveEvents<NTY> {
    /// Event timestamps (ns).
    pub tss: Vec<u64>,
    pub mins: Vec<Vec<NTY>>,
    pub maxs: Vec<Vec<NTY>>,
    pub avgs: Vec<Vec<f32>>,
}
// Frame type id for serialized stream items; 0x900 block + scalar-type sub id.
impl<NTY> SitemtyFrameType for XBinnedWaveEvents<NTY>
where
    NTY: SubFrId,
{
    const FRAME_TYPE_ID: u32 = 0x900 + NTY::SUB;
}
impl<NTY> XBinnedWaveEvents<NTY> {
    /// Batch with zero events.
    pub fn empty() -> Self {
        Self {
            tss: vec![],
            mins: vec![],
            maxs: vec![],
            avgs: vec![],
        }
    }
}
impl<NTY> WithLen for XBinnedWaveEvents<NTY> {
fn len(&self) -> usize {
self.tss.len()
}
}
impl<NTY> WithTimestamps for XBinnedWaveEvents<NTY> {
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
}
}
impl<NTY> RangeOverlapInfo for XBinnedWaveEvents<NTY> {
fn ends_before(&self, range: NanoRange) -> bool {
match self.tss.last() {
Some(&ts) => ts < range.beg,
None => true,
}
}
fn ends_after(&self, range: NanoRange) -> bool {
match self.tss.last() {
Some(&ts) => ts >= range.end,
None => panic!(),
}
}
fn starts_after(&self, range: NanoRange) -> bool {
match self.tss.first() {
Some(&ts) => ts >= range.end,
None => panic!(),
}
}
}
impl<NTY> FitsInside for XBinnedWaveEvents<NTY> {
fn fits_inside(&self, range: NanoRange) -> Fits {
if self.tss.is_empty() {
Fits::Empty
} else {
let t1 = *self.tss.first().unwrap();
let t2 = *self.tss.last().unwrap();
if t2 < range.beg {
Fits::Lower
} else if t1 > range.end {
Fits::Greater
} else if t1 < range.beg && t2 > range.end {
Fits::PartlyLowerAndGreater
} else if t1 < range.beg {
Fits::PartlyLower
} else if t2 > range.end {
Fits::PartlyGreater
} else {
Fits::Inside
}
}
}
}
impl<NTY> FilterFittingInside for XBinnedWaveEvents<NTY> {
fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
match self.fits_inside(fit_range) {
Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
_ => None,
}
}
}
impl<NTY> PushableIndex for XBinnedWaveEvents<NTY>
where
NTY: NumOps,
{
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
// TODO not nice.
self.mins.push(src.mins[ix].clone());
self.maxs.push(src.maxs[ix].clone());
self.avgs.push(src.avgs[ix].clone());
}
}
impl<NTY> Appendable for XBinnedWaveEvents<NTY>
where
    NTY: NumOps,
{
    /// Delegates to the inherent constructor that creates an empty batch.
    fn empty() -> Self {
        Self::empty()
    }
    /// Append all events of `src`, column by column, keeping columns in sync.
    fn append(&mut self, src: &Self) {
        self.tss.extend_from_slice(&src.tss);
        self.mins.extend_from_slice(&src.mins);
        self.maxs.extend_from_slice(&src.maxs);
        self.avgs.extend_from_slice(&src.avgs);
    }
}
impl<NTY> ReadableFromFile for XBinnedWaveEvents<NTY>
where
    NTY: NumOps,
{
    /// Not supported for this type: always panics.
    fn read_from_file(_file: File) -> Result<ReadPbv<Self>, Error> {
        // TODO refactor types such that this impl is not needed.
        panic!()
    }
    /// Not supported for this type: always panics.
    fn from_buf(_buf: &[u8]) -> Result<Self, Error> {
        panic!()
    }
}
// Time-binning wiring: batches of x-binned wave events aggregate into
// per-bin min/max/avg waveforms via the dedicated aggregator below.
impl<NTY> TimeBinnableType for XBinnedWaveEvents<NTY>
where
    NTY: NumOps,
{
    type Output = MinMaxAvgWaveBins<NTY>;
    type Aggregator = XBinnedWaveEventsAggregator<NTY>;
    /// Create an aggregator for one bin covering `range`.
    fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
        Self::Aggregator::new(range, bin_count)
    }
}
/// Accumulator that folds `XBinnedWaveEvents` batches into one time bin.
pub struct XBinnedWaveEventsAggregator<NTY>
where
    NTY: NumOps,
{
    // Time range of the bin being aggregated.
    range: NanoRange,
    // Number of events that fell inside `range`.
    count: u64,
    // Per-element running minimum across ingested events.
    min: Vec<NTY>,
    // Per-element running maximum across ingested events.
    max: Vec<NTY>,
    // Per-element running sum of event averages, used to form the bin average.
    sum: Vec<f32>,
    // Number of events folded into `sum` (divisor for the average).
    sumc: u64,
}
impl<NTY> XBinnedWaveEventsAggregator<NTY>
where
    NTY: NumOps,
{
    /// Create an aggregator for `range` with `bin_count` waveform elements.
    ///
    /// # Panics
    /// Panics when `bin_count` is zero.
    pub fn new(range: NanoRange, bin_count: usize) -> Self {
        if bin_count == 0 {
            panic!("bin_count == 0");
        }
        // Seed min/max so that the first real value always replaces the seed
        // (max-or-NaN for min, min-or-NaN for max).
        let min = vec![NTY::max_or_nan(); bin_count];
        let max = vec![NTY::min_or_nan(); bin_count];
        let sum = vec![0f32; bin_count];
        Self {
            range,
            count: 0,
            min,
            max,
            sum,
            sumc: 0,
        }
    }
}
impl<NTY> TimeBinnableTypeAggregator for XBinnedWaveEventsAggregator<NTY>
where
    NTY: NumOps,
{
    type Input = XBinnedWaveEvents<NTY>;
    type Output = MinMaxAvgWaveBins<NTY>;

    fn range(&self) -> &NanoRange {
        &self.range
    }

    /// Fold every event of `item` that falls inside `self.range` into the
    /// per-element min/max/sum accumulators.
    fn ingest(&mut self, item: &Self::Input) {
        for i1 in 0..item.tss.len() {
            let ts = item.tss[i1];
            // Only events in the half-open bin range [beg, end) contribute.
            if ts < self.range.beg || ts >= self.range.end {
                continue;
            }
            for (i2, &v) in item.mins[i1].iter().enumerate() {
                // NaN-aware minimum: any real value replaces a NaN accumulator.
                if v < self.min[i2] || self.min[i2].is_nan() {
                    self.min[i2] = v;
                }
            }
            for (i2, &v) in item.maxs[i1].iter().enumerate() {
                // NaN-aware maximum, symmetric to the minimum above.
                if v > self.max[i2] || self.max[i2].is_nan() {
                    self.max[i2] = v;
                }
            }
            for (i2, &v) in item.avgs[i1].iter().enumerate() {
                // NaN samples are excluded from the running sum.
                if !v.is_nan() {
                    self.sum[i2] += v;
                }
            }
            self.sumc += 1;
            self.count += 1;
        }
    }

    /// Produce the single output bin. With no ingested events the bin carries
    /// `None` payloads; otherwise min/max/average waveforms over the events.
    fn result(self) -> Self::Output {
        if self.sumc == 0 {
            Self::Output {
                ts1s: vec![self.range.beg],
                ts2s: vec![self.range.end],
                counts: vec![self.count],
                mins: vec![None],
                maxs: vec![None],
                avgs: vec![None],
            }
        } else {
            // Per-element average over the number of contributing events.
            let avg = self.sum.iter().map(|k| *k / self.sumc as f32).collect();
            // Note: a leftover debug branch (`if ret.ts1s[0] < 1300 { info!(..) }`)
            // was removed here; ts1s holds nanosecond timestamps, so the
            // condition was effectively dead code.
            Self::Output {
                ts1s: vec![self.range.beg],
                ts2s: vec![self.range.end],
                counts: vec![self.count],
                mins: vec![Some(self.min)],
                maxs: vec![Some(self.max)],
                avgs: vec![Some(avg)],
            }
        }
    }
}
/// Serializable result of collecting `XBinnedWaveEvents`: timestamps are
/// split into an anchor (whole seconds) plus per-event ms and ns offsets.
#[derive(Serialize, Deserialize)]
pub struct XBinnedWaveEventsCollectedResult<NTY> {
    // Anchor timestamp in whole seconds.
    #[serde(rename = "tsAnchor")]
    ts_anchor_sec: u64,
    // Per-event offset from the anchor in whole milliseconds.
    #[serde(rename = "tsMs")]
    ts_off_ms: Vec<u64>,
    // Per-event sub-millisecond remainder in nanoseconds.
    #[serde(rename = "tsNs")]
    ts_off_ns: Vec<u64>,
    mins: Vec<Vec<NTY>>,
    maxs: Vec<Vec<NTY>>,
    avgs: Vec<Vec<f32>>,
    // Status flags; omitted from the serialized form while false.
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
    finalised_range: bool,
    #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
    timed_out: bool,
}
/// Collector that accumulates `XBinnedWaveEvents` batches and finalizes them
/// into an `XBinnedWaveEventsCollectedResult`.
pub struct XBinnedWaveEventsCollector<NTY> {
    // Events accumulated so far.
    vals: XBinnedWaveEvents<NTY>,
    // Set when the producer signalled that the range is complete.
    finalised_range: bool,
    // Set when collection was cut short by a timeout.
    timed_out: bool,
    #[allow(dead_code)]
    bin_count_exp: u32,
}
impl<NTY> XBinnedWaveEventsCollector<NTY> {
    /// Create an empty collector; flags start cleared.
    pub fn new(bin_count_exp: u32) -> Self {
        Self {
            vals: XBinnedWaveEvents::empty(),
            finalised_range: false,
            timed_out: false,
            bin_count_exp,
        }
    }
}
impl<NTY> WithLen for XBinnedWaveEventsCollector<NTY> {
    /// Number of events collected so far.
    fn len(&self) -> usize {
        self.vals.tss.len()
    }
}
impl<NTY> Collector for XBinnedWaveEventsCollector<NTY>
where
    NTY: NumOps,
{
    type Input = XBinnedWaveEvents<NTY>;
    type Output = XBinnedWaveEventsCollectedResult<NTY>;

    /// Append all events of `src` to the running batch.
    fn ingest(&mut self, src: &Self::Input) {
        self.vals.append(src);
    }

    fn set_range_complete(&mut self) {
        self.finalised_range = true;
    }

    fn set_timed_out(&mut self) {
        self.timed_out = true;
    }

    /// Finalize: anchor timestamps at the first event's whole second and emit
    /// per-event millisecond offsets plus sub-millisecond nanosecond remainders.
    fn result(self) -> Result<Self::Output, Error> {
        // Anchor at the whole second of the first timestamp; 0 when empty.
        let ts_anchor_sec = self.vals.tss.first().map_or(0, |&k| k) / SEC;
        let ts_anchor_ns = ts_anchor_sec * SEC;
        let n = self.vals.tss.len();
        let mut ts_off_ms = Vec::with_capacity(n);
        let mut ts_off_ns = Vec::with_capacity(n);
        for &ts in &self.vals.tss {
            let off = ts - ts_anchor_ns;
            let ms = off / MS;
            ts_off_ms.push(ms);
            // Remainder below one millisecond, in nanoseconds.
            ts_off_ns.push(off - ms * MS);
        }
        Ok(Self::Output {
            finalised_range: self.finalised_range,
            timed_out: self.timed_out,
            ts_anchor_sec,
            ts_off_ms,
            ts_off_ns,
            mins: self.vals.mins,
            maxs: self.vals.maxs,
            avgs: self.vals.avgs,
        })
    }
}
// Wiring: collecting x-binned wave events uses the dedicated collector above.
impl<NTY> Collectable for XBinnedWaveEvents<NTY>
where
    NTY: NumOps,
{
    type Collector = XBinnedWaveEventsCollector<NTY>;
    fn new_collector(bin_count_exp: u32) -> Self::Collector {
        Self::Collector::new(bin_count_exp)
    }
}