diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 160fc2f..8c865c2 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -776,6 +776,8 @@ impl fmt::Debug for ChannelEventsTimeBinner { } } +impl ChannelEventsTimeBinner {} + impl TimeBinnerTy for ChannelEventsTimeBinner { type Input = ChannelEvents; type Output = Box; @@ -853,27 +855,30 @@ impl TimeBinner for ChannelEventsTimeBinner { } fn bins_ready_count(&self) -> usize { - todo!() + TimeBinnerTy::bins_ready_count(self) } fn bins_ready(&mut self) -> Option> { - todo!() + TimeBinnerTy::bins_ready(self) } fn push_in_progress(&mut self, push_empty: bool) { - todo!() + TimeBinnerTy::push_in_progress(self, push_empty) } fn cycle(&mut self) { - todo!() + TimeBinnerTy::cycle(self) } fn set_range_complete(&mut self) { - todo!() + TimeBinnerTy::set_range_complete(self) } fn empty(&self) -> Box { - todo!() + match TimeBinnerTy::empty(self) { + Some(x) => x, + None => panic!("TODO TimeBinner::empty for ChannelEventsTimeBinner"), + } } } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index e8421e6..f3ce108 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -535,6 +535,7 @@ impl EventsDim0Aggregator { fn apply_event_time_weight(&mut self, px: u64, pxbeg: u64) { if let Some(v) = &self.last_seen_val { + trace!("apply_event_time_weight with v {v:?}"); let vf = v.as_prim_f32_b(); let v2 = v.clone(); if px > pxbeg { @@ -587,26 +588,23 @@ impl EventsDim0Aggregator { for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let val = item.values[i1].clone(); - trace!("{self_name} ingest {:6} {:20} {:10?}", i1, ts, val); if ts < self.int_ts { - if self.last_seen_val.is_none() { - info!( - "ingest_time_weight event before range, only set last ts {} val {:?}", - ts, val - ); - } + trace!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val); self.events_ignored_count += 1; self.last_seen_ts = ts; self.last_seen_val = Some(val); } else if ts >= self.range.end_u64() { + trace!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val); self.events_ignored_count += 1; return; } else { + trace!("{self_name} ingest {:6} {:20} {:10?} IN", i1, ts, val); if false && self.last_seen_val.is_none() { // TODO no longer needed or? - info!( + trace!( "call apply_min_max without last val, use current instead {} {:?}", - ts, val + ts, + val ); self.apply_min_max(val.clone()); } @@ -915,16 +913,16 @@ impl Events for EventsDim0 { } #[derive(Debug)] -pub struct EventsDim0TimeBinner { +pub struct EventsDim0TimeBinner { binrange: BinnedRangeEnum, rix: usize, rng: Option, - agg: EventsDim0Aggregator, - ready: Option< as TimeBinnableTypeAggregator>::Output>, + agg: EventsDim0Aggregator, + ready: Option< as TimeBinnableTypeAggregator>::Output>, range_final: bool, } -impl EventsDim0TimeBinner { +impl EventsDim0TimeBinner { fn self_name() -> &'static str { any::type_name::() } @@ -959,7 +957,7 @@ impl EventsDim0TimeBinner { } } -impl TimeBinner for EventsDim0TimeBinner { +impl TimeBinner for EventsDim0TimeBinner { fn bins_ready_count(&self) -> usize { match &self.ready { Some(k) => k.len(), @@ -1007,7 +1005,7 @@ impl TimeBinner for EventsDim0TimeBinner { if let Some(item) = item .as_any_ref() // TODO make statically sure that we attempt to cast to the correct type here: - .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() + .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() { // TODO collect statistics associated with this request: trace!("{self_name} FEED THE ITEM..."); @@ -1110,7 +1108,7 @@ impl TimeBinner for EventsDim0TimeBinner { } fn empty(&self) -> Box { - let ret = as TimeBinnableTypeAggregator>::Output::empty(); + let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index c3c9b0a..b29fdab 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1563,7 +1563,10 @@ where beg: self.offset * self.grid_spec.bin_t_len, end: (self.offset + self.bin_count) * self.grid_spec.bin_t_len, }*/ - err::todoval() + let beg = self.bin_len.times(self.bin_off).as_u64(); + let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64(); + warn!("TODO make generic for pulse"); + NanoRange { beg, end } } pub fn edges_u64(&self) -> Vec { @@ -1641,7 +1644,7 @@ impl BinnedRangeEnum { match self { BinnedRangeEnum::Time(k) => { if (i as u64) < k.bin_cnt { - let beg = k.bin_off + k.bin_len.0 * i as u64; + let beg = k.bin_len.0 * (k.bin_off + i as u64); let x = SeriesRange::TimeRange(NanoRange { beg, end: beg + k.bin_len.0, @@ -1653,7 +1656,7 @@ impl BinnedRangeEnum { } BinnedRangeEnum::Pulse(k) => { if (i as u64) < k.bin_cnt { - let beg = k.bin_off + k.bin_len.0 * i as u64; + let beg = k.bin_len.0 * (k.bin_off + i as u64); let x = SeriesRange::PulseRange(PulseRange { beg, end: beg + k.bin_len.0, diff --git a/netpod/src/range/binrange.rs b/netpod/src/range/binrange.rs index d118eab..58e2a68 100644 --- a/netpod/src/range/binrange.rs +++ b/netpod/src/range/binrange.rs @@ -25,4 +25,36 @@ fn test_binned_range_covering_00() { assert_eq!(a[0], TsNano((((10 * 60) + 10) * 60 + 0) * SEC)); assert_eq!(a[1], TsNano((((10 * 60) + 11) * 60 + 0) * SEC)); assert_eq!(a[10], TsNano((((10 * 60) + 20) * 60 + 0) * SEC)); + let x = r.range_at(2).unwrap(); + let y = SeriesRange::TimeRange(NanoRange { + beg: (((10 * 60) + 12) * 60 + 0) * SEC, + end: (((10 * 60) + 13) * 60 + 0) * SEC, + }); + assert_eq!(x, y); +} + +#[test] +fn test_binned_range_covering_01() { + let range = SeriesRange::TimeRange(NanoRange::from_date_time( + DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z").unwrap().into(), + DateTime::parse_from_rfc3339("1970-01-01T00:21:10Z").unwrap().into(), + )); + let r = BinnedRangeEnum::covering_range(range, 9).unwrap(); + assert_eq!(r.bin_count(), 14); + if let Dim0Kind::Time = r.dim0kind() { + } else { + panic!() + } + let r2 = r.binned_range_time(); + let a = r2.edges(); + assert_eq!(a.len(), 15); + assert_eq!(a[0], TsNano((((0 * 60) + 20) * 60 + 0) * SEC)); + assert_eq!(a[1], TsNano((((0 * 60) + 20) * 60 + 5) * SEC)); + assert_eq!(a[14], TsNano((((0 * 60) + 21) * 60 + 10) * SEC)); + let x = r.range_at(0).unwrap(); + let y = SeriesRange::TimeRange(NanoRange { + beg: (((0 * 60) + 20) * 60 + 0) * SEC, + end: (((0 * 60) + 20) * 60 + 5) * SEC, + }); + assert_eq!(x, y); } diff --git a/netpod/src/range/evrange.rs b/netpod/src/range/evrange.rs index 7ea38ad..a0baf02 100644 --- a/netpod/src/range/evrange.rs +++ b/netpod/src/range/evrange.rs @@ -107,8 +107,8 @@ impl SeriesRange { pub fn end_u64(&self) -> u64 { match self { - SeriesRange::TimeRange(x) => x.beg, - SeriesRange::PulseRange(x) => x.beg, + SeriesRange::TimeRange(x) => x.end, + SeriesRange::PulseRange(x) => x.end, } } diff --git a/streams/src/generators.rs b/streams/src/generators.rs index 1b7366d..049cc4d 100644 --- a/streams/src/generators.rs +++ b/streams/src/generators.rs @@ -22,6 +22,8 @@ pub struct GenerateI32 { ts: u64, dts: u64, tsend: u64, + #[allow(unused)] + c1: u64, timeout: Option + Send>>>, } @@ -38,6 +40,7 @@ impl GenerateI32 { ts, dts, tsend, + c1: 0, timeout: None, } } @@ -51,12 +54,14 @@ impl GenerateI32 { break; } let pulse = ts; - item.push(ts, pulse, pulse as T); + let value = (ts / (MS * 100) % 1000) as T; + item.push(ts, pulse, value); ts += self.dts; } self.ts = ts; let w = ChannelEvents::Events(Box::new(item) as _); let w = sitem_data(w); + eprintln!("make_batch {w:?}"); w } } diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index d80b6bf..fff449a 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -172,8 +172,10 @@ fn time_bin_02() -> Result<(), Error> { break; } } + let event_range = binned_range.binned_range_time().full_range(); + let series_range = SeriesRange::TimeRange(event_range); // TODO the test stream must be able to generate also one-before (on demand) and RangeComplete (by default). - let stream = GenerateI32::new(0, 1, range); + let stream = GenerateI32::new(0, 1, series_range); // TODO apply first some box dyn EventTransform which later is provided by TransformQuery. // Then the Merge will happen always by default for backends where this is needed. // TODO then apply the transform chain for the after-merged-stream. @@ -189,17 +191,20 @@ fn time_bin_02() -> Result<(), Error> { // From there on it should no longer be neccessary to distinguish whether its still events or time bins. // Then, optionally collect for output type like json, or stream as batches. // TODO the timebinner should already provide batches to make this efficient. - while let Some(e) = binned_stream.next().await { - eprintln!("see item {e:?}"); - let x = on_sitemty_data!(e, |e| { - // - Ok(StreamItem::DataItem(RangeCompletableItem::Data(e))) - }); + if false { + while let Some(e) = binned_stream.next().await { + eprintln!("see item {e:?}"); + let x = on_sitemty_data!(e, |e| { + // + Ok(StreamItem::DataItem(RangeCompletableItem::Data(e))) + }); + } + } else { + let res = collect(binned_stream, deadline, 200, None, Some(binned_range)).await?; + let d = res.to_json_result()?.to_json_bytes()?; + let s = String::from_utf8_lossy(&d); + eprintln!("{s}"); } - /*let res = collect(binned_stream, deadline, 200, None, Some(binned_range)).await?; - let d = res.to_json_result()?.to_json_bytes()?; - let s = String::from_utf8_lossy(&d); - eprintln!("{s}");*/ Ok(()) }; runfut(fut) diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index f62b2bb..d189677 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -192,16 +192,16 @@ where } }, Ready(None) => { - trace2!("finish up"); + trace!("finish up"); let self_range_complete = self.range_complete; if let Some(binner) = self.binner.as_mut() { - trace2!("bins ready count before finish {}", binner.bins_ready_count()); + trace!("bins ready count before finish {}", binner.bins_ready_count()); // TODO rework the finish logic if self_range_complete { binner.set_range_complete(); } binner.push_in_progress(false); - trace2!("bins ready count after finish {}", binner.bins_ready_count()); + trace!("bins ready count after finish {}", binner.bins_ready_count()); if binner.bins_ready_count() > 0 { if let Some(bins) = binner.bins_ready() { self.done_data = true; @@ -226,7 +226,7 @@ where } } } else { - trace2!("input stream finished, still no binner"); + trace!("input stream finished, still no binner"); self.done_data = true; continue; }