diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index d42596b..b41cca9 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -3,10 +3,10 @@ use crate::archeng::blockstream::BlockStream; use crate::events::{FrameMaker, FrameMakerTrait}; use err::Error; use futures_util::{Stream, StreamExt}; -use items::binnedevents::{SingleBinWaveEvents, XBinnedEvents}; +use items::binnedevents::{MultiBinWaveEvents, SingleBinWaveEvents, XBinnedEvents}; use items::eventsitem::EventsItem; use items::plainevents::{PlainEvents, WavePlainEvents}; -use items::waveevents::WaveXBinner; +use items::waveevents::{WaveNBinner, WaveXBinner}; use items::{EventsNodeProcessor, Framable, LogItem, RangeCompletableItem, StreamItem}; use netpod::query::RawEventsQuery; use netpod::{log::*, AggKind, Shape}; @@ -177,7 +177,96 @@ pub async fn make_event_pipe( }); Box::pin(tr) as _ } - AggKind::DimXBinsN(_) => err::todoval(), + AggKind::DimXBinsN(_) => { + let tr = filtered.map(move |j| match j { + Ok(j) => match j { + StreamItem::DataItem(j) => match j { + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + RangeCompletableItem::Data(j) => match j { + EventsItem::Plain(j) => match j { + PlainEvents::Scalar(_) => { + warn!("EventsItem::Plain Scalar for {:?} {:?}", cfgshape, q_agg_kind); + panic!() + } + PlainEvents::Wave(j) => { + trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind); + match j { + WavePlainEvents::Byte(j) => { + let binner = + WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); + let out = binner.process(j); + let item = MultiBinWaveEvents::Byte(out); + let item = XBinnedEvents::MultiBinWave(item); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + WavePlainEvents::Short(j) => { + let binner = + WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); + let out = binner.process(j); + let item = MultiBinWaveEvents::Short(out); + let item = XBinnedEvents::MultiBinWave(item); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + WavePlainEvents::Int(j) => { + let binner = + WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); + let out = binner.process(j); + let item = MultiBinWaveEvents::Int(out); + let item = XBinnedEvents::MultiBinWave(item); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + WavePlainEvents::Float(j) => { + let binner = + WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); + let out = binner.process(j); + let item = MultiBinWaveEvents::Float(out); + let item = XBinnedEvents::MultiBinWave(item); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + WavePlainEvents::Double(j) => { + let binner = + WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); + let out = binner.process(j); + let item = MultiBinWaveEvents::Double(out); + let item = XBinnedEvents::MultiBinWave(item); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + } + } + }, + EventsItem::XBinnedEvents(j) => match j { + XBinnedEvents::Scalar(j) => { + warn!("XBinnedEvents::Scalar for {:?} {:?}", cfgshape, q_agg_kind); + err::todo(); + let item = XBinnedEvents::Scalar(j); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + XBinnedEvents::SingleBinWave(j) => { + warn!("XBinnedEvents::SingleBinWave for {:?} {:?}", cfgshape, q_agg_kind); + err::todo(); + let item = XBinnedEvents::SingleBinWave(j); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } + XBinnedEvents::MultiBinWave(_) => todo!(), + }, + }, + }, + StreamItem::Log(j) => Ok(StreamItem::Log(j)), + StreamItem::Stats(j) => Ok(StreamItem::Stats(j)), + }, + Err(e) => Err(e), + }); + Box::pin(tr) as _ + } AggKind::EventBlobs => err::todoval(), }, _ => { diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index dd8554f..3c54879 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -396,11 +396,9 @@ where type Output = XBinnedWaveEvents; fn create(shape: Shape, agg_kind: AggKind) -> Self { - info!("WaveNBinner::create"); // TODO get rid of panic potential let shape_bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize; let x_bin_count = x_bin_count(&shape, &agg_kind); - info!("shape_bin_count {} x_bin_count {}", shape_bin_count, x_bin_count); Self { shape_bin_count, x_bin_count, diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 85daad3..52ce8ec 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -208,15 +208,14 @@ where NTY: NumOps, { pub fn new(range: NanoRange, do_time_weight: bool) -> Self { - let int_ts = range.beg; Self { + int_ts: range.beg, range, count: 0, min: None, max: None, sumc: 0, sum: 0f32, - int_ts, last_ts: 0, last_avg: None, last_min: None, @@ -261,19 +260,14 @@ where fn apply_event_time_weight(&mut self, ts: u64) { //debug!("apply_event_time_weight"); - if let (Some(v), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) { + if let (Some(avg), Some(min), Some(max)) = (self.last_avg, self.last_min, self.last_max) { self.apply_min_max(min, max); - let w = if self.do_time_weight { - (ts - self.int_ts) as f32 * 1e-9 + let w = (ts - self.int_ts) as f32 / self.range.delta() as f32; + if avg.is_nan() { } else { - 1. - }; - let vf = v; - if vf.is_nan() { - } else { - self.sum += vf * w; - self.sumc += 1; + self.sum += avg * w; } + self.sumc += 1; self.int_ts = ts; } } diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 4b6943d..2891687 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -1,3 +1,5 @@ +use std::mem; + use crate::minmaxavgwavebins::MinMaxAvgWaveBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; @@ -193,32 +195,190 @@ where { range: NanoRange, count: u64, - min: Vec, - max: Vec, - sum: Vec, + min: Option>, + max: Option>, sumc: u64, + sum: Vec, + int_ts: u64, + last_ts: u64, + last_avg: Option>, + last_min: Option>, + last_max: Option>, + do_time_weight: bool, } impl XBinnedWaveEventsAggregator where NTY: NumOps, { - pub fn new(range: NanoRange, bin_count: usize, do_time_weight: bool) -> Self { - if do_time_weight { - err::todo(); - } - if bin_count == 0 { - panic!("bin_count == 0"); - } + pub fn new(range: NanoRange, x_bin_count: usize, do_time_weight: bool) -> Self { Self { + int_ts: range.beg, range, count: 0, - min: vec![NTY::max_or_nan(); bin_count], - max: vec![NTY::min_or_nan(); bin_count], - sum: vec![0f32; bin_count], + min: None, + max: None, sumc: 0, + sum: vec![0f32; x_bin_count], + last_ts: 0, + last_avg: None, + last_min: None, + last_max: None, + do_time_weight, } } + + fn apply_min_max(&mut self, min: &Vec, max: &Vec) { + self.min = match self.min.take() { + None => Some(min.clone()), + Some(cmin) => { + let a = cmin + .into_iter() + .zip(min) + .map(|(a, b)| if a < *b { a } else { *b }) + .collect(); + Some(a) + } + }; + self.max = match self.max.take() { + None => Some(max.clone()), + Some(cmax) => { + let a = cmax + .into_iter() + .zip(min) + .map(|(a, b)| if a > *b { a } else { *b }) + .collect(); + Some(a) + } + }; + } + + fn apply_event_unweight(&mut self, avg: &Vec, min: &Vec, max: &Vec) { + //debug!("apply_event_unweight"); + self.apply_min_max(&min, &max); + let sum = mem::replace(&mut self.sum, vec![]); + self.sum = sum + .into_iter() + .zip(avg) + .map(|(a, &b)| if b.is_nan() { a } else { a + b }) + .collect(); + self.sumc += 1; + } + + fn apply_event_time_weight(&mut self, ts: u64) { + //debug!("apply_event_time_weight"); + if let (Some(avg), Some(min), Some(max)) = (self.last_avg.take(), self.last_min.take(), self.last_max.take()) { + self.apply_min_max(&min, &max); + let w = (ts - self.int_ts) as f32 / self.range.delta() as f32; + let sum = mem::replace(&mut self.sum, vec![]); + self.sum = sum + .into_iter() + .zip(&avg) + .map(|(a, &b)| if b.is_nan() { a } else { a + b * w }) + .collect(); + self.sumc += 1; + self.int_ts = ts; + self.last_avg = Some(avg); + self.last_min = Some(min); + self.last_max = Some(max); + } + } + + fn ingest_unweight(&mut self, item: &XBinnedWaveEvents) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let avg = &item.avgs[i1]; + let min = &item.mins[i1]; + let max = &item.maxs[i1]; + if ts < self.range.beg { + } else if ts >= self.range.end { + } else { + self.apply_event_unweight(avg, min, max); + self.count += 1; + } + } + } + + fn ingest_time_weight(&mut self, item: &XBinnedWaveEvents) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let avg = &item.avgs[i1]; + let min = &item.mins[i1]; + let max = &item.maxs[i1]; + if ts < self.int_ts { + self.last_ts = ts; + self.last_avg = Some(avg.clone()); + self.last_min = Some(min.clone()); + self.last_max = Some(max.clone()); + } else if ts >= self.range.end { + return; + } else { + self.apply_event_time_weight(ts); + self.count += 1; + self.last_ts = ts; + self.last_avg = Some(avg.clone()); + self.last_min = Some(min.clone()); + self.last_max = Some(max.clone()); + } + } + } + + fn result_reset_unweight(&mut self, range: NanoRange, _expand: bool) -> MinMaxAvgWaveBins { + let avg = if self.sumc == 0 { + None + } else { + Some(self.sum.iter().map(|k| *k / self.sumc as f32).collect()) + }; + let min = mem::replace(&mut self.min, None); + let max = mem::replace(&mut self.max, None); + let ret = MinMaxAvgWaveBins { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + }; + self.int_ts = range.beg; + self.range = range; + self.count = 0; + self.min = None; + self.max = None; + self.sumc = 0; + self.sum = vec![0f32; ret.avgs.len()]; + ret + } + + fn result_reset_time_weight(&mut self, range: NanoRange, expand: bool) -> MinMaxAvgWaveBins { + // TODO check callsite for correct expand status. + if true || expand { + self.apply_event_time_weight(self.range.end); + } + let avg = if self.sumc == 0 { + None + } else { + let n = self.sum.len(); + Some(mem::replace(&mut self.sum, vec![0f32; n])) + }; + let min = mem::replace(&mut self.min, None); + let max = mem::replace(&mut self.max, None); + let ret = MinMaxAvgWaveBins { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + }; + self.int_ts = range.beg; + self.range = range; + self.count = 0; + //self.min = None; + //self.max = None; + //self.sum = vec![0f32; ret.avgs.len()]; + self.sumc = 0; + ret + } } impl TimeBinnableTypeAggregator for XBinnedWaveEventsAggregator @@ -233,70 +393,19 @@ where } fn ingest(&mut self, item: &Self::Input) { - error!("time-weighted binning not available"); - err::todo(); - //info!("XBinnedWaveEventsAggregator ingest item {:?}", item); - for i1 in 0..item.tss.len() { - let ts = item.tss[i1]; - if ts < self.range.beg { - continue; - } else if ts >= self.range.end { - continue; - } else { - for (i2, &v) in item.mins[i1].iter().enumerate() { - if v < self.min[i2] || self.min[i2].is_nan() { - self.min[i2] = v; - } - } - for (i2, &v) in item.maxs[i1].iter().enumerate() { - if v > self.max[i2] || self.max[i2].is_nan() { - self.max[i2] = v; - } - } - for (i2, &v) in item.avgs[i1].iter().enumerate() { - if v.is_nan() { - } else { - self.sum[i2] += v; - } - } - self.sumc += 1; - self.count += 1; - } + if self.do_time_weight { + self.ingest_time_weight(item) + } else { + self.ingest_unweight(item) } } - fn result_reset(&mut self, range: NanoRange, _expand: bool) -> Self::Output { - let ret; - if self.sumc == 0 { - ret = Self::Output { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - mins: vec![None], - maxs: vec![None], - avgs: vec![None], - }; + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + if self.do_time_weight { + self.result_reset_time_weight(range, expand) } else { - let avg = self.sum.iter().map(|k| *k / self.sumc as f32).collect(); - ret = Self::Output { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - // TODO replace with the reset-value instead. - mins: vec![Some(self.min.clone())], - maxs: vec![Some(self.max.clone())], - avgs: vec![Some(avg)], - }; - if ret.ts1s[0] < 1300 { - info!("XBinnedWaveEventsAggregator result {:?}", ret); - } + self.result_reset_unweight(range, expand) } - self.range = range; - self.count = 0; - self.min = vec![NTY::max_or_nan(); self.min.len()]; - self.max = vec![NTY::min_or_nan(); self.min.len()]; - self.sum = vec![0f32; self.min.len()]; - ret } } diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 28342f8..562c0c8 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -215,7 +215,7 @@ impl FromUrl for BinnedQuery { .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, timeout: pairs .get("timeout") - .map_or("2000", |k| k) + .map_or("6000", |k| k) .parse::() .map(|k| Duration::from_millis(k)) .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?, @@ -281,7 +281,7 @@ pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { g.append_pair("binningScheme", "unweightedScalar"); } AggKind::DimXBinsN(n) => { - g.append_pair("binningScheme", "toScalarX"); + g.append_pair("binningScheme", "binnedX"); g.append_pair("binnedXcount", &format!("{}", n)); } }