/*! Aggregation and binning support. */ use super::eventchunker::EventFull; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::NanoRange; use netpod::{Node, ScalarType}; use std::pin::Pin; use std::task::{Context, Poll}; #[allow(unused_imports)] use tracing::{debug, error, info, span, trace, warn, Level}; pub mod binnedt; pub mod binnedx; pub mod eventbatch; pub mod scalarbinbatch; pub trait AggregatorTdim { type InputValue; type OutputValue: AggregatableXdim1Bin + AggregatableTdim; fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool; fn starts_after(&self, inp: &Self::InputValue) -> bool; fn ingest(&mut self, inp: &Self::InputValue); fn result(self) -> Self::OutputValue; } pub trait AggregatableXdim1Bin { type Output: AggregatableXdim1Bin + AggregatableTdim; fn into_agg(self) -> Self::Output; } pub trait AggregatableTdim { type Output: AggregatableXdim1Bin + AggregatableTdim; type Aggregator: AggregatorTdim; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; } /// DO NOT USE. This is just a dummy for some testing. impl AggregatableXdim1Bin for () { type Output = (); fn into_agg(self) -> Self::Output { todo!() } } /// DO NOT USE. This is just a dummy for some testing. impl AggregatableTdim for () { type Output = (); type Aggregator = (); fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { todo!() } } /// DO NOT USE. This is just a dummy for some testing. impl AggregatorTdim for () { type InputValue = (); type OutputValue = (); fn ends_before(&self, _inp: &Self::InputValue) -> bool { todo!() } fn ends_after(&self, _inp: &Self::InputValue) -> bool { todo!() } fn starts_after(&self, _inp: &Self::InputValue) -> bool { todo!() } fn ingest(&mut self, _v: &Self::InputValue) { todo!() } fn result(self) -> Self::OutputValue { todo!() } } /// Batch of events with a scalar (zero dimensions) numeric value. pub struct ValuesDim0 { tss: Vec, values: Vec>, } impl std::fmt::Debug for ValuesDim0 { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!( fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last() ) } } impl AggregatableXdim1Bin for ValuesDim1 { type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { let mut ret = MinMaxAvgScalarEventBatch { tss: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()), }; for i1 in 0..self.tss.len() { let ts = self.tss[i1]; let mut min = f32::MAX; let mut max = f32::MIN; let mut sum = 0f32; let vals = &self.values[i1]; assert!(vals.len() > 0); for i2 in 0..vals.len() { let v = vals[i2]; //info!("value {} {} {}", i1, i2, v); min = min.min(v); max = max.max(v); sum += v; } if min == f32::MAX { min = f32::NAN; } if max == f32::MIN { max = f32::NAN; } ret.tss.push(ts); ret.mins.push(min); ret.maxs.push(max); ret.avgs.push(sum / vals.len() as f32); } ret } } /// Batch of events with a numeric one-dimensional (i.e. array) value. pub struct ValuesDim1 { pub tss: Vec, pub values: Vec>, } impl ValuesDim1 { pub fn empty() -> Self { Self { tss: vec![], values: vec![], } } } impl std::fmt::Debug for ValuesDim1 { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!( fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last() ) } } impl AggregatableXdim1Bin for ValuesDim0 { type Output = MinMaxAvgScalarEventBatch; fn into_agg(self) -> Self::Output { let mut ret = MinMaxAvgScalarEventBatch { tss: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()), }; for i1 in 0..self.tss.len() { let ts = self.tss[i1]; let mut min = f32::MAX; let mut max = f32::MIN; let mut sum = 0f32; let vals = &self.values[i1]; assert!(vals.len() > 0); for i2 in 0..vals.len() { let v = vals[i2]; //info!("value {} {} {}", i1, i2, v); min = min.min(v); max = max.max(v); sum += v; } if min == f32::MAX { min = f32::NAN; } if max == f32::MIN { max = f32::NAN; } ret.tss.push(ts); ret.mins.push(min); ret.maxs.push(max); ret.avgs.push(sum / vals.len() as f32); } ret } } pub enum Fits { Empty, Lower, Greater, Inside, PartlyLower, PartlyGreater, PartlyLowerAndGreater, } pub trait FitsInside { fn fits_inside(&self, range: NanoRange) -> Fits; } pub struct MinMaxAvgScalarBinSingle { ts1: u64, ts2: u64, count: u64, min: f32, max: f32, avg: f32, } impl std::fmt::Debug for MinMaxAvgScalarBinSingle { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!( fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.count, self.min, self.max, self.avg ) } } impl AggregatableTdim for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinSingleAggregator; fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { todo!() } } impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; fn into_agg(self) -> Self::Output { self } } pub struct MinMaxAvgScalarBinSingleAggregator {} impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator { type InputValue = MinMaxAvgScalarBinSingle; type OutputValue = MinMaxAvgScalarBinSingle; fn ends_before(&self, _inp: &Self::InputValue) -> bool { todo!() } fn ends_after(&self, _inp: &Self::InputValue) -> bool { todo!() } fn starts_after(&self, _inp: &Self::InputValue) -> bool { todo!() } fn ingest(&mut self, _v: &Self::InputValue) { todo!() } fn result(self) -> Self::OutputValue { todo!() } } pub struct Dim0F32Stream where S: Stream>, { inp: S, } impl Stream for Dim0F32Stream where S: Stream> + Unpin, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { let mut ret = ValuesDim1 { tss: vec![], values: vec![], }; use ScalarType::*; for i1 in 0..k.tss.len() { // TODO iterate sibling arrays after single bounds check let ty = &k.scalar_types[i1]; let decomp = k.decomps[i1].as_ref().unwrap(); match ty { F64 => { const BY: usize = 8; // do the conversion // TODO only a scalar! err::todoval::(); let n1 = decomp.len(); assert!(n1 % ty.bytes() as usize == 0); let ele_count = n1 / ty.bytes() as usize; let mut j = Vec::with_capacity(ele_count); // this is safe for ints and floats unsafe { j.set_len(ele_count); } let mut p1 = 0; for i1 in 0..ele_count { let u = unsafe { let mut r = [0u8; BY]; std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY); f64::from_be_bytes(r) //f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1])) }; j[i1] = u as f32; p1 += BY; } ret.tss.push(k.tss[i1]); ret.values.push(j); } _ => err::todoval(), } } Ready(Some(Ok(err::todoval()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, } } } pub trait IntoDim0F32Stream { fn into_dim_0_f32_stream(self) -> Dim0F32Stream where Self: Stream> + Sized; } impl IntoDim0F32Stream for T where T: Stream>, { fn into_dim_0_f32_stream(self) -> Dim0F32Stream { Dim0F32Stream { inp: self } } } pub struct Dim1F32Stream where S: Stream>, { inp: S, } impl Stream for Dim1F32Stream where S: Stream> + Unpin, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { let mut ret = ValuesDim1 { tss: vec![], values: vec![], }; use ScalarType::*; for i1 in 0..k.tss.len() { // TODO iterate sibling arrays after single bounds check let ty = &k.scalar_types[i1]; let decomp = k.decomps[i1].as_ref().unwrap(); match ty { F64 => { const BY: usize = 8; // do the conversion let n1 = decomp.len(); assert!(n1 % ty.bytes() as usize == 0); let ele_count = n1 / ty.bytes() as usize; let mut j = Vec::with_capacity(ele_count); // this is safe for ints and floats unsafe { j.set_len(ele_count); } let mut p1 = 0; for i1 in 0..ele_count { let u = unsafe { let mut r = [0u8; BY]; std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY); f64::from_be_bytes(r) //f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1])) }; j[i1] = u as f32; p1 += BY; } ret.tss.push(k.tss[i1]); ret.values.push(j); } _ => todo!(), } } Ready(Some(Ok(ret))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), Pending => Pending, } } } pub trait IntoDim1F32Stream { fn into_dim_1_f32_stream(self) -> Dim1F32Stream where Self: Stream> + Sized; } impl IntoDim1F32Stream for T where T: Stream>, { fn into_dim_1_f32_stream(self) -> Dim1F32Stream { Dim1F32Stream { inp: self } } } pub fn make_test_node(id: u32) -> Node { Node { id: format!("{:02}", id), host: "localhost".into(), listen: "0.0.0.0".into(), port: 8800 + id as u16, port_raw: 8800 + id as u16 + 100, data_base_path: format!("../tmpdata/node{:02}", id).into(), split: id, ksprefix: "ks".into(), } }