From f20beafd966bb69d38bd2b887fed0ac9234e81d5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 14 Apr 2023 15:50:10 +0200 Subject: [PATCH] Add test binned json --- daqbufp2/src/test/api4/binnedjson.rs | 1 + daqbufp2/src/test/api4/eventsjson.rs | 1 + items_0/src/collect_s.rs | 8 +++- items_2/src/binsdim0.rs | 66 ++++++++++++++++------------ items_2/src/binsxbindim0.rs | 10 +++-- items_2/src/channelevents.rs | 6 +++ items_2/src/eventsdim0.rs | 54 ++++++++++++++--------- items_2/src/eventsdim1.rs | 10 +++-- items_2/src/eventsxbindim0.rs | 6 +++ netpod/src/range/binrange.rs | 30 ++++++++++++- streams/src/generators.rs | 10 ++--- streams/src/test/collect.rs | 1 + streams/src/test/timebin.rs | 34 ++++++++++++-- 13 files changed, 169 insertions(+), 68 deletions(-) diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 07b8796..a581f68 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -5,6 +5,7 @@ use chrono::Utc; use err::Error; use http::StatusCode; use hyper::Body; +use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 5dc9df0..06fb69d 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -5,6 +5,7 @@ use chrono::Utc; use err::Error; use http::StatusCode; use hyper::Body; +use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index 3ec455f..671dfdd 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -49,7 +49,7 @@ impl ToJsonBytes for serde_json::Value { } } -pub trait Collected: fmt::Debug + ToJsonResult + AsAnyRef + Send {} +pub trait Collected: fmt::Debug + Send + AsAnyRef + WithLen + ToJsonResult {} erased_serde::serialize_trait_object!(Collected); @@ -59,6 +59,12 @@ impl ToJsonResult for Box { } } +impl WithLen for Box { + fn len(&self) -> usize { + self.as_ref().len() + } +} + impl Collected for Box {} // TODO rename to `Typed` diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index bb26084..6b62acf 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -1,7 +1,11 @@ +use crate::ts_offs_from_abs; +use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; +use chrono::TimeZone; +use chrono::Utc; use err::Error; use items_0::collect_s::Collectable; use items_0::collect_s::CollectableType; @@ -342,13 +346,15 @@ where } } +impl WithLen for BinsDim0CollectedResult { + fn len(&self) -> usize { + self.mins.len() + } +} + impl Collected for BinsDim0CollectedResult {} impl BinsDim0CollectedResult { - pub fn len(&self) -> usize { - self.ts1_off_ms.len() - } - pub fn ts_anchor_sec(&self) -> u64 { self.ts_anchor_sec } @@ -450,24 +456,29 @@ impl CollectorType for BinsDim0Collector { _range: Option, binrange: Option, ) -> Result { - eprintln!("trying to make a result from {self:?}"); - /*let bin_count_exp = if let Some(r) = &binrange { + trace!("trying to make a result from {self:?}"); + let bin_count_exp = if let Some(r) = &binrange { r.bin_count() as u32 } else { - eprintln!("no binrange given"); + warn!("no binrange given"); 0 }; - let bin_count = self.vals.ts1s.len() as u32; + let mut vals = if let Some(x) = self.vals.take() { + x + } else { + return Err(Error::with_msg_no_trace("BinsDim0Collector without vals")); + }; + let bin_count = vals.ts1s.len() as u32; eprintln!( "-------------- MAKE MISSING BINS bin_count_exp {} bin_count {}", bin_count_exp, bin_count ); let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { - match self.vals.ts2s.back() { + match vals.ts2s.back() { Some(&k) => { let missing_bins = bin_count_exp - bin_count; let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); - let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; + let u = k + (k - vals.ts1s.back().unwrap()) * missing_bins as u64; let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); (missing_bins, Some(continue_at), Some(finished_at)) } @@ -479,24 +490,26 @@ impl CollectorType for BinsDim0Collector { } else { (0, None, None) }; - if self.vals.ts1s.as_slices().1.len() != 0 { - panic!(); + if vals.ts1s.as_slices().1.len() != 0 { + warn!("ts1s non-contiguous"); } - if self.vals.ts2s.as_slices().1.len() != 0 { - panic!(); + if vals.ts2s.as_slices().1.len() != 0 { + warn!("ts2s non-contiguous"); } - let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0); - let tst2 = ts_offs_from_abs_with_anchor(tst1.0, self.vals.ts2s.as_slices().0); - let counts = mem::replace(&mut self.vals.counts, VecDeque::new()); - let mins = mem::replace(&mut self.vals.mins, VecDeque::new()); - let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new()); - let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new()); + let ts1s = vals.ts1s.make_contiguous(); + let ts2s = vals.ts2s.make_contiguous(); + let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(ts1s); + let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, ts2s); + let counts = vals.counts; + let mins = vals.mins; + let maxs = vals.maxs; + let avgs = vals.avgs; let ret = BinsDim0CollectedResult:: { - ts_anchor_sec: tst1.0, - ts1_off_ms: tst1.1, - ts1_off_ns: tst1.2, - ts2_off_ms: tst2.0, - ts2_off_ns: tst2.1, + ts_anchor_sec: ts_anch, + ts1_off_ms: ts1ms, + ts1_off_ns: ts1ns, + ts2_off_ms: ts2ms, + ts2_off_ns: ts2ns, counts, mins, maxs, @@ -507,8 +520,7 @@ impl CollectorType for BinsDim0Collector { continue_at, finished_at, }; - Ok(ret)*/ - todo!() + Ok(ret) } } diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index 64c819d..3f49a5c 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -307,13 +307,15 @@ where } } +impl WithLen for BinsXbinDim0CollectedResult { + fn len(&self) -> usize { + self.mins.len() + } +} + impl Collected for BinsXbinDim0CollectedResult {} impl BinsXbinDim0CollectedResult { - pub fn len(&self) -> usize { - self.ts1_off_ms.len() - } - pub fn ts_anchor_sec(&self) -> u64 { self.ts_anchor_sec } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 8c865c2..430e91a 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -915,6 +915,12 @@ impl AsAnyMut for ChannelEventsCollectorOutput { } } +impl WithLen for ChannelEventsCollectorOutput { + fn len(&self) -> usize { + todo!() + } +} + impl items_0::collect_s::ToJsonResult for ChannelEventsCollectorOutput { fn to_json_result(&self) -> Result, err::Error> { todo!() diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index f3ce108..3c8f62d 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -39,10 +39,20 @@ use std::collections::VecDeque; use std::fmt; use std::mem; +#[allow(unused)] +macro_rules! trace_ingest { + ($($arg:tt)*) => { + //let _ = ($($arg)*); + //trace!($($arg)*); + }; +} + #[allow(unused)] macro_rules! trace2 { - (EN$($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); + ($($arg:tt)*) => { + //let _ = ($($arg)*,); + //trace!($($arg)*); + }; } #[derive(Clone, PartialEq, Serialize, Deserialize)] @@ -258,10 +268,6 @@ pub struct EventsDim0CollectorOutput { } impl EventsDim0CollectorOutput { - pub fn len(&self) -> usize { - self.values.len() - } - pub fn ts_anchor_sec(&self) -> u64 { self.ts_anchor_sec } @@ -337,6 +343,12 @@ where } } +impl WithLen for EventsDim0CollectorOutput { + fn len(&self) -> usize { + self.values.len() + } +} + impl ToJsonResult for EventsDim0CollectorOutput { fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; @@ -498,7 +510,7 @@ impl EventsDim0Aggregator { // TODO reduce clone.. optimize via more traits to factor the trade-offs? fn apply_min_max(&mut self, val: NTY) { - trace!( + trace_ingest!( "apply_min_max val {:?} last_val {:?} count {} sumc {:?} min {:?} max {:?}", val, self.last_seen_val, @@ -522,7 +534,7 @@ impl EventsDim0Aggregator { } fn apply_event_unweight(&mut self, val: NTY) { - trace!("TODO check again result_reset_unweight"); + error!("TODO check again result_reset_unweight"); err::todo(); let vf = val.as_prim_f32_b(); self.apply_min_max(val); @@ -535,7 +547,7 @@ impl EventsDim0Aggregator { fn apply_event_time_weight(&mut self, px: u64, pxbeg: u64) { if let Some(v) = &self.last_seen_val { - trace!("apply_event_time_weight with v {v:?}"); + trace_ingest!("apply_event_time_weight with v {v:?}"); let vf = v.as_prim_f32_b(); let v2 = v.clone(); if px > pxbeg { @@ -558,7 +570,7 @@ impl EventsDim0Aggregator { } fn ingest_unweight(&mut self, item: &::Input) { - trace!("TODO check again result_reset_unweight"); + error!("TODO check again result_reset_unweight"); err::todo(); if self.range.is_time() { for i1 in 0..item.tss.len() { @@ -583,25 +595,25 @@ impl EventsDim0Aggregator { fn ingest_time_weight(&mut self, item: &::Input) { let self_name = any::type_name::(); - trace!("{self_name}::ingest_time_weight item len {}", item.len()); + trace_ingest!("{self_name}::ingest_time_weight item len {}", item.len()); if self.range.is_time() { for i1 in 0..item.tss.len() { let ts = item.tss[i1]; let val = item.values[i1].clone(); if ts < self.int_ts { - trace!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val); + trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val); self.events_ignored_count += 1; self.last_seen_ts = ts; self.last_seen_val = Some(val); } else if ts >= self.range.end_u64() { - trace!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val); + trace_ingest!("{self_name} ingest {:6} {:20} {:10?} AFTER", i1, ts, val); self.events_ignored_count += 1; return; } else { - trace!("{self_name} ingest {:6} {:20} {:10?} IN", i1, ts, val); + trace_ingest!("{self_name} ingest {:6} {:20} {:10?} IN", i1, ts, val); if false && self.last_seen_val.is_none() { // TODO no longer needed or? - trace!( + trace_ingest!( "call apply_min_max without last val, use current instead {} {:?}", ts, val @@ -712,11 +724,11 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { fn ingest(&mut self, item: &Self::Input) { if true { - trace!("{} ingest {} events", std::any::type_name::(), item.len()); + trace_ingest!("{} ingest {} events", std::any::type_name::(), item.len()); } if false { for (i, &ts) in item.tss.iter().enumerate() { - trace!("{} ingest {:6} {:20}", std::any::type_name::(), i, ts); + trace_ingest!("{} ingest {:6} {:20}", std::any::type_name::(), i, ts); } } if self.do_time_weight { @@ -1008,19 +1020,19 @@ impl TimeBinner for EventsDim0TimeBinner { .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() { // TODO collect statistics associated with this request: - trace!("{self_name} FEED THE ITEM..."); + trace_ingest!("{self_name} FEED THE ITEM..."); self.agg.ingest(item); if item.ends_after(self.agg.range()) { - trace!("{self_name} FED ITEM, ENDS AFTER."); + trace_ingest!("{self_name} FED ITEM, ENDS AFTER."); self.cycle(); if self.rng.is_none() { warn!("{self_name} no more bin in edges C"); return; } else { - trace!("{self_name} FED ITEM, CYCLED, CONTINUE."); + trace_ingest!("{self_name} FED ITEM, CYCLED, CONTINUE."); } } else { - trace!("{self_name} FED ITEM."); + trace_ingest!("{self_name} FED ITEM."); break; } } else { diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 14fee48..ed00b18 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -220,10 +220,6 @@ pub struct EventsDim1CollectorOutput { } impl EventsDim1CollectorOutput { - pub fn len(&self) -> usize { - self.values.len() - } - pub fn ts_anchor_sec(&self) -> u64 { self.ts_anchor_sec } @@ -275,6 +271,12 @@ where } } +impl WithLen for EventsDim1CollectorOutput { + fn len(&self) -> usize { + self.values.len() + } +} + impl ToJsonResult for EventsDim1CollectorOutput { fn to_json_result(&self) -> Result, Error> { let k = serde_json::to_value(self)?; diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index 96e8547..08a4272 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -400,6 +400,12 @@ where } } +impl WithLen for EventsXbinDim0CollectorOutput { + fn len(&self) -> usize { + self.mins.len() + } +} + impl ToJsonResult for EventsXbinDim0CollectorOutput where NTY: ScalarOps, diff --git a/netpod/src/range/binrange.rs b/netpod/src/range/binrange.rs index 58e2a68..ff7cd03 100644 --- a/netpod/src/range/binrange.rs +++ b/netpod/src/range/binrange.rs @@ -21,7 +21,7 @@ fn test_binned_range_covering_00() { } let r2 = r.binned_range_time(); let a = r2.edges(); - assert_eq!(a.len(), 11); + assert_eq!(a.len(), 1 + r.bin_count() as usize); assert_eq!(a[0], TsNano((((10 * 60) + 10) * 60 + 0) * SEC)); assert_eq!(a[1], TsNano((((10 * 60) + 11) * 60 + 0) * SEC)); assert_eq!(a[10], TsNano((((10 * 60) + 20) * 60 + 0) * SEC)); @@ -47,7 +47,33 @@ fn test_binned_range_covering_01() { } let r2 = r.binned_range_time(); let a = r2.edges(); - assert_eq!(a.len(), 15); + assert_eq!(a.len(), 1 + r.bin_count() as usize); + assert_eq!(a[0], TsNano((((0 * 60) + 20) * 60 + 0) * SEC)); + assert_eq!(a[1], TsNano((((0 * 60) + 20) * 60 + 5) * SEC)); + assert_eq!(a[14], TsNano((((0 * 60) + 21) * 60 + 10) * SEC)); + let x = r.range_at(0).unwrap(); + let y = SeriesRange::TimeRange(NanoRange { + beg: (((0 * 60) + 20) * 60 + 0) * SEC, + end: (((0 * 60) + 20) * 60 + 5) * SEC, + }); + assert_eq!(x, y); +} + +#[test] +fn test_binned_range_covering_02() { + let range = SeriesRange::TimeRange(NanoRange::from_date_time( + DateTime::parse_from_rfc3339("1970-01-01T00:20:04Z").unwrap().into(), + DateTime::parse_from_rfc3339("1970-01-01T00:22:10Z").unwrap().into(), + )); + let r = BinnedRangeEnum::covering_range(range, 25).unwrap(); + assert_eq!(r.bin_count(), 26); + if let Dim0Kind::Time = r.dim0kind() { + } else { + panic!() + } + let r2 = r.binned_range_time(); + let a = r2.edges(); + assert_eq!(a.len(), 1 + r.bin_count() as usize); assert_eq!(a[0], TsNano((((0 * 60) + 20) * 60 + 0) * SEC)); assert_eq!(a[1], TsNano((((0 * 60) + 20) * 60 + 5) * SEC)); assert_eq!(a[14], TsNano((((0 * 60) + 21) * 60 + 10) * SEC)); diff --git a/streams/src/generators.rs b/streams/src/generators.rs index 049cc4d..ea1bbe1 100644 --- a/streams/src/generators.rs +++ b/streams/src/generators.rs @@ -1,16 +1,12 @@ -use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use items_0::container::ByteEstimate; use items_0::streamitem::sitem_data; -use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; use items_0::Appendable; use items_0::Empty; use items_2::channelevents::ChannelEvents; -use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; use std::pin::Pin; @@ -61,7 +57,6 @@ impl GenerateI32 { self.ts = ts; let w = ChannelEvents::Events(Box::new(item) as _); let w = sitem_data(w); - eprintln!("make_batch {w:?}"); w } } @@ -74,6 +69,9 @@ impl Stream for GenerateI32 { loop { break if self.ts >= self.tsend { Ready(None) + } else if false { + // To use the generator without throttling, use this scope + Ready(Some(self.make_batch())) } else if let Some(fut) = self.timeout.as_mut() { match fut.poll_unpin(cx) { Ready(()) => { @@ -83,7 +81,7 @@ impl Stream for GenerateI32 { Pending => Pending, } } else { - self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(500)))); + self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); continue; }; } diff --git a/streams/src/test/collect.rs b/streams/src/test/collect.rs index cf517b0..f756532 100644 --- a/streams/src/test/collect.rs +++ b/streams/src/test/collect.rs @@ -2,6 +2,7 @@ use crate::test::runfut; use err::Error; use futures_util::stream; use items_0::streamitem::sitem_data; +use items_0::WithLen; use items_2::eventsdim0::EventsDim0CollectorOutput; use items_2::testgen::make_some_boxed_d0_f32; use netpod::timeunits::SEC; diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index fff449a..faae2bf 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -22,6 +22,7 @@ use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; +use serde_json::Value as JsValue; use std::collections::VecDeque; use std::time::Duration; use std::time::Instant; @@ -160,9 +161,11 @@ fn time_bin_02() -> Result<(), Error> { let fut = async { let do_time_weight = true; let deadline = Instant::now() + Duration::from_millis(4000); - let range = nano_range_from_str("1970-01-01T00:20:04Z", "1970-01-01T00:21:10Z")?; + let range = nano_range_from_str("1970-01-01T00:20:04Z", "1970-01-01T00:22:10Z")?; let range = SeriesRange::TimeRange(range); - let min_bin_count = 10; + // TODO add test: 26 bins should result in next higher resolution. + let min_bin_count = 25; + let expected_bin_count = 26; let binned_range = BinnedRangeEnum::covering_range(range.clone(), min_bin_count)?; eprintln!("binned_range: {:?}", binned_range); for i in 0.. { @@ -194,16 +197,41 @@ fn time_bin_02() -> Result<(), Error> { if false { while let Some(e) = binned_stream.next().await { eprintln!("see item {e:?}"); - let x = on_sitemty_data!(e, |e| { + let _x = on_sitemty_data!(e, |e| { // Ok(StreamItem::DataItem(RangeCompletableItem::Data(e))) }); } } else { let res = collect(binned_stream, deadline, 200, None, Some(binned_range)).await?; + assert_eq!(res.len(), expected_bin_count); let d = res.to_json_result()?.to_json_bytes()?; let s = String::from_utf8_lossy(&d); eprintln!("{s}"); + let jsval: JsValue = serde_json::from_slice(&d)?; + { + let counts = jsval.get("counts").unwrap().as_array().unwrap(); + assert_eq!(counts.len(), expected_bin_count); + for v in counts { + assert_eq!(v.as_u64().unwrap(), 5); + } + } + { + let ts1ms = jsval.get("ts1Ms").unwrap().as_array().unwrap(); + let mins = jsval.get("mins").unwrap().as_array().unwrap(); + assert_eq!(mins.len(), expected_bin_count); + for (ts1ms, min) in ts1ms.iter().zip(mins) { + assert_eq!((ts1ms.as_u64().unwrap() / 100) % 1000, min.as_u64().unwrap()); + } + } + { + let ts1ms = jsval.get("ts1Ms").unwrap().as_array().unwrap(); + let maxs = jsval.get("maxs").unwrap().as_array().unwrap(); + assert_eq!(maxs.len(), expected_bin_count); + for (ts1ms, max) in ts1ms.iter().zip(maxs) { + assert_eq!((40 + ts1ms.as_u64().unwrap() / 100) % 1000, max.as_u64().unwrap()); + } + } } Ok(()) };