Fix waveform binning
This commit is contained in:
@@ -3,10 +3,10 @@ use crate::archeng::blockstream::BlockStream;
|
||||
use crate::events::{FrameMaker, FrameMakerTrait};
|
||||
use err::Error;
|
||||
use futures_util::{Stream, StreamExt};
|
||||
use items::binnedevents::{SingleBinWaveEvents, XBinnedEvents};
|
||||
use items::binnedevents::{MultiBinWaveEvents, SingleBinWaveEvents, XBinnedEvents};
|
||||
use items::eventsitem::EventsItem;
|
||||
use items::plainevents::{PlainEvents, WavePlainEvents};
|
||||
use items::waveevents::WaveXBinner;
|
||||
use items::waveevents::{WaveNBinner, WaveXBinner};
|
||||
use items::{EventsNodeProcessor, Framable, LogItem, RangeCompletableItem, StreamItem};
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{log::*, AggKind, Shape};
|
||||
@@ -177,7 +177,96 @@ pub async fn make_event_pipe(
|
||||
});
|
||||
Box::pin(tr) as _
|
||||
}
|
||||
AggKind::DimXBinsN(_) => err::todoval(),
|
||||
AggKind::DimXBinsN(_) => {
|
||||
let tr = filtered.map(move |j| match j {
|
||||
Ok(j) => match j {
|
||||
StreamItem::DataItem(j) => match j {
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
|
||||
}
|
||||
RangeCompletableItem::Data(j) => match j {
|
||||
EventsItem::Plain(j) => match j {
|
||||
PlainEvents::Scalar(_) => {
|
||||
warn!("EventsItem::Plain Scalar for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
panic!()
|
||||
}
|
||||
PlainEvents::Wave(j) => {
|
||||
trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
match j {
|
||||
WavePlainEvents::Byte(j) => {
|
||||
let binner =
|
||||
WaveNBinner::<i8>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = MultiBinWaveEvents::Byte(out);
|
||||
let item = XBinnedEvents::MultiBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Short(j) => {
|
||||
let binner =
|
||||
WaveNBinner::<i16>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = MultiBinWaveEvents::Short(out);
|
||||
let item = XBinnedEvents::MultiBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Int(j) => {
|
||||
let binner =
|
||||
WaveNBinner::<i32>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = MultiBinWaveEvents::Int(out);
|
||||
let item = XBinnedEvents::MultiBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Float(j) => {
|
||||
let binner =
|
||||
WaveNBinner::<f32>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = MultiBinWaveEvents::Float(out);
|
||||
let item = XBinnedEvents::MultiBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
WavePlainEvents::Double(j) => {
|
||||
let binner =
|
||||
WaveNBinner::<f64>::create(cfgshape.clone(), q_agg_kind.clone());
|
||||
let out = binner.process(j);
|
||||
let item = MultiBinWaveEvents::Double(out);
|
||||
let item = XBinnedEvents::MultiBinWave(item);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
EventsItem::XBinnedEvents(j) => match j {
|
||||
XBinnedEvents::Scalar(j) => {
|
||||
warn!("XBinnedEvents::Scalar for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
err::todo();
|
||||
let item = XBinnedEvents::Scalar(j);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
XBinnedEvents::SingleBinWave(j) => {
|
||||
warn!("XBinnedEvents::SingleBinWave for {:?} {:?}", cfgshape, q_agg_kind);
|
||||
err::todo();
|
||||
let item = XBinnedEvents::SingleBinWave(j);
|
||||
let item = EventsItem::XBinnedEvents(item);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
}
|
||||
XBinnedEvents::MultiBinWave(_) => todo!(),
|
||||
},
|
||||
},
|
||||
},
|
||||
StreamItem::Log(j) => Ok(StreamItem::Log(j)),
|
||||
StreamItem::Stats(j) => Ok(StreamItem::Stats(j)),
|
||||
},
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
Box::pin(tr) as _
|
||||
}
|
||||
AggKind::EventBlobs => err::todoval(),
|
||||
},
|
||||
_ => {
|
||||
|
||||
@@ -396,11 +396,9 @@ where
|
||||
type Output = XBinnedWaveEvents<NTY>;
|
||||
|
||||
fn create(shape: Shape, agg_kind: AggKind) -> Self {
|
||||
info!("WaveNBinner::create");
|
||||
// TODO get rid of panic potential
|
||||
let shape_bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize;
|
||||
let x_bin_count = x_bin_count(&shape, &agg_kind);
|
||||
info!("shape_bin_count {} x_bin_count {}", shape_bin_count, x_bin_count);
|
||||
Self {
|
||||
shape_bin_count,
|
||||
x_bin_count,
|
||||
|
||||
@@ -208,15 +208,14 @@ where
|
||||
NTY: NumOps,
|
||||
{
|
||||
pub fn new(range: NanoRange, do_time_weight: bool) -> Self {
|
||||
let int_ts = range.beg;
|
||||
Self {
|
||||
int_ts: range.beg,
|
||||
range,
|
||||
count: 0,
|
||||
min: None,
|
||||
max: None,
|
||||
sumc: 0,
|
||||
sum: 0f32,
|
||||
int_ts,
|
||||
last_ts: 0,
|
||||
last_avg: None,
|
||||
last_min: None,
|
||||
@@ -261,19 +260,14 @@ where
|
||||
|
||||
fn apply_event_time_weight(&mut self, ts: u64) {
|
||||
//debug!("apply_event_time_weight");
|
||||
if let (Some(v), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) {
|
||||
if let (Some(avg), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) {
|
||||
self.apply_min_max(min, max);
|
||||
let w = if self.do_time_weight {
|
||||
(ts - self.int_ts) as f32 * 1e-9
|
||||
let w = (ts - self.int_ts) as f32 / self.range.delta() as f32;
|
||||
if avg.is_nan() {
|
||||
} else {
|
||||
1.
|
||||
};
|
||||
let vf = v;
|
||||
if vf.is_nan() {
|
||||
} else {
|
||||
self.sum += vf * w;
|
||||
self.sumc += 1;
|
||||
self.sum += avg * w;
|
||||
}
|
||||
self.sumc += 1;
|
||||
self.int_ts = ts;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::mem;
|
||||
|
||||
use crate::minmaxavgwavebins::MinMaxAvgWaveBins;
|
||||
use crate::numops::NumOps;
|
||||
use crate::streams::{Collectable, Collector};
|
||||
@@ -193,32 +195,190 @@ where
|
||||
{
|
||||
range: NanoRange,
|
||||
count: u64,
|
||||
min: Vec<NTY>,
|
||||
max: Vec<NTY>,
|
||||
sum: Vec<f32>,
|
||||
min: Option<Vec<NTY>>,
|
||||
max: Option<Vec<NTY>>,
|
||||
sumc: u64,
|
||||
sum: Vec<f32>,
|
||||
int_ts: u64,
|
||||
last_ts: u64,
|
||||
last_avg: Option<Vec<f32>>,
|
||||
last_min: Option<Vec<NTY>>,
|
||||
last_max: Option<Vec<NTY>>,
|
||||
do_time_weight: bool,
|
||||
}
|
||||
|
||||
impl<NTY> XBinnedWaveEventsAggregator<NTY>
|
||||
where
|
||||
NTY: NumOps,
|
||||
{
|
||||
pub fn new(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self {
|
||||
if do_time_weight {
|
||||
err::todo();
|
||||
}
|
||||
if bin_count == 0 {
|
||||
panic!("bin_count == 0");
|
||||
}
|
||||
pub fn new(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self {
|
||||
Self {
|
||||
int_ts: range.beg,
|
||||
range,
|
||||
count: 0,
|
||||
min: vec![NTY::max_or_nan(); bin_count],
|
||||
max: vec![NTY::min_or_nan(); bin_count],
|
||||
sum: vec![0f32; bin_count],
|
||||
min: None,
|
||||
max: None,
|
||||
sumc: 0,
|
||||
sum: vec![0f32; x_bin_count],
|
||||
last_ts: 0,
|
||||
last_avg: None,
|
||||
last_min: None,
|
||||
last_max: None,
|
||||
do_time_weight,
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_min_max(&mut self, min: &Vec<NTY>, max: &Vec<NTY>) {
|
||||
self.min = match self.min.take() {
|
||||
None => Some(min.clone()),
|
||||
Some(cmin) => {
|
||||
let a = cmin
|
||||
.into_iter()
|
||||
.zip(min)
|
||||
.map(|(a, b)| if a < *b { a } else { *b })
|
||||
.collect();
|
||||
Some(a)
|
||||
}
|
||||
};
|
||||
self.max = match self.max.take() {
|
||||
None => Some(max.clone()),
|
||||
Some(cmax) => {
|
||||
let a = cmax
|
||||
.into_iter()
|
||||
.zip(min)
|
||||
.map(|(a, b)| if a > *b { a } else { *b })
|
||||
.collect();
|
||||
Some(a)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn apply_event_unweight(&mut self, avg: &Vec<f32>, min: &Vec<NTY>, max: &Vec<NTY>) {
|
||||
//debug!("apply_event_unweight");
|
||||
self.apply_min_max(&min, &max);
|
||||
let sum = mem::replace(&mut self.sum, vec![]);
|
||||
self.sum = sum
|
||||
.into_iter()
|
||||
.zip(avg)
|
||||
.map(|(a, &b)| if b.is_nan() { a } else { a + b })
|
||||
.collect();
|
||||
self.sumc += 1;
|
||||
}
|
||||
|
||||
fn apply_event_time_weight(&mut self, ts: u64) {
|
||||
//debug!("apply_event_time_weight");
|
||||
if let (Some(avg), Some(min), Some(max)) = (self.last_avg.take(), self.last_min.take(), self.last_max.take()) {
|
||||
self.apply_min_max(&min, &max);
|
||||
let w = (ts - self.int_ts) as f32 / self.range.delta() as f32;
|
||||
let sum = mem::replace(&mut self.sum, vec![]);
|
||||
self.sum = sum
|
||||
.into_iter()
|
||||
.zip(&avg)
|
||||
.map(|(a, &b)| if b.is_nan() { a } else { a + b * w })
|
||||
.collect();
|
||||
self.sumc += 1;
|
||||
self.int_ts = ts;
|
||||
self.last_avg = Some(avg);
|
||||
self.last_min = Some(min);
|
||||
self.last_max = Some(max);
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_unweight(&mut self, item: &XBinnedWaveEvents<NTY>) {
|
||||
for i1 in 0..item.tss.len() {
|
||||
let ts = item.tss[i1];
|
||||
let avg = &item.avgs[i1];
|
||||
let min = &item.mins[i1];
|
||||
let max = &item.maxs[i1];
|
||||
if ts < self.range.beg {
|
||||
} else if ts >= self.range.end {
|
||||
} else {
|
||||
self.apply_event_unweight(avg, min, max);
|
||||
self.count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_time_weight(&mut self, item: &XBinnedWaveEvents<NTY>) {
|
||||
for i1 in 0..item.tss.len() {
|
||||
let ts = item.tss[i1];
|
||||
let avg = &item.avgs[i1];
|
||||
let min = &item.mins[i1];
|
||||
let max = &item.maxs[i1];
|
||||
if ts < self.int_ts {
|
||||
self.last_ts = ts;
|
||||
self.last_avg = Some(avg.clone());
|
||||
self.last_min = Some(min.clone());
|
||||
self.last_max = Some(max.clone());
|
||||
} else if ts >= self.range.end {
|
||||
return;
|
||||
} else {
|
||||
self.apply_event_time_weight(ts);
|
||||
self.count += 1;
|
||||
self.last_ts = ts;
|
||||
self.last_avg = Some(avg.clone());
|
||||
self.last_min = Some(min.clone());
|
||||
self.last_max = Some(max.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgWaveBins<NTY> {
|
||||
let avg = if self.sumc == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(self.sum.iter().map(|k| *k / self.sumc as f32).collect())
|
||||
};
|
||||
let min = mem::replace(&mut self.min, None);
|
||||
let max = mem::replace(&mut self.max, None);
|
||||
let ret = MinMaxAvgWaveBins {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
mins: vec![min],
|
||||
maxs: vec![max],
|
||||
avgs: vec![avg],
|
||||
};
|
||||
self.int_ts = range.beg;
|
||||
self.range = range;
|
||||
self.count = 0;
|
||||
self.min = None;
|
||||
self.max = None;
|
||||
self.sumc = 0;
|
||||
self.sum = vec![0f32; ret.avgs.len()];
|
||||
ret
|
||||
}
|
||||
|
||||
fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgWaveBins<NTY> {
|
||||
// TODO check callsite for correct expand status.
|
||||
if true || expand {
|
||||
self.apply_event_time_weight(self.range.end);
|
||||
}
|
||||
let avg = if self.sumc == 0 {
|
||||
None
|
||||
} else {
|
||||
let n = self.sum.len();
|
||||
Some(mem::replace(&mut self.sum, vec![0f32; n]))
|
||||
};
|
||||
let min = mem::replace(&mut self.min, None);
|
||||
let max = mem::replace(&mut self.max, None);
|
||||
let ret = MinMaxAvgWaveBins {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
mins: vec![min],
|
||||
maxs: vec![max],
|
||||
avgs: vec![avg],
|
||||
};
|
||||
self.int_ts = range.beg;
|
||||
self.range = range;
|
||||
self.count = 0;
|
||||
//self.min = None;
|
||||
//self.max = None;
|
||||
//self.sum = vec![0f32; ret.avgs.len()];
|
||||
self.sumc = 0;
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> TimeBinnableTypeAggregator for XBinnedWaveEventsAggregator<NTY>
|
||||
@@ -233,70 +393,19 @@ where
|
||||
}
|
||||
|
||||
fn ingest(&mut self, item: &Self::Input) {
|
||||
error!("time-weighted binning not available");
|
||||
err::todo();
|
||||
//info!("XBinnedWaveEventsAggregator ingest item {:?}", item);
|
||||
for i1 in 0..item.tss.len() {
|
||||
let ts = item.tss[i1];
|
||||
if ts < self.range.beg {
|
||||
continue;
|
||||
} else if ts >= self.range.end {
|
||||
continue;
|
||||
} else {
|
||||
for (i2, &v) in item.mins[i1].iter().enumerate() {
|
||||
if v < self.min[i2] || self.min[i2].is_nan() {
|
||||
self.min[i2] = v;
|
||||
}
|
||||
}
|
||||
for (i2, &v) in item.maxs[i1].iter().enumerate() {
|
||||
if v > self.max[i2] || self.max[i2].is_nan() {
|
||||
self.max[i2] = v;
|
||||
}
|
||||
}
|
||||
for (i2, &v) in item.avgs[i1].iter().enumerate() {
|
||||
if v.is_nan() {
|
||||
} else {
|
||||
self.sum[i2] += v;
|
||||
}
|
||||
}
|
||||
self.sumc += 1;
|
||||
self.count += 1;
|
||||
}
|
||||
if self.do_time_weight {
|
||||
self.ingest_time_weight(item)
|
||||
} else {
|
||||
self.ingest_unweight(item)
|
||||
}
|
||||
}
|
||||
|
||||
fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output {
|
||||
let ret;
|
||||
if self.sumc == 0 {
|
||||
ret = Self::Output {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
mins: vec![None],
|
||||
maxs: vec![None],
|
||||
avgs: vec![None],
|
||||
};
|
||||
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output {
|
||||
if self.do_time_weight {
|
||||
self.result_reset_time_weight(range, expand)
|
||||
} else {
|
||||
let avg = self.sum.iter().map(|k| *k / self.sumc as f32).collect();
|
||||
ret = Self::Output {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
// TODO replace with the reset-value instead.
|
||||
mins: vec![Some(self.min.clone())],
|
||||
maxs: vec![Some(self.max.clone())],
|
||||
avgs: vec![Some(avg)],
|
||||
};
|
||||
if ret.ts1s[0] < 1300 {
|
||||
info!("XBinnedWaveEventsAggregator result {:?}", ret);
|
||||
}
|
||||
self.result_reset_unweight(range, expand)
|
||||
}
|
||||
self.range = range;
|
||||
self.count = 0;
|
||||
self.min = vec![NTY::max_or_nan(); self.min.len()];
|
||||
self.max = vec![NTY::min_or_nan(); self.min.len()];
|
||||
self.sum = vec![0f32; self.min.len()];
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -215,7 +215,7 @@ impl FromUrl for BinnedQuery {
|
||||
.map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
|
||||
timeout: pairs
|
||||
.get("timeout")
|
||||
.map_or("2000", |k| k)
|
||||
.map_or("6000", |k| k)
|
||||
.parse::<u64>()
|
||||
.map(|k| Duration::from_millis(k))
|
||||
.map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
|
||||
@@ -281,7 +281,7 @@ pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
|
||||
g.append_pair("binningScheme", "unweightedScalar");
|
||||
}
|
||||
AggKind::DimXBinsN(n) => {
|
||||
g.append_pair("binningScheme", "toScalarX");
|
||||
g.append_pair("binningScheme", "binnedX");
|
||||
g.append_pair("binnedXcount", &format!("{}", n));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user