This commit is contained in:
Dominik Werder
2023-05-05 11:45:38 +02:00
parent 7ae5b25107
commit 85bd7ba57e
2 changed files with 185 additions and 72 deletions

View File

@@ -1,6 +1,10 @@
use crate::binsdim0::BinsDim0;
use crate::framable::FrameType;
use crate::framable::FrameTypeStatic;
use crate::timebin::ChooseIndicesForTimeBin;
use crate::timebin::ChooseIndicesForTimeBinEvents;
use crate::timebin::TimeAggregatorCommonV0Func;
use crate::timebin::TimeAggregatorCommonV0Trait;
use crate::timebin::TimeBinnerCommonV0Func;
use crate::timebin::TimeBinnerCommonV0Trait;
use crate::IsoDateTime;
@@ -184,6 +188,16 @@ impl<STY: ScalarOps> HasTimestampDeque for EventsDim0<STY> {
items_0::impl_range_overlap_info_events!(EventsDim0);
impl<STY> ChooseIndicesForTimeBin for EventsDim0<STY> {
fn choose_indices_unweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize) {
ChooseIndicesForTimeBinEvents::choose_unweight(beg, end, &self.tss)
}
fn choose_indices_timeweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize) {
ChooseIndicesForTimeBinEvents::choose_timeweight(beg, end, &self.tss)
}
}
impl<STY> TimeBinnableType for EventsDim0<STY>
where
STY: ScalarOps,
@@ -457,6 +471,46 @@ impl<STY> Drop for EventsDim0Aggregator<STY> {
}
}
impl<STY: ScalarOps> TimeAggregatorCommonV0Trait for EventsDim0Aggregator<STY> {
type Input = <Self as TimeBinnableTypeAggregator>::Input;
type Output = <Self as TimeBinnableTypeAggregator>::Output;
fn type_name() -> &'static str {
Self::type_name()
}
fn common_range_current(&self) -> &SeriesRange {
&self.range
}
fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: std::ops::Range<usize>) {
for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) {
self.apply_event_unweight(val.clone());
self.count += 1;
self.last_ts = ts;
self.last_val = Some(val.clone());
}
}
fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize) {
//trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
self.last_ts = item.tss[j];
self.last_val = Some(item.values[j].clone());
}
fn common_ingest_range(&mut self, item: &Self::Input, r: std::ops::Range<usize>) {
let beg = self.range.beg_u64();
for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) {
if ts > beg {
self.apply_event_time_weight(ts);
}
self.count += 1;
self.last_ts = ts;
self.last_val = Some(val.clone());
}
}
}
impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
fn type_name() -> &'static str {
any::type_name::<Self>()
@@ -542,69 +596,25 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
}
fn ingest_unweight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
error!("TODO check again result_reset_unweight");
err::todo();
if self.range.is_time() {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1].clone();
if ts < self.range.beg_u64() {
self.events_ignored_count += 1;
} else if ts >= self.range.end_u64() {
self.events_ignored_count += 1;
return;
} else {
self.apply_event_unweight(val);
self.count += 1;
}
}
} else {
error!("TODO ingest_unweight");
err::todo();
}
TimeAggregatorCommonV0Func::ingest_time_weight(self, item)
}
fn ingest_time_weight(&mut self, item: &<Self as TimeBinnableTypeAggregator>::Input) {
let self_name = any::type_name::<Self>();
trace_ingest!(
"{self_name}::ingest_time_weight item len {} items_seen {}",
item.len(),
self.items_seen
);
self.items_seen += 1;
if self.range.is_time() {
let range_beg = self.range.beg_u64();
let range_end = self.range.end_u64();
for (i1, (&ts, val)) in item.tss.iter().zip(item.values.iter()).enumerate() {
if ts >= range_end {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
self.events_ignored_count += 1;
// TODO count all the ignored events for stats
break;
} else if ts >= range_beg {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} INSIDE", i1, ts, val);
if ts > range_beg {
self.apply_event_time_weight(ts);
}
self.count += 1;
self.last_ts = ts;
self.last_val = Some(val.clone());
} else {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
self.events_ignored_count += 1;
self.last_ts = ts;
self.last_val = Some(val.clone());
}
}
} else {
error!("TODO ingest_unweight");
err::todo();
}
TimeAggregatorCommonV0Func::ingest_time_weight(self, item)
}
fn reset_values(&mut self, range: SeriesRange) {
self.int_ts = range.beg_u64();
trace!("ON RESET SET int_ts {:10}", self.int_ts);
self.range = range;
self.count = 0;
self.sum = 0.;
self.sumc = 0;
self.minmax = None;
self.items_seen = 0;
}
fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsDim0<STY> {
trace!("TODO check again result_reset_unweight");
err::todo();
let (min, max) = if let Some((min, max)) = self.minmax.take() {
(min, max)
} else {
@@ -629,14 +639,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
error!("TODO result_reset_unweight");
err::todoval()
};
self.int_ts = range.beg_u64();
trace!("ON RESET SET int_ts {:10}", self.int_ts);
self.range = range;
self.count = 0;
self.sum = 0.;
self.sumc = 0;
self.minmax = None;
self.items_seen = 0;
self.reset_values(range);
ret
}
@@ -682,13 +685,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
error!("TODO result_reset_time_weight");
err::todoval()
};
self.int_ts = range.beg_u64();
self.range = range;
self.count = 0;
self.sumc = 0;
self.sum = 0.;
self.minmax = None;
self.items_seen = 0;
self.reset_values(range);
ret
}
}

View File

@@ -1,16 +1,20 @@
use crate::eventsdim0::EventsDim0TimeBinner;
use items_0::overlap::HasTimestampDeque;
use items_0::overlap::RangeOverlapInfo;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::TimeBinnable;
use items_0::AppendEmptyBin;
use items_0::Appendable;
use items_0::Empty;
use items_0::Events;
use items_0::HasNonemptyFirstBin;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use std::any;
use std::collections::VecDeque;
use std::ops::Range;
#[allow(unused)]
macro_rules! trace_ingest {
@@ -184,3 +188,115 @@ impl TimeBinnerCommonV0Func {
}
}
}
pub trait ChooseIndicesForTimeBin {
fn choose_indices_unweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize);
fn choose_indices_timeweight(&self, beg: u64, end: u64) -> (Option<usize>, usize, usize);
}
pub struct ChooseIndicesForTimeBinEvents {}
impl ChooseIndicesForTimeBinEvents {
pub fn choose_unweight(beg: u64, end: u64, tss: &VecDeque<u64>) -> (Option<usize>, usize, usize) {
// TODO improve via binary search.
let mut one_before = None;
let mut j = 0;
let mut k = tss.len();
for (i1, &ts) in tss.iter().enumerate() {
if ts >= end {
break;
} else if ts >= beg {
} else {
one_before = Some(i1);
j = i1 + 1;
}
}
(one_before, j, k)
}
pub fn choose_timeweight(beg: u64, end: u64, tss: &VecDeque<u64>) -> (Option<usize>, usize, usize) {
// TODO improve via binary search.
let mut one_before = None;
let mut j = 0;
let mut k = tss.len();
for (i1, &ts) in tss.iter().enumerate() {
if ts >= end {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
// TODO count all the ignored events for stats
k = i1;
break;
} else if ts >= beg {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} INSIDE", i1, ts, val);
} else {
trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
one_before = Some(i1);
j = i1 + 1;
}
}
(one_before, j, k)
}
}
pub trait TimeAggregatorCommonV0Trait {
type Input: RangeOverlapInfo + ChooseIndicesForTimeBin + 'static;
type Output: WithLen + Empty + AppendEmptyBin + HasNonemptyFirstBin + 'static;
fn type_name() -> &'static str;
fn common_range_current(&self) -> &SeriesRange;
fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: Range<usize>);
fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize);
fn common_ingest_range(&mut self, item: &Self::Input, r: Range<usize>);
}
pub struct TimeAggregatorCommonV0Func {}
impl TimeAggregatorCommonV0Func {
pub fn ingest_unweight<B>(binner: &mut B, item: &B::Input)
where
B: TimeAggregatorCommonV0Trait,
{
let self_name = B::type_name();
trace_ingest!(
"{self_name}::ingest_unweight item len {} items_seen {}",
item.len(),
self.items_seen
);
let rng = B::common_range_current(binner);
if rng.is_time() {
let beg = rng.beg_u64();
let end = rng.end_u64();
let (one_before, j, k) = item.choose_indices_unweight(beg, end);
if let Some(j) = one_before {
//<B as TimeAggregatorCommonV0Trait>::common_ingest_one_before(binner, item, j);
}
<B as TimeAggregatorCommonV0Trait>::common_ingest_unweight_range(binner, item, j..k);
} else {
error!("TODO ingest_unweight for pulse range");
err::todo();
}
}
pub fn ingest_time_weight<B>(binner: &mut B, item: &B::Input)
where
B: TimeAggregatorCommonV0Trait,
{
let self_name = B::type_name();
trace_ingest!(
"{self_name}::ingest_time_weight item len {} items_seen {}",
item.len(),
self.items_seen
);
let rng = B::common_range_current(binner);
if rng.is_time() {
let beg = rng.beg_u64();
let end = rng.end_u64();
let (one_before, j, k) = item.choose_indices_timeweight(beg, end);
if let Some(j) = one_before {
<B as TimeAggregatorCommonV0Trait>::common_ingest_one_before(binner, item, j);
}
<B as TimeAggregatorCommonV0Trait>::common_ingest_range(binner, item, j..k);
} else {
error!("TODO ingest_time_weight for pulse range");
err::todo();
}
}
}