Support for time binning in ingest
This commit is contained in:
@@ -51,7 +51,7 @@ macro_rules! trace4 {
|
|||||||
pub struct BinsXbinDim0<NTY> {
|
pub struct BinsXbinDim0<NTY> {
|
||||||
ts1s: VecDeque<u64>,
|
ts1s: VecDeque<u64>,
|
||||||
ts2s: VecDeque<u64>,
|
ts2s: VecDeque<u64>,
|
||||||
pub counts: VecDeque<u64>,
|
counts: VecDeque<u64>,
|
||||||
mins: VecDeque<NTY>,
|
mins: VecDeque<NTY>,
|
||||||
maxs: VecDeque<NTY>,
|
maxs: VecDeque<NTY>,
|
||||||
avgs: VecDeque<f32>,
|
avgs: VecDeque<f32>,
|
||||||
@@ -106,6 +106,10 @@ impl<NTY: ScalarOps> BinsXbinDim0<NTY> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn counts(&self) -> &VecDeque<u64> {
|
||||||
|
&self.counts
|
||||||
|
}
|
||||||
|
|
||||||
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
|
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
|
||||||
self.ts1s.push_back(ts1);
|
self.ts1s.push_back(ts1);
|
||||||
self.ts2s.push_back(ts2);
|
self.ts2s.push_back(ts2);
|
||||||
@@ -837,7 +841,7 @@ impl<NTY: ScalarOps> TimeBinned for BinsXbinDim0<NTY> {
|
|||||||
|
|
||||||
fn to_simple_bins_f32(&mut self) -> Box<dyn TimeBinned> {
|
fn to_simple_bins_f32(&mut self) -> Box<dyn TimeBinned> {
|
||||||
use mem::replace;
|
use mem::replace;
|
||||||
let ret = BinsXbinDim0::<f32> {
|
let ret = super::binsdim0::BinsDim0::<f32> {
|
||||||
ts1s: replace(&mut self.ts1s, VecDeque::new()),
|
ts1s: replace(&mut self.ts1s, VecDeque::new()),
|
||||||
ts2s: replace(&mut self.ts2s, VecDeque::new()),
|
ts2s: replace(&mut self.ts2s, VecDeque::new()),
|
||||||
counts: replace(&mut self.counts, VecDeque::new()),
|
counts: replace(&mut self.counts, VecDeque::new()),
|
||||||
@@ -850,6 +854,20 @@ impl<NTY: ScalarOps> TimeBinned for BinsXbinDim0<NTY> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn drain_into_tb(&mut self, dst: &mut dyn TimeBinned, range: Range<usize>) -> Result<(), Error> {
|
fn drain_into_tb(&mut self, dst: &mut dyn TimeBinned, range: Range<usize>) -> Result<(), Error> {
|
||||||
todo!()
|
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
|
||||||
|
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
|
||||||
|
// TODO make it harder to forget new members when the struct may get modified in the future
|
||||||
|
dst.ts1s.extend(self.ts1s.drain(range.clone()));
|
||||||
|
dst.ts2s.extend(self.ts2s.drain(range.clone()));
|
||||||
|
dst.counts.extend(self.counts.drain(range.clone()));
|
||||||
|
dst.mins.extend(self.mins.drain(range.clone()));
|
||||||
|
dst.maxs.extend(self.maxs.drain(range.clone()));
|
||||||
|
dst.avgs.extend(self.avgs.drain(range.clone()));
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
let type_name = any::type_name::<Self>();
|
||||||
|
error!("downcast to {} FAILED", type_name);
|
||||||
|
Err(Error::with_msg_no_trace(format!("downcast to {} FAILED", type_name)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -524,7 +524,7 @@ impl<STY: ScalarOps> TimeBinner for EventsXbinDim0TimeBinner<STY> {
|
|||||||
error!("{}::push_in_progress bins.len() {}", Self::type_name(), bins.len());
|
error!("{}::push_in_progress bins.len() {}", Self::type_name(), bins.len());
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
if push_empty || bins.counts[0] != 0 {
|
if push_empty || bins.counts()[0] != 0 {
|
||||||
match self.ready.as_mut() {
|
match self.ready.as_mut() {
|
||||||
Some(ready) => {
|
Some(ready) => {
|
||||||
ready.append_all_from(&mut bins);
|
ready.append_all_from(&mut bins);
|
||||||
|
|||||||
Reference in New Issue
Block a user