Refactoring for binned view

This commit is contained in:
Dominik Werder
2022-09-09 20:59:26 +02:00
parent 5205f1f06d
commit 881ff1fa44
5 changed files with 259 additions and 294 deletions

View File

@@ -581,6 +581,8 @@ impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
}
}
}
fn set_range_complete(&mut self) {}
}
impl<NTY: ScalarOps> TimeBinned for BinsDim0<NTY> {

View File

@@ -32,6 +32,13 @@ impl<NTY> EventsDim0<NTY> {
self.pulses.push_back(pulse);
self.values.push_back(value);
}
#[inline(always)]
pub fn push_front(&mut self, ts: u64, pulse: u64, value: NTY) {
self.tss.push_front(ts);
self.pulses.push_front(pulse);
self.values.push_front(value);
}
}
impl<NTY> Empty for EventsDim0<NTY> {
@@ -751,4 +758,6 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
}
}
}
fn set_range_complete(&mut self) {}
}

View File

@@ -23,6 +23,8 @@ use std::time::Instant;
use streams::Collectable;
use streams::ToJsonResult;
use crate::streams::Collector;
pub fn bool_is_false(x: &bool) -> bool {
*x == false
}
@@ -235,6 +237,8 @@ pub trait TimeBinner: Send {
/// to `push_in_progress` did not change the result count, as long as edges are left.
/// The next call to `Self::bins_ready_count` must return one higher count than before.
fn cycle(&mut self);
fn set_range_complete(&mut self);
}
/// Provides a time-binned representation of the implementing type.
@@ -630,7 +634,7 @@ impl ChannelEventsMerger {
Ready(Some(Ok(k))) => {
if let ChannelEvents::Events(events) = &k {
if events.len() == 0 {
eprintln!("ERROR bad events item {events:?}");
warn!("empty events item {events:?}");
} else {
trace!("\nrefilled with events {}\nREFILLED\n{:?}\n\n", events.len(), events);
}
@@ -750,6 +754,7 @@ impl Collectable for Box<dyn Collectable> {
}
}
// TODO handle status information.
pub async fn binned_collected(
scalar_type: ScalarType,
shape: Shape,
@@ -759,8 +764,44 @@ pub async fn binned_collected(
inp: Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>> + Send>>,
) -> Result<Box<dyn ToJsonResult>, Error> {
let deadline = Instant::now() + timeout;
let mut did_timeout = false;
let bin_count_exp = edges.len().max(2) as u32 - 1;
let do_time_weight = agg_kind.do_time_weighted();
// TODO maybe TimeBinner should take all ChannelEvents and handle this?
let mut did_range_complete = false;
fn flush_binned(
binner: &mut Box<dyn TimeBinner>,
coll: &mut Option<Box<dyn Collector>>,
bin_count_exp: u32,
force: bool,
) -> Result<(), Error> {
//info!("bins_ready_count: {}", binner.bins_ready_count());
if force {
if binner.bins_ready_count() == 0 {
warn!("cycle the binner");
binner.cycle();
} else {
warn!("binner was some ready, do nothing");
}
}
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
match ready {
Some(mut ready) => {
trace!("binned_collected ready {ready:?}");
if coll.is_none() {
*coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
Ok(())
}
None => Err(format!("bins_ready_count but no result").into()),
}
} else {
Ok(())
}
}
let mut coll = None;
let mut binner = None;
let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar);
@@ -776,6 +817,7 @@ pub async fn binned_collected(
}
},
_ = tokio::time::sleep_until(deadline.into()).fuse() => {
did_timeout = true;
break;
}
};
@@ -788,51 +830,31 @@ pub async fn binned_collected(
}
let binner = binner.as_mut().unwrap();
binner.ingest(events.as_time_binnable());
trace!("bins_ready_count: {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
match ready {
Some(mut ready) => {
trace!("binned_collected ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
}
None => {
return Err(format!("bins_ready_count but no result").into());
}
}
}
flush_binned(binner, &mut coll, bin_count_exp, false)?;
}
ChannelEvents::Status(_) => {
trace!("binned_collected TODO Status");
warn!("binned_collected TODO Status");
}
ChannelEvents::RangeComplete => {
trace!("binned_collected TODO RangeComplete");
warn!("binned_collected TODO RangeComplete");
did_range_complete = true;
}
}
}
if let Some(mut binner) = binner {
binner.cycle();
// TODO merge with the same logic above in the loop.
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
match ready {
Some(mut ready) => {
trace!("binned_collected ready {ready:?}");
if coll.is_none() {
coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
}
None => {
return Err(format!("binned_collected bins_ready_count but no result").into());
}
}
if did_range_complete {
binner.set_range_complete();
}
if !did_timeout {
binner.cycle();
}
flush_binned(&mut binner, &mut coll, bin_count_exp, false)?;
if coll.is_none() {
warn!("force a bin");
flush_binned(&mut binner, &mut coll, bin_count_exp, true)?;
}
} else {
error!("no binner, should always have one");
}
match coll {
Some(mut coll) => {