WIP time bin chain test case

This commit is contained in:
Dominik Werder
2023-04-06 13:22:44 +02:00
parent 32efc693f5
commit 6500310cf3
8 changed files with 91 additions and 43 deletions

View File

@@ -776,6 +776,8 @@ impl fmt::Debug for ChannelEventsTimeBinner {
}
}
impl ChannelEventsTimeBinner {}
impl TimeBinnerTy for ChannelEventsTimeBinner {
type Input = ChannelEvents;
type Output = Box<dyn TimeBinned>;
@@ -853,27 +855,30 @@ impl TimeBinner for ChannelEventsTimeBinner {
}
fn bins_ready_count(&self) -> usize {
todo!()
TimeBinnerTy::bins_ready_count(self)
}
fn bins_ready(&mut self) -> Option<Box<dyn TimeBinned>> {
todo!()
TimeBinnerTy::bins_ready(self)
}
fn push_in_progress(&mut self, push_empty: bool) {
todo!()
TimeBinnerTy::push_in_progress(self, push_empty)
}
fn cycle(&mut self) {
todo!()
TimeBinnerTy::cycle(self)
}
fn set_range_complete(&mut self) {
todo!()
TimeBinnerTy::set_range_complete(self)
}
fn empty(&self) -> Box<dyn TimeBinned> {
todo!()
match TimeBinnerTy::empty(self) {
Some(x) => x,
None => panic!("TODO TimeBinner::empty for ChannelEventsTimeBinner"),
}
}
}

View File

@@ -535,6 +535,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
fn apply_event_time_weight(&mut self, px: u64, pxbeg: u64) {
if let Some(v) = &self.last_seen_val {
trace!("apply_event_time_weight with v {v:?}");
let vf = v.as_prim_f32_b();
let v2 = v.clone();
if px > pxbeg {
@@ -587,26 +588,23 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
let val = item.values[i1].clone();
trace!("{self_name} ingest {:6} {:20} {:10?}", i1, ts, val);
if ts < self.int_ts {
if self.last_seen_val.is_none() {
info!(
"ingest_time_weight event before range, only set last ts {} val {:?}",
ts, val
);
}
trace!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val);
self.events_ignored_count += 1;
self.last_seen_ts = ts;
self.last_seen_val = Some(val);
} else if ts >= self.range.end_u64() {
trace!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val);
self.events_ignored_count += 1;
return;
} else {
trace!("{self_name} ingest {:6} {:20} {:10?} IN", i1, ts, val);
if false && self.last_seen_val.is_none() {
// TODO no longer needed or?
info!(
trace!(
"call apply_min_max without last val, use current instead {} {:?}",
ts, val
ts,
val
);
self.apply_min_max(val.clone());
}
@@ -915,16 +913,16 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
}
#[derive(Debug)]
pub struct EventsDim0TimeBinner<NTY: ScalarOps> {
pub struct EventsDim0TimeBinner<STY: ScalarOps> {
binrange: BinnedRangeEnum,
rix: usize,
rng: Option<SeriesRange>,
agg: EventsDim0Aggregator<NTY>,
ready: Option<<EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output>,
agg: EventsDim0Aggregator<STY>,
ready: Option<<EventsDim0Aggregator<STY> as TimeBinnableTypeAggregator>::Output>,
range_final: bool,
}
impl<NTY: ScalarOps> EventsDim0TimeBinner<NTY> {
impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
fn self_name() -> &'static str {
any::type_name::<Self>()
}
@@ -959,7 +957,7 @@ impl<NTY: ScalarOps> EventsDim0TimeBinner<NTY> {
}
}
impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
fn bins_ready_count(&self) -> usize {
match &self.ready {
Some(k) => k.len(),
@@ -1007,7 +1005,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
if let Some(item) = item
.as_any_ref()
// TODO make statically sure that we attempt to cast to the correct type here:
.downcast_ref::<<EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Input>()
.downcast_ref::<<EventsDim0Aggregator<STY> as TimeBinnableTypeAggregator>::Input>()
{
// TODO collect statistics associated with this request:
trace!("{self_name} FEED THE ITEM...");
@@ -1110,7 +1108,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
}
fn empty(&self) -> Box<dyn TimeBinned> {
let ret = <EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
let ret = <EventsDim0Aggregator<STY> as TimeBinnableTypeAggregator>::Output::empty();
Box::new(ret)
}
}

View File

@@ -1563,7 +1563,10 @@ where
beg: self.offset * self.grid_spec.bin_t_len,
end: (self.offset + self.bin_count) * self.grid_spec.bin_t_len,
}*/
err::todoval()
let beg = self.bin_len.times(self.bin_off).as_u64();
let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64();
warn!("TODO make generic for pulse");
NanoRange { beg, end }
}
pub fn edges_u64(&self) -> Vec<u64> {
@@ -1641,7 +1644,7 @@ impl BinnedRangeEnum {
match self {
BinnedRangeEnum::Time(k) => {
if (i as u64) < k.bin_cnt {
let beg = k.bin_off + k.bin_len.0 * i as u64;
let beg = k.bin_len.0 * (k.bin_off + i as u64);
let x = SeriesRange::TimeRange(NanoRange {
beg,
end: beg + k.bin_len.0,
@@ -1653,7 +1656,7 @@ impl BinnedRangeEnum {
}
BinnedRangeEnum::Pulse(k) => {
if (i as u64) < k.bin_cnt {
let beg = k.bin_off + k.bin_len.0 * i as u64;
let beg = k.bin_len.0 * (k.bin_off + i as u64);
let x = SeriesRange::PulseRange(PulseRange {
beg,
end: beg + k.bin_len.0,

View File

@@ -25,4 +25,36 @@ fn test_binned_range_covering_00() {
assert_eq!(a[0], TsNano((((10 * 60) + 10) * 60 + 0) * SEC));
assert_eq!(a[1], TsNano((((10 * 60) + 11) * 60 + 0) * SEC));
assert_eq!(a[10], TsNano((((10 * 60) + 20) * 60 + 0) * SEC));
let x = r.range_at(2).unwrap();
let y = SeriesRange::TimeRange(NanoRange {
beg: (((10 * 60) + 12) * 60 + 0) * SEC,
end: (((10 * 60) + 13) * 60 + 0) * SEC,
});
assert_eq!(x, y);
}
#[test]
fn test_binned_range_covering_01() {
let range = SeriesRange::TimeRange(NanoRange::from_date_time(
DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z").unwrap().into(),
DateTime::parse_from_rfc3339("1970-01-01T00:21:10Z").unwrap().into(),
));
let r = BinnedRangeEnum::covering_range(range, 9).unwrap();
assert_eq!(r.bin_count(), 14);
if let Dim0Kind::Time = r.dim0kind() {
} else {
panic!()
}
let r2 = r.binned_range_time();
let a = r2.edges();
assert_eq!(a.len(), 15);
assert_eq!(a[0], TsNano((((0 * 60) + 20) * 60 + 0) * SEC));
assert_eq!(a[1], TsNano((((0 * 60) + 20) * 60 + 5) * SEC));
assert_eq!(a[14], TsNano((((0 * 60) + 21) * 60 + 10) * SEC));
let x = r.range_at(0).unwrap();
let y = SeriesRange::TimeRange(NanoRange {
beg: (((0 * 60) + 20) * 60 + 0) * SEC,
end: (((0 * 60) + 20) * 60 + 5) * SEC,
});
assert_eq!(x, y);
}

View File

@@ -107,8 +107,8 @@ impl SeriesRange {
pub fn end_u64(&self) -> u64 {
match self {
SeriesRange::TimeRange(x) => x.beg,
SeriesRange::PulseRange(x) => x.beg,
SeriesRange::TimeRange(x) => x.end,
SeriesRange::PulseRange(x) => x.end,
}
}

View File

@@ -22,6 +22,8 @@ pub struct GenerateI32 {
ts: u64,
dts: u64,
tsend: u64,
#[allow(unused)]
c1: u64,
timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
@@ -38,6 +40,7 @@ impl GenerateI32 {
ts,
dts,
tsend,
c1: 0,
timeout: None,
}
}
@@ -51,12 +54,14 @@ impl GenerateI32 {
break;
}
let pulse = ts;
item.push(ts, pulse, pulse as T);
let value = (ts / (MS * 100) % 1000) as T;
item.push(ts, pulse, value);
ts += self.dts;
}
self.ts = ts;
let w = ChannelEvents::Events(Box::new(item) as _);
let w = sitem_data(w);
eprintln!("make_batch {w:?}");
w
}
}

View File

@@ -172,8 +172,10 @@ fn time_bin_02() -> Result<(), Error> {
break;
}
}
let event_range = binned_range.binned_range_time().full_range();
let series_range = SeriesRange::TimeRange(event_range);
// TODO the test stream must be able to generate also one-before (on demand) and RangeComplete (by default).
let stream = GenerateI32::new(0, 1, range);
let stream = GenerateI32::new(0, 1, series_range);
// TODO apply first some box dyn EventTransform which later is provided by TransformQuery.
// Then the Merge will happen always by default for backends where this is needed.
// TODO then apply the transform chain for the after-merged-stream.
@@ -189,17 +191,20 @@ fn time_bin_02() -> Result<(), Error> {
// From there on it should no longer be neccessary to distinguish whether its still events or time bins.
// Then, optionally collect for output type like json, or stream as batches.
// TODO the timebinner should already provide batches to make this efficient.
while let Some(e) = binned_stream.next().await {
eprintln!("see item {e:?}");
let x = on_sitemty_data!(e, |e| {
//
Ok(StreamItem::DataItem(RangeCompletableItem::Data(e)))
});
if false {
while let Some(e) = binned_stream.next().await {
eprintln!("see item {e:?}");
let x = on_sitemty_data!(e, |e| {
//
Ok(StreamItem::DataItem(RangeCompletableItem::Data(e)))
});
}
} else {
let res = collect(binned_stream, deadline, 200, None, Some(binned_range)).await?;
let d = res.to_json_result()?.to_json_bytes()?;
let s = String::from_utf8_lossy(&d);
eprintln!("{s}");
}
/*let res = collect(binned_stream, deadline, 200, None, Some(binned_range)).await?;
let d = res.to_json_result()?.to_json_bytes()?;
let s = String::from_utf8_lossy(&d);
eprintln!("{s}");*/
Ok(())
};
runfut(fut)

View File

@@ -192,16 +192,16 @@ where
}
},
Ready(None) => {
trace2!("finish up");
trace!("finish up");
let self_range_complete = self.range_complete;
if let Some(binner) = self.binner.as_mut() {
trace2!("bins ready count before finish {}", binner.bins_ready_count());
trace!("bins ready count before finish {}", binner.bins_ready_count());
// TODO rework the finish logic
if self_range_complete {
binner.set_range_complete();
}
binner.push_in_progress(false);
trace2!("bins ready count after finish {}", binner.bins_ready_count());
trace!("bins ready count after finish {}", binner.bins_ready_count());
if binner.bins_ready_count() > 0 {
if let Some(bins) = binner.bins_ready() {
self.done_data = true;
@@ -226,7 +226,7 @@ where
}
}
} else {
trace2!("input stream finished, still no binner");
trace!("input stream finished, still no binner");
self.done_data = true;
continue;
}