This commit is contained in:
Dominik Werder
2023-04-26 16:27:30 +02:00
parent 773901939d
commit 524d89b7f9
9 changed files with 157 additions and 115 deletions

View File

@@ -433,28 +433,23 @@ pub struct EventsDim0Aggregator<STY> {
sumc: u64,
sum: f32,
int_ts: u64,
last_seen_ts: u64,
last_seen_val: Option<STY>,
last_ts: u64,
last_val: Option<STY>,
did_min_max: bool,
do_time_weight: bool,
events_taken_count: u64,
events_ignored_count: u64,
}
impl<STY> Drop for EventsDim0Aggregator<STY> {
fn drop(&mut self) {
// TODO collect as stats for the request context:
trace!(
"taken {} ignored {}",
self.events_taken_count,
self.events_ignored_count
);
trace!("count {} ignored {}", self.count, self.events_ignored_count);
}
}
impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
fn self_name() -> String {
format!("{}<{}>", any::type_name::<Self>(), any::type_name::<STY>())
any::type_name::<Self>().to_string()
}
pub fn new(range: SeriesRange, do_time_weight: bool) -> Self {
@@ -464,14 +459,13 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
count: 0,
min: STY::zero_b(),
max: STY::zero_b(),
sum: 0.,
sumc: 0,
sum: 0.,
int_ts,
last_seen_ts: 0,
last_seen_val: None,
last_ts: 0,
last_val: None,
did_min_max: false,
do_time_weight,
events_taken_count: 0,
events_ignored_count: 0,
}
}
@@ -513,19 +507,19 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
}
}
fn apply_event_time_weight(&mut self, px: u64, pxbeg: u64) {
if let Some(v) = &self.last_seen_val {
fn apply_event_time_weight(&mut self, px: u64) {
if let Some(v) = &self.last_val {
trace_ingest!("apply_event_time_weight with v {v:?}");
let vf = v.as_prim_f32_b();
let v2 = v.clone();
if px > pxbeg {
if px > self.range.beg_u64() {
let v2 = v.clone();
self.apply_min_max(v2);
}
self.sumc += 1;
let w = (px - self.int_ts) as f32 * 1e-9;
if vf.is_nan() {
} else {
self.sum += vf * w;
self.sumc += 1;
}
self.int_ts = px;
} else {
@@ -548,7 +542,6 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
} else {
self.apply_event_unweight(val);
self.count += 1;
self.events_taken_count += 1;
}
}
} else {
@@ -561,34 +554,25 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
let self_name = any::type_name::<Self>();
trace_ingest!("{self_name}::ingest_time_weight item len {}", item.len());
if self.range.is_time() {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1].clone();
if ts < self.int_ts {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
self.last_seen_ts = ts;
self.last_seen_val = Some(val);
self.events_ignored_count += 1;
} else if ts >= self.range.end_u64() {
let range_beg = self.range.beg_u64();
let range_end = self.range.end_u64();
for (&ts, val) in item.tss.iter().zip(item.values.iter()) {
if ts >= range_end {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
self.events_ignored_count += 1;
return;
} else {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} IN", i1, ts, val);
if false && self.last_seen_val.is_none() {
// TODO no longer needed or?
trace_ingest!(
"call apply_min_max without last val, use current instead {} {:?}",
ts,
val
);
self.apply_min_max(val.clone());
}
self.apply_event_time_weight(ts, self.range.beg_u64());
// TODO count all the ignored events for stats
break;
} else if ts >= range_beg {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} INSIDE", i1, ts, val);
self.apply_event_time_weight(ts);
self.count += 1;
self.last_seen_ts = ts;
self.last_seen_val = Some(val);
self.events_taken_count += 1;
self.last_ts = ts;
self.last_val = Some(val.clone());
} else {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
self.events_ignored_count += 1;
self.last_ts = ts;
self.last_val = Some(val.clone());
}
}
} else {
@@ -604,7 +588,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
let avg = self.sum / self.sumc as f32;
(self.min.clone(), self.max.clone(), avg)
} else {
let g = match &self.last_seen_val {
let g = match &self.last_val {
Some(x) => x.clone(),
None => STY::zero_b(),
};
@@ -627,8 +611,10 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
self.int_ts = range.beg_u64();
self.range = range;
self.count = 0;
self.sum = 0f32;
self.sum = 0.;
self.sumc = 0;
self.min = STY::zero_b();
self.max = STY::zero_b();
self.did_min_max = false;
ret
}
@@ -639,7 +625,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
let range_beg = self.range.beg_u64();
let range_end = self.range.end_u64();
if self.range.is_time() {
self.apply_event_time_weight(range_end, range_beg);
self.apply_event_time_weight(range_end);
} else {
error!("TODO result_reset_time_weight");
err::todoval()
@@ -648,7 +634,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
let avg = self.sum / (self.range.delta_u64() as f32 * 1e-9);
(self.min.clone(), self.max.clone(), avg)
} else {
let g = match &self.last_seen_val {
let g = match &self.last_val {
Some(x) => x.clone(),
None => STY::zero_b(),
};
@@ -671,11 +657,11 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
self.int_ts = range_beg;
self.range = range;
self.count = 0;
self.sum = 0.;
self.sumc = 0;
self.did_min_max = false;
self.sum = 0.;
self.min = STY::zero_b();
self.max = STY::zero_b();
self.did_min_max = false;
ret
}
}

View File

@@ -619,15 +619,19 @@ where
STY: ScalarOps,
{
range: SeriesRange,
/// Number of events which actually fall in this bin.
count: u64,
min: STY,
max: STY,
/// Number of times we accumulated to the sum of this bin.
sumc: u64,
sum: f32,
int_ts: u64,
last_ts: u64,
last_vals: Option<(STY, STY, f32)>,
did_min_max: bool,
do_time_weight: bool,
events_ignored_count: u64,
}
impl<STY> EventsXbinDim0Aggregator<STY>
@@ -642,6 +646,7 @@ where
let int_ts = range.beg_u64();
Self {
range,
did_min_max: false,
count: 0,
min: STY::zero_b(),
max: STY::zero_b(),
@@ -650,12 +655,17 @@ where
int_ts,
last_ts: 0,
last_vals: None,
events_ignored_count: 0,
do_time_weight,
}
}
fn apply_min_max(&mut self, min: &STY, max: &STY) {
if self.count == 0 {
if self.did_min_max != (self.sumc > 0) {
panic!("logic error apply_min_max {} {}", self.did_min_max, self.sumc);
}
if self.sumc == 0 {
self.did_min_max = true;
self.min = min.clone();
self.max = max.clone();
} else {
@@ -671,35 +681,39 @@ where
fn apply_event_unweight(&mut self, avg: f32, min: STY, max: STY) {
//debug!("apply_event_unweight");
self.apply_min_max(&min, &max);
self.sumc += 1;
let vf = avg;
if vf.is_nan() {
} else {
self.sum += vf;
self.sumc += 1;
}
}
fn apply_event_time_weight(&mut self, px: u64, pxbeg: u64) {
// Only integrate, do not count because it is used even if the event does not fall into current bin.
fn apply_event_time_weight(&mut self, px: u64) {
trace_ingest!(
"apply_event_time_weight px {} pxbeg {} count {}",
"apply_event_time_weight px {} count {} sumc {} events_ignored_count {}",
px,
pxbeg,
self.count
self.count,
self.sumc,
self.events_ignored_count
);
if let Some((min, max, avg)) = self.last_vals.as_ref() {
let vf = *avg;
if px > pxbeg {
{
let min = min.clone();
let max = max.clone();
self.apply_min_max(&min, &max);
}
self.sumc += 1;
let w = (px - self.int_ts) as f32 * 1e-9;
if vf.is_nan() {
} else {
self.sum += vf * w;
self.sumc += 1;
}
self.int_ts = px;
} else {
debug!("apply_event_time_weight NO VALUE");
}
}
@@ -713,7 +727,6 @@ where
} else if ts >= self.range.end {
} else {
self.apply_event_unweight(avg, min, max);
self.count += 1;
}
}*/
todo!()
@@ -721,11 +734,13 @@ where
fn ingest_time_weight(&mut self, item: &EventsXbinDim0<STY>) {
trace!(
"{} ingest_time_weight range {:?} int_ts {:?}",
"{} ingest_time_weight range {:?} last_ts {:?} int_ts {:?}",
Self::type_name(),
self.range,
self.last_ts,
self.int_ts
);
let range_beg = self.range.beg_u64();
let range_end = self.range.end_u64();
for (((&ts, min), max), avg) in item
.tss
@@ -734,24 +749,24 @@ where
.zip(item.maxs.iter())
.zip(item.avgs.iter())
{
if ts < self.int_ts {
self.last_ts = ts;
self.last_vals = Some((min.clone(), max.clone(), avg.clone()));
//self.events_ignored_count += 1;
} else if ts >= self.range.end_u64() {
//self.events_ignored_count += 1;
return;
} else {
self.apply_event_time_weight(ts, self.range.beg_u64());
if ts >= range_end {
self.events_ignored_count += 1;
// TODO break early when tests pass.
//break;
} else if ts >= range_beg {
self.apply_event_time_weight(ts);
self.count += 1;
self.last_ts = ts;
self.last_vals = Some((min.clone(), max.clone(), avg.clone()));
//self.events_taken_count += 1;
} else {
self.events_ignored_count += 1;
self.last_ts = ts;
self.last_vals = Some((min.clone(), max.clone(), avg.clone()));
}
}
}
fn result_reset_unweight(&mut self, range: SeriesRange, _expand: bool) -> BinsXbinDim0<STY> {
fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsXbinDim0<STY> {
/*let avg = if self.sumc == 0 {
0f32
} else {
@@ -767,20 +782,20 @@ where
);
self.int_ts = range.beg;
self.range = range;
self.count = 0;
self.min = NTY::zero_b();
self.max = NTY::zero_b();
self.sum = 0f32;
self.sumc = 0;
self.did_min_max = false;
self.min = NTY::zero_b();
self.max = NTY::zero_b();
ret*/
todo!()
}
fn result_reset_time_weight(&mut self, range: SeriesRange, expand: bool) -> BinsXbinDim0<STY> {
fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsXbinDim0<STY> {
trace!("{} result_reset_time_weight", Self::type_name());
// TODO check callsite for correct expand status.
if self.range.is_time() {
self.apply_event_time_weight(self.range.end_u64(), self.range.beg_u64());
self.apply_event_time_weight(self.range.end_u64());
} else {
error!("TODO result_reset_time_weight");
err::todoval()
@@ -792,7 +807,10 @@ where
(self.min.clone(), self.max.clone(), avg)
} else {
let (min, max, avg) = match &self.last_vals {
Some((min, max, avg)) => (min.clone(), max.clone(), avg.clone()),
Some((min, max, avg)) => {
warn!("\n\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! SHOULD ALWAYS HAVE ACCUMULATED IN THIS CASE");
(min.clone(), max.clone(), avg.clone())
}
None => (STY::zero_b(), STY::zero_b(), 0.),
};
(min, max, avg)
@@ -808,8 +826,9 @@ where
self.int_ts = range_beg;
self.range = range;
self.count = 0;
self.sum = 0.;
self.sumc = 0;
self.sum = 0.;
self.did_min_max = false;
self.min = STY::zero_b();
self.max = STY::zero_b();
ret
@@ -838,9 +857,9 @@ where
fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output {
if self.do_time_weight {
self.result_reset_time_weight(range, expand)
self.result_reset_time_weight(range)
} else {
self.result_reset_unweight(range, expand)
self.result_reset_unweight(range)
}
}
}