First stats on console

This commit is contained in:
Dominik Werder
2021-05-01 18:38:21 +02:00
parent 0eb6b3364d
commit a47ffc843d
8 changed files with 99 additions and 45 deletions
+17 -7
View File
@@ -9,6 +9,7 @@ use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use netpod::{EventDataReadStats, NanoRange}; use netpod::{EventDataReadStats, NanoRange};
use netpod::{Node, ScalarType}; use netpod::{Node, ScalarType};
use serde::{Deserialize, Serialize};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@@ -26,7 +27,7 @@ pub trait AggregatorTdim {
fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_before(&self, inp: &Self::InputValue) -> bool;
fn ends_after(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool;
fn starts_after(&self, inp: &Self::InputValue) -> bool; fn starts_after(&self, inp: &Self::InputValue) -> bool;
fn ingest(&mut self, inp: &Self::InputValue); fn ingest(&mut self, inp: &mut Self::InputValue);
fn result(self) -> Self::OutputValue; fn result(self) -> Self::OutputValue;
} }
@@ -62,13 +63,17 @@ impl std::fmt::Debug for ValuesDim0 {
impl AggregatableXdim1Bin for ValuesDim1 { impl AggregatableXdim1Bin for ValuesDim1 {
type Output = MinMaxAvgScalarEventBatch; type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output { fn into_agg(mut self) -> Self::Output {
let mut ret = MinMaxAvgScalarEventBatch { let mut ret = MinMaxAvgScalarEventBatch {
tss: Vec::with_capacity(self.tss.len()), tss: Vec::with_capacity(self.tss.len()),
mins: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()),
maxs: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()),
avgs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()),
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
}; };
ret.event_data_read_stats.trans(&mut self.event_data_read_stats);
ret.values_extract_stats.trans(&mut self.values_extract_stats);
for i1 in 0..self.tss.len() { for i1 in 0..self.tss.len() {
let ts = self.tss[i1]; let ts = self.tss[i1];
let mut min = f32::MAX; let mut min = f32::MAX;
@@ -98,11 +103,12 @@ impl AggregatableXdim1Bin for ValuesDim1 {
} }
} }
pub struct ValuesDim1ExtractStats { #[derive(Debug, Serialize, Deserialize)]
pub struct ValuesExtractStats {
pub dur: Duration, pub dur: Duration,
} }
impl ValuesDim1ExtractStats { impl ValuesExtractStats {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
dur: Duration::default(), dur: Duration::default(),
@@ -119,7 +125,7 @@ pub struct ValuesDim1 {
pub tss: Vec<u64>, pub tss: Vec<u64>,
pub values: Vec<Vec<f32>>, pub values: Vec<Vec<f32>>,
pub event_data_read_stats: EventDataReadStats, pub event_data_read_stats: EventDataReadStats,
pub values_dim_1_extract_stats: ValuesDim1ExtractStats, pub values_extract_stats: ValuesExtractStats,
} }
impl ValuesDim1 { impl ValuesDim1 {
@@ -128,7 +134,7 @@ impl ValuesDim1 {
tss: vec![], tss: vec![],
values: vec![], values: vec![],
event_data_read_stats: EventDataReadStats::new(), event_data_read_stats: EventDataReadStats::new(),
values_dim_1_extract_stats: ValuesDim1ExtractStats::new(), values_extract_stats: ValuesExtractStats::new(),
} }
} }
} }
@@ -154,7 +160,11 @@ impl AggregatableXdim1Bin for ValuesDim0 {
mins: Vec::with_capacity(self.tss.len()), mins: Vec::with_capacity(self.tss.len()),
maxs: Vec::with_capacity(self.tss.len()), maxs: Vec::with_capacity(self.tss.len()),
avgs: Vec::with_capacity(self.tss.len()), avgs: Vec::with_capacity(self.tss.len()),
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
}; };
// TODO stats are not yet in ValuesDim0
err::todoval::<u32>();
for i1 in 0..self.tss.len() { for i1 in 0..self.tss.len() {
let ts = self.tss[i1]; let ts = self.tss[i1];
let mut min = f32::MAX; let mut min = f32::MAX;
@@ -412,7 +422,7 @@ where
let inst2 = Instant::now(); let inst2 = Instant::now();
let mut k = k; let mut k = k;
ret.event_data_read_stats.trans(&mut k.event_data_read_stats); ret.event_data_read_stats.trans(&mut k.event_data_read_stats);
ret.values_dim_1_extract_stats.dur += inst2.duration_since(inst1); ret.values_extract_stats.dur += inst2.duration_since(inst1);
Ready(Some(Ok(ret))) Ready(Some(Ok(ret)))
} }
Ready(Some(Err(e))) => { Ready(Some(Err(e))) => {
+2 -1
View File
@@ -105,7 +105,8 @@ where
Ready(Some(Ok(ret))) Ready(Some(Ok(ret)))
} else { } else {
//info!("INGEST"); //info!("INGEST");
ag.ingest(&k); let mut k = k;
ag.ingest(&mut k);
// if this input contains also data after the current bin, then I need to keep // if this input contains also data after the current bin, then I need to keep
// it for the next round. // it for the next round.
if ag.ends_after(&k) { if ag.ends_after(&k) {
+29 -12
View File
@@ -1,8 +1,9 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim}; use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, ValuesExtractStats};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC; use netpod::timeunits::SEC;
use netpod::EventDataReadStats;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::mem::size_of; use std::mem::size_of;
@@ -12,6 +13,8 @@ pub struct MinMaxAvgScalarEventBatch {
pub mins: Vec<f32>, pub mins: Vec<f32>,
pub maxs: Vec<f32>, pub maxs: Vec<f32>,
pub avgs: Vec<f32>, pub avgs: Vec<f32>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
} }
impl MinMaxAvgScalarEventBatch { impl MinMaxAvgScalarEventBatch {
@@ -21,6 +24,8 @@ impl MinMaxAvgScalarEventBatch {
mins: vec![], mins: vec![],
maxs: vec![], maxs: vec![],
avgs: vec![], avgs: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
} }
} }
#[allow(dead_code)] #[allow(dead_code)]
@@ -85,12 +90,14 @@ impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!( write!(
fmt, fmt,
"MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}", "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?} EDS {:?} VXS {:?}",
self.tss.len(), self.tss.len(),
self.tss, self.tss,
self.mins, self.mins,
self.maxs, self.maxs,
self.avgs, self.avgs,
self.event_data_read_stats,
self.values_extract_stats,
) )
} }
} }
@@ -142,6 +149,8 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
min: f32, min: f32,
max: f32, max: f32,
sum: f32, sum: f32,
event_data_read_stats: EventDataReadStats,
values_extract_stats: ValuesExtractStats,
} }
impl MinMaxAvgScalarEventBatchAggregator { impl MinMaxAvgScalarEventBatchAggregator {
@@ -153,6 +162,8 @@ impl MinMaxAvgScalarEventBatchAggregator {
max: f32::MIN, max: f32::MIN,
sum: 0f32, sum: 0f32,
count: 0, count: 0,
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
} }
} }
} }
@@ -182,15 +193,19 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
} }
} }
fn ingest(&mut self, v: &Self::InputValue) { fn ingest(&mut self, v: &mut Self::InputValue) {
trace!( if false {
"ingest {} {} {} {:?} {:?}", trace!(
self.ends_before(v), "ingest {} {} {} {:?} {:?}",
self.ends_after(v), self.ends_before(v),
self.starts_after(v), self.ends_after(v),
v.tss.first().map(|k| k / SEC), self.starts_after(v),
v.tss.last().map(|k| k / SEC), v.tss.first().map(|k| k / SEC),
); v.tss.last().map(|k| k / SEC),
);
}
self.event_data_read_stats.trans(&mut v.event_data_read_stats);
self.values_extract_stats.trans(&mut v.values_extract_stats);
for i1 in 0..v.tss.len() { for i1 in 0..v.tss.len() {
let ts = v.tss[i1]; let ts = v.tss[i1];
if ts < self.ts1 { if ts < self.ts1 {
@@ -227,7 +242,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
} }
} }
fn result(self) -> Self::OutputValue { fn result(mut self) -> Self::OutputValue {
let min = if self.min == f32::MAX { f32::NAN } else { self.min }; let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max }; let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.count == 0 { let avg = if self.count == 0 {
@@ -242,6 +257,8 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
mins: vec![min], mins: vec![min],
maxs: vec![max], maxs: vec![max],
avgs: vec![avg], avgs: vec![avg],
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
} }
} }
} }
+20 -6
View File
@@ -1,8 +1,8 @@
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside}; use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside, ValuesExtractStats};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC; use netpod::timeunits::SEC;
use netpod::NanoRange; use netpod::{EventDataReadStats, NanoRange};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::mem::size_of; use std::mem::size_of;
@@ -15,6 +15,8 @@ pub struct MinMaxAvgScalarBinBatch {
pub mins: Vec<f32>, pub mins: Vec<f32>,
pub maxs: Vec<f32>, pub maxs: Vec<f32>,
pub avgs: Vec<f32>, pub avgs: Vec<f32>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
} }
impl MinMaxAvgScalarBinBatch { impl MinMaxAvgScalarBinBatch {
@@ -26,6 +28,8 @@ impl MinMaxAvgScalarBinBatch {
mins: vec![], mins: vec![],
maxs: vec![], maxs: vec![],
avgs: vec![], avgs: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
} }
} }
@@ -168,12 +172,14 @@ impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!( write!(
fmt, fmt,
"MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}", "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?} EDS {:?} VXS {:?}",
self.ts1s.len(), self.ts1s.len(),
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(), self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(), self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.counts, self.counts,
self.avgs self.avgs,
self.event_data_read_stats,
self.values_extract_stats,
) )
} }
} }
@@ -201,6 +207,8 @@ pub struct MinMaxAvgScalarBinBatchAggregator {
max: f32, max: f32,
sum: f32, sum: f32,
sumc: u64, sumc: u64,
event_data_read_stats: EventDataReadStats,
values_extract_stats: ValuesExtractStats,
} }
impl MinMaxAvgScalarBinBatchAggregator { impl MinMaxAvgScalarBinBatchAggregator {
@@ -213,6 +221,8 @@ impl MinMaxAvgScalarBinBatchAggregator {
max: f32::MIN, max: f32::MIN,
sum: 0f32, sum: 0f32,
sumc: 0, sumc: 0,
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
} }
} }
} }
@@ -242,7 +252,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
} }
} }
fn ingest(&mut self, v: &Self::InputValue) { fn ingest(&mut self, v: &mut Self::InputValue) {
self.event_data_read_stats.trans(&mut v.event_data_read_stats);
self.values_extract_stats.trans(&mut v.values_extract_stats);
for i1 in 0..v.ts1s.len() { for i1 in 0..v.ts1s.len() {
let ts1 = v.ts1s[i1]; let ts1 = v.ts1s[i1];
let ts2 = v.ts2s[i1]; let ts2 = v.ts2s[i1];
@@ -260,7 +272,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
} }
} }
fn result(self) -> Self::OutputValue { fn result(mut self) -> Self::OutputValue {
let min = if self.min == f32::MAX { f32::NAN } else { self.min }; let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max }; let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.sumc == 0 { let avg = if self.sumc == 0 {
@@ -275,6 +287,8 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
mins: vec![min], mins: vec![min],
maxs: vec![max], maxs: vec![max],
avgs: vec![avg], avgs: vec![avg],
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
} }
} }
} }
+4 -1
View File
@@ -90,7 +90,10 @@ impl Stream for PreBinnedValueFetchedStream {
Ready(Some(Ok(frame))) => match decode_frame::<PreBinnedFrame>(&frame) { Ready(Some(Ok(frame))) => match decode_frame::<PreBinnedFrame>(&frame) {
Ok(item) => match item.0 { Ok(item) => match item.0 {
Ok(item) => Ready(Some(PreBinnedFrame(Ok(item)))), Ok(item) => Ready(Some(PreBinnedFrame(Ok(item)))),
Err(e) => Ready(Some(PreBinnedFrame(Err(e)))), Err(e) => {
self.errored = true;
Ready(Some(PreBinnedFrame(Err(e))))
}
}, },
Err(e) => { Err(e) => {
self.errored = true; self.errored = true;
+16 -9
View File
@@ -146,7 +146,8 @@ where
inps: Vec<S>, inps: Vec<S>,
current: Vec<MergedMinMaxAvgScalarStreamCurVal>, current: Vec<MergedMinMaxAvgScalarStreamCurVal>,
ixs: Vec<usize>, ixs: Vec<usize>,
emitted_complete: bool, errored: bool,
completed: bool,
batch: MinMaxAvgScalarEventBatch, batch: MinMaxAvgScalarEventBatch,
ts_last_emit: u64, ts_last_emit: u64,
} }
@@ -165,7 +166,8 @@ where
inps, inps,
current: current, current: current,
ixs: vec![0; n], ixs: vec![0; n],
emitted_complete: false, errored: false,
completed: false,
batch: MinMaxAvgScalarEventBatch::empty(), batch: MinMaxAvgScalarEventBatch::empty(),
ts_last_emit: 0, ts_last_emit: 0,
} }
@@ -181,22 +183,27 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
'outer: loop { 'outer: loop {
if self.emitted_complete { if self.completed {
break Ready(Some(Err(Error::with_msg( panic!("MergedMinMaxAvgScalarStream poll_next on completed");
"MergedMinMaxAvgScalarStream poll on complete stream", }
)))); if self.errored {
self.completed = true;
return Ready(None);
} }
// can only run logic if all streams are either finished, errored or have some current value. // can only run logic if all streams are either finished, errored or have some current value.
for i1 in 0..self.inps.len() { for i1 in 0..self.inps.len() {
match self.current[i1] { match self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::None => { MergedMinMaxAvgScalarStreamCurVal::None => {
match self.inps[i1].poll_next_unpin(cx) { match self.inps[i1].poll_next_unpin(cx) {
Ready(Some(Ok(k))) => { Ready(Some(Ok(mut k))) => {
self.batch.event_data_read_stats.trans(&mut k.event_data_read_stats);
self.batch.values_extract_stats.trans(&mut k.values_extract_stats);
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(k); self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(k);
} }
Ready(Some(Err(e))) => { Ready(Some(Err(e))) => {
// TODO emit this error, consider this stream as done, anything more to do here? // TODO emit this error, consider this stream as done, anything more to do here?
//self.current[i1] = CurVal::Err(e); //self.current[i1] = CurVal::Err(e);
self.errored = true;
return Ready(Some(Err(e))); return Ready(Some(Err(e)));
} }
Ready(None) => { Ready(None) => {
@@ -239,12 +246,12 @@ where
info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(Some( current batch ))"); info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(Some( current batch ))");
break Ready(Some(Ok(k))); break Ready(Some(Ok(k)));
} else { } else {
self.emitted_complete = true;
info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(None)"); info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(None)");
self.completed = true;
break Ready(None); break Ready(None);
} }
} else { } else {
trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
assert!(lowest_ts >= self.ts_last_emit); assert!(lowest_ts >= self.ts_last_emit);
self.ts_last_emit = lowest_ts; self.ts_last_emit = lowest_ts;
self.batch.tss.push(lowest_ts); self.batch.tss.push(lowest_ts);
+10 -8
View File
@@ -184,14 +184,16 @@ async fn raw_conn_handler_inner_try(
while let Some(item) = s1.next().await { while let Some(item) = s1.next().await {
if let Ok(k) = &item { if let Ok(k) = &item {
e += 1; e += 1;
trace!( if false {
"emit items sp {:2} e {:3} len {:3} {:10?} {:10?}", trace!(
node_config.node.split, "emit items sp {:2} e {:3} len {:3} {:10?} {:10?}",
e, node_config.node.split,
k.tss.len(), e,
k.tss.first().map(|k| k / SEC), k.tss.len(),
k.tss.last().map(|k| k / SEC), k.tss.first().map(|k| k / SEC),
); k.tss.last().map(|k| k / SEC),
);
}
} }
match make_frame::<RawConnOut>(&item) { match make_frame::<RawConnOut>(&item) {
Ok(buf) => match netout.write_all(&buf).await { Ok(buf) => match netout.write_all(&buf).await {
+1 -1
View File
@@ -641,7 +641,7 @@ pub mod log {
pub use tracing::{debug, error, info, span, trace, warn, Level}; pub use tracing::{debug, error, info, span, trace, warn, Level};
} }
#[derive(Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct EventDataReadStats { pub struct EventDataReadStats {
pub parsed_bytes: u64, pub parsed_bytes: u64,
} }