use crate::timebin::TimeBinnerCommonV0Func; use crate::timebin::TimeBinnerCommonV0Trait; use crate::ts_offs_from_abs; use crate::ts_offs_from_abs_with_anchor; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use chrono::TimeZone; use chrono::Utc; use err::Error; use items_0::collect_s::CollectableDyn; use items_0::collect_s::CollectableType; use items_0::collect_s::CollectedDyn; use items_0::collect_s::CollectorTy; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::overlap::HasTimestampDeque; use items_0::scalar_ops::AsPrimF32; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinnableTy; use items_0::timebin::TimeBinned; use items_0::timebin::TimeBinner; use items_0::timebin::TimeBinnerTy; use items_0::timebin::TimeBins; use items_0::vecpreview::VecPreview; use items_0::AppendAllFrom; use items_0::AppendEmptyBin; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::HasNonemptyFirstBin; use items_0::Resettable; use items_0::TypeName; use items_0::WithLen; use netpod::is_false; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::CmpZero; use netpod::Dim0Kind; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; use std::mem; use std::ops::Range; #[allow(unused)] macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } // TODO make members private #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct BinsDim0 { pub ts1s: VecDeque, pub ts2s: VecDeque, pub cnts: VecDeque, pub mins: VecDeque, pub maxs: VecDeque, pub avgs: VecDeque, pub lsts: VecDeque, pub dim0kind: Option, } impl TypeName for BinsDim0 { fn type_name(&self) -> String { any::type_name::().into() } } impl fmt::Debug for BinsDim0 where NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let self_name = any::type_name::(); if true { return fmt::Display::fmt(self, fmt); } if true { write!( fmt, "{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", self.ts1s.len(), self.ts1s.iter().map(|k| k / SEC).collect::>(), self.ts2s.iter().map(|k| k / SEC).collect::>(), self.cnts, self.mins, self.maxs, self.avgs, ) } else { write!( fmt, "{self_name} count {} edges {:?} .. {:?} counts {:?} .. {:?} avgs {:?} .. {:?}", self.ts1s.len(), self.ts1s.front().map(|k| k / SEC), self.ts2s.back().map(|k| k / SEC), self.cnts.front(), self.cnts.back(), self.avgs.front(), self.avgs.back(), ) } } } impl fmt::Display for BinsDim0 where NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let self_name = any::type_name::(); write!( fmt, "{self_name} {{ len: {:?}, ts1s: {:?}, ts2s {:?}, counts {:?}, mins {:?}, maxs {:?}, avgs {:?}, lsts {:?} }}", self.len(), VecPreview::new(&self.ts1s), VecPreview::new(&self.ts2s), VecPreview::new(&self.cnts), VecPreview::new(&self.mins), VecPreview::new(&self.maxs), VecPreview::new(&self.avgs), VecPreview::new(&self.lsts), ) } } impl BinsDim0 { pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32, lst: NTY) { if avg < min.as_prim_f32_b() || avg > max.as_prim_f32_b() { // TODO rounding issues? debug!("bad avg"); } self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); self.cnts.push_back(count); self.mins.push_back(min); self.maxs.push_back(max); self.avgs.push_back(avg); self.lsts.push_back(lst); } pub fn equal_slack(&self, other: &Self) -> bool { if self.len() != other.len() { return false; } for (&a, &b) in self.ts1s.iter().zip(other.ts1s.iter()) { if a != b { return false; } } for (&a, &b) in self.ts2s.iter().zip(other.ts2s.iter()) { if a != b { return false; } } for (a, b) in self.mins.iter().zip(other.mins.iter()) { if !a.equal_slack(b) { return false; } } for (a, b) in self.maxs.iter().zip(other.maxs.iter()) { if !a.equal_slack(b) { return false; } } for (a, b) in self.avgs.iter().zip(other.avgs.iter()) { if !a.equal_slack(b) { return false; } } true } // TODO make this part of a new bins trait, similar like Events trait. // TODO check for error? pub fn drain_into(&mut self, dst: &mut Self, range: Range) -> () { dst.ts1s.extend(self.ts1s.drain(range.clone())); dst.ts2s.extend(self.ts2s.drain(range.clone())); dst.cnts.extend(self.cnts.drain(range.clone())); dst.mins.extend(self.mins.drain(range.clone())); dst.maxs.extend(self.maxs.drain(range.clone())); dst.avgs.extend(self.avgs.drain(range.clone())); dst.lsts.extend(self.lsts.drain(range.clone())); } } impl AsAnyRef for BinsDim0 where NTY: ScalarOps, { fn as_any_ref(&self) -> &dyn Any { self } } impl AsAnyMut for BinsDim0 where STY: ScalarOps, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } impl Empty for BinsDim0 { fn empty() -> Self { Self { ts1s: VecDeque::new(), ts2s: VecDeque::new(), cnts: VecDeque::new(), mins: VecDeque::new(), maxs: VecDeque::new(), avgs: VecDeque::new(), lsts: VecDeque::new(), dim0kind: None, } } } impl WithLen for BinsDim0 { fn len(&self) -> usize { self.ts1s.len() } } impl ByteEstimate for BinsDim0 { fn byte_estimate(&self) -> u64 { // TODO // Should use a better estimate for waveform and string types, // or keep some aggregated byte count on push. let n = self.len(); if n == 0 { 0 } else { // TODO use the actual size of one/some of the elements. let i = n * 2 / 3; let w1 = self.mins[i].byte_estimate(); let w2 = self.maxs[i].byte_estimate(); (n as u64 * (8 + 8 + 8 + 4 + w1 + w2)) as u64 } } } impl Resettable for BinsDim0 { fn reset(&mut self) { self.ts1s.clear(); self.ts2s.clear(); self.cnts.clear(); self.mins.clear(); self.maxs.clear(); self.avgs.clear(); self.lsts.clear(); } } impl HasNonemptyFirstBin for BinsDim0 { fn has_nonempty_first_bin(&self) -> bool { self.cnts.front().map_or(false, |x| *x > 0) } } impl HasTimestampDeque for BinsDim0 { fn timestamp_min(&self) -> Option { self.ts1s.front().map(|x| *x) } fn timestamp_max(&self) -> Option { self.ts2s.back().map(|x| *x) } fn pulse_min(&self) -> Option { todo!() } fn pulse_max(&self) -> Option { todo!() } } items_0::impl_range_overlap_info_bins!(BinsDim0); impl AppendEmptyBin for BinsDim0 { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { debug!("AppendEmptyBin::append_empty_bin should not get used"); self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); self.cnts.push_back(0); self.mins.push_back(NTY::zero_b()); self.maxs.push_back(NTY::zero_b()); self.avgs.push_back(0.); self.lsts.push_back(NTY::zero_b()); } } impl AppendAllFrom for BinsDim0 { fn append_all_from(&mut self, src: &mut Self) { debug!("AppendAllFrom::append_all_from should not get used"); self.ts1s.extend(src.ts1s.drain(..)); self.ts2s.extend(src.ts2s.drain(..)); self.cnts.extend(src.cnts.drain(..)); self.mins.extend(src.mins.drain(..)); self.maxs.extend(src.maxs.drain(..)); self.avgs.extend(src.avgs.drain(..)); self.lsts.extend(src.lsts.drain(..)); } } impl TimeBins for BinsDim0 { fn ts_min(&self) -> Option { self.ts1s.front().map(Clone::clone) } fn ts_max(&self) -> Option { self.ts2s.back().map(Clone::clone) } fn ts_min_max(&self) -> Option<(u64, u64)> { if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) { Some((min, max)) } else { None } } } impl TimeBinnableType for BinsDim0 { type Output = BinsDim0; type Aggregator = BinsDim0Aggregator; fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { let self_name = any::type_name::(); debug!( "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}", range, x_bin_count, do_time_weight ); //Self::Aggregator::new(range, do_time_weight) todo!() } } #[derive(Debug)] pub struct BinsDim0TimeBinnerTy where STY: ScalarOps, { ts1now: TsNano, ts2now: TsNano, binrange: BinnedRange, do_time_weight: bool, emit_empty_bins: bool, range_complete: bool, out: ::Output, cnt: u64, min: STY, max: STY, avg: f64, lst: STY, filled_up_to: TsNano, last_seen_avg: f32, } impl BinsDim0TimeBinnerTy where STY: ScalarOps, { pub fn type_name() -> &'static str { any::type_name::() } pub fn new(binrange: BinnedRange, do_time_weight: bool, emit_empty_bins: bool) -> Self { // let ts1now = TsNano::from_ns(binrange.bin_off * binrange.bin_len.ns()); // let ts2 = ts1.add_dt_nano(binrange.bin_len.to_dt_nano()); let ts1now = TsNano::from_ns(binrange.nano_beg().ns()); let ts2now = ts1now.add_dt_nano(binrange.bin_len.to_dt_nano()); Self { ts1now, ts2now, binrange, do_time_weight, emit_empty_bins, range_complete: false, out: ::Output::empty(), cnt: 0, min: STY::zero_b(), max: STY::zero_b(), avg: 0., lst: STY::zero_b(), filled_up_to: ts1now, last_seen_avg: 0., } } // used internally for the aggregation fn reset_agg(&mut self) { self.cnt = 0; self.min = STY::zero_b(); self.max = STY::zero_b(); self.avg = 0.; } } impl TimeBinnerTy for BinsDim0TimeBinnerTy where STY: ScalarOps, { type Input = BinsDim0; type Output = BinsDim0; fn ingest(&mut self, item: &mut Self::Input) { trace_ingest!("<{} as TimeBinnerTy>::ingest {:?}", Self::type_name(), item); let mut count_before = 0; for ((((((&ts1, &ts2), &cnt), min), max), &avg), lst) in item .ts1s .iter() .zip(&item.ts2s) .zip(&item.cnts) .zip(&item.mins) .zip(&item.maxs) .zip(&item.avgs) .zip(&item.lsts) { if ts1 < self.ts1now.ns() { if ts2 > self.ts1now.ns() { error!("{} bad input grid mismatch", Self::type_name()); continue; } // warn!("encountered bin from time before {} {}", ts1, self.ts1now.ns()); trace_ingest!("{} input bin before {}", Self::type_name(), TsNano::from_ns(ts1)); self.min = min.clone(); self.max = max.clone(); self.lst = lst.clone(); count_before += 1; continue; } else { if ts2 > self.ts2now.ns() { if ts2 - ts1 > self.ts2now.ns() - self.ts1now.ns() { panic!("incoming bin len too large"); } else if ts1 < self.ts2now.ns() { panic!("encountered unaligned input bin"); } else { let mut i = 0; while ts1 >= self.ts2now.ns() { self.cycle(); i += 1; if i > 50000 { panic!("cycle forward too many iterations"); } } } } else { // ok, we're still inside the current bin } } if cnt == 0 { // ignore input bin, it does not contain any valid information. } else { if self.cnt == 0 { self.cnt = cnt; self.min = min.clone(); self.max = max.clone(); if self.do_time_weight { let f = (ts2 - ts1) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64; self.avg = avg as f64 * f; } else { panic!("TODO non-time-weighted binning to be impl"); } } else { self.cnt += cnt; if *min < self.min { self.min = min.clone(); } if *max > self.max { self.max = max.clone(); } if self.do_time_weight { let f = (ts2 - ts1) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64; self.avg += avg as f64 * f; } else { panic!("TODO non-time-weighted binning to be impl"); } } self.filled_up_to = TsNano::from_ns(ts2); self.last_seen_avg = avg; } } if count_before != 0 { warn!( "----- seen {} / {} input bins from time before", count_before, item.len() ); } } fn set_range_complete(&mut self) { self.range_complete = true; } fn bins_ready_count(&self) -> usize { self.out.len() } fn bins_ready(&mut self) -> Option { if self.out.len() != 0 { let ret = core::mem::replace(&mut self.out, BinsDim0::empty()); Some(ret) } else { None } } fn push_in_progress(&mut self, push_empty: bool) { if self.filled_up_to != self.ts2now { if self.cnt != 0 { info!("push_in_progress partially filled bin"); if self.do_time_weight { let f = (self.ts2now.ns() - self.filled_up_to.ns()) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64; self.avg += self.lst.as_prim_f32_b() as f64 * f; self.filled_up_to = self.ts2now; } else { panic!("TODO non-time-weighted binning to be impl"); } } else { if self.filled_up_to != self.ts1now { error!("partially filled bin with cnt 0"); } } } if self.cnt == 0 && !push_empty { self.reset_agg(); } else { let min = self.min.clone(); let max = self.max.clone(); let avg = self.avg as f32; if avg < min.as_prim_f32_b() || avg > max.as_prim_f32_b() { // TODO rounding issues? debug!("bad avg"); } self.out.ts1s.push_back(self.ts1now.ns()); self.out.ts2s.push_back(self.ts2now.ns()); self.out.cnts.push_back(self.cnt); self.out.mins.push_back(min); self.out.maxs.push_back(max); self.out.avgs.push_back(avg); self.out.lsts.push_back(self.lst.clone()); self.reset_agg(); } } fn cycle(&mut self) { self.push_in_progress(true); self.ts1now = self.ts1now.add_dt_nano(self.binrange.bin_len.to_dt_nano()); self.ts2now = self.ts2now.add_dt_nano(self.binrange.bin_len.to_dt_nano()); } fn empty(&self) -> Option { Some(::Output::empty()) } fn append_empty_until_end(&mut self) { let mut i = 0; while self.ts2now.ns() < self.binrange.full_range().end() { self.cycle(); i += 1; if i > 100000 { panic!("append_empty_until_end too many iterations"); } } } } impl TimeBinnableTy for BinsDim0 { type TimeBinner = BinsDim0TimeBinnerTy; fn time_binner_new( &self, binrange: BinnedRangeEnum, do_time_weight: bool, emit_empty_bins: bool, ) -> Self::TimeBinner { match binrange { BinnedRangeEnum::Time(binrange) => BinsDim0TimeBinnerTy::new(binrange, do_time_weight, emit_empty_bins), BinnedRangeEnum::Pulse(_) => todo!("TimeBinnableTy for BinsDim0 Pulse"), } } } // TODO rename to BinsDim0CollectorOutput #[derive(Debug, Serialize, Deserialize)] pub struct BinsDim0CollectedResult { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, #[serde(rename = "ts1Ms")] ts1_off_ms: VecDeque, #[serde(rename = "ts2Ms")] ts2_off_ms: VecDeque, #[serde(rename = "ts1Ns")] ts1_off_ns: VecDeque, #[serde(rename = "ts2Ns")] ts2_off_ns: VecDeque, #[serde(rename = "counts")] counts: VecDeque, #[serde(rename = "mins")] mins: VecDeque, #[serde(rename = "maxs")] maxs: VecDeque, #[serde(rename = "avgs")] avgs: VecDeque, #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, #[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")] missing_bins: u32, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")] finished_at: Option, } // TODO temporary fix for the enum output impl BinsDim0CollectedResult where STY: ScalarOps, { pub fn boxed_collected_with_enum_fix(&self) -> Box { if let Some(bins) = self .as_any_ref() .downcast_ref::>() { debug!("boxed_collected_with_enum_fix"); let mins = self.mins.iter().map(|x| 6).collect(); let maxs = self.mins.iter().map(|x| 7).collect(); let bins = BinsDim0CollectedResult:: { ts_anchor_sec: self.ts_anchor_sec.clone(), ts1_off_ms: self.ts1_off_ms.clone(), ts2_off_ms: self.ts2_off_ms.clone(), ts1_off_ns: self.ts1_off_ns.clone(), ts2_off_ns: self.ts2_off_ns.clone(), counts: self.counts.clone(), mins, maxs, avgs: self.avgs.clone(), range_final: self.range_final.clone(), timed_out: self.timed_out.clone(), missing_bins: self.missing_bins.clone(), continue_at: self.continue_at.clone(), finished_at: self.finished_at.clone(), }; Box::new(bins) } else { let bins = Self { ts_anchor_sec: self.ts_anchor_sec.clone(), ts1_off_ms: self.ts1_off_ms.clone(), ts2_off_ms: self.ts2_off_ms.clone(), ts1_off_ns: self.ts1_off_ns.clone(), ts2_off_ns: self.ts2_off_ns.clone(), counts: self.counts.clone(), mins: self.mins.clone(), maxs: self.maxs.clone(), avgs: self.avgs.clone(), range_final: self.range_final.clone(), timed_out: self.timed_out.clone(), missing_bins: self.missing_bins.clone(), continue_at: self.continue_at.clone(), finished_at: self.finished_at.clone(), }; Box::new(bins) } } } impl AsAnyRef for BinsDim0CollectedResult where NTY: 'static, { fn as_any_ref(&self) -> &dyn Any { self } } impl AsAnyMut for BinsDim0CollectedResult where NTY: 'static, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } impl TypeName for BinsDim0CollectedResult { fn type_name(&self) -> String { any::type_name::().into() } } impl WithLen for BinsDim0CollectedResult { fn len(&self) -> usize { self.mins.len() } } impl CollectedDyn for BinsDim0CollectedResult {} impl BinsDim0CollectedResult { pub fn ts_anchor_sec(&self) -> u64 { self.ts_anchor_sec } pub fn ts1_off_ms(&self) -> &VecDeque { &self.ts1_off_ms } pub fn ts2_off_ms(&self) -> &VecDeque { &self.ts2_off_ms } pub fn counts(&self) -> &VecDeque { &self.counts } pub fn range_final(&self) -> bool { self.range_final } pub fn timed_out(&self) -> bool { self.timed_out } pub fn missing_bins(&self) -> u32 { self.missing_bins } pub fn continue_at(&self) -> Option { self.continue_at.clone() } pub fn mins(&self) -> &VecDeque { &self.mins } pub fn maxs(&self) -> &VecDeque { &self.maxs } pub fn avgs(&self) -> &VecDeque { &self.avgs } } impl ToJsonResult for BinsDim0CollectedResult { fn to_json_value(&self) -> Result { serde_json::to_value(self).map_err(Error::from_string) } } #[derive(Debug)] pub struct BinsDim0Collector { vals: Option>, timed_out: bool, range_final: bool, } impl BinsDim0Collector { pub fn self_name() -> &'static str { any::type_name::() } pub fn new() -> Self { Self { timed_out: false, range_final: false, vals: None, } } } impl WithLen for BinsDim0Collector { fn len(&self) -> usize { self.vals.as_ref().map_or(0, WithLen::len) } } impl ByteEstimate for BinsDim0Collector { fn byte_estimate(&self) -> u64 { self.vals.as_ref().map_or(0, ByteEstimate::byte_estimate) } } impl CollectorTy for BinsDim0Collector { type Input = BinsDim0; type Output = BinsDim0CollectedResult; fn ingest(&mut self, src: &mut Self::Input) { if self.vals.is_none() { self.vals = Some(Self::Input::empty()); } let vals = self.vals.as_mut().unwrap(); vals.ts1s.append(&mut src.ts1s); vals.ts2s.append(&mut src.ts2s); vals.cnts.append(&mut src.cnts); vals.mins.append(&mut src.mins); vals.maxs.append(&mut src.maxs); vals.avgs.append(&mut src.avgs); vals.lsts.append(&mut src.lsts); } fn set_range_complete(&mut self) { self.range_final = true; } fn set_timed_out(&mut self) { self.timed_out = true; } fn set_continue_at_here(&mut self) { debug!("{}::set_continue_at_here", Self::self_name()); // TODO for bins, do nothing: either we have all bins or not. } fn result( &mut self, _range: Option, binrange: Option, ) -> Result { trace!("trying to make a result from {self:?}"); let bin_count_exp = if let Some(r) = &binrange { r.bin_count() as u32 } else { debug!("no binrange given"); 0 }; let mut vals = if let Some(x) = self.vals.take() { x } else { return Err(Error::with_msg_no_trace("BinsDim0Collector without vals")); }; let bin_count = vals.ts1s.len() as u32; debug!( "result make missing bins bin_count_exp {} bin_count {}", bin_count_exp, bin_count ); let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { match vals.ts2s.back() { Some(&k) => { let missing_bins = bin_count_exp - bin_count; let continue_at = IsoDateTime::from_ns_u64(k); let u = k + (k - vals.ts1s.back().unwrap()) * missing_bins as u64; let finished_at = IsoDateTime::from_ns_u64(u); (missing_bins, Some(continue_at), Some(finished_at)) } None => { warn!("can not determine continue-at parameters"); (0, None, None) } } } else { (0, None, None) }; if vals.ts1s.as_slices().1.len() != 0 { warn!("ts1s non-contiguous"); } if vals.ts2s.as_slices().1.len() != 0 { warn!("ts2s non-contiguous"); } let ts1s = vals.ts1s.make_contiguous(); let ts2s = vals.ts2s.make_contiguous(); let (ts_anch, ts1ms, ts1ns) = ts_offs_from_abs(ts1s); let (ts2ms, ts2ns) = ts_offs_from_abs_with_anchor(ts_anch, ts2s); let counts = vals.cnts; let mins = vals.mins; let maxs = vals.maxs; let avgs = vals.avgs; let ret = BinsDim0CollectedResult:: { ts_anchor_sec: ts_anch, ts1_off_ms: ts1ms, ts1_off_ns: ts1ns, ts2_off_ms: ts2ms, ts2_off_ns: ts2ns, counts, mins, maxs, avgs, range_final: self.range_final, timed_out: self.timed_out, missing_bins, continue_at, finished_at, }; *self = Self::new(); Ok(ret) } } impl CollectableType for BinsDim0 { type Collector = BinsDim0Collector; fn new_collector() -> Self::Collector { Self::Collector::new() } } #[derive(Debug)] pub struct BinsDim0Aggregator { range: SeriesRange, cnt: u64, minmaxlst: Option<(NTY, NTY, NTY)>, sumc: u64, sum: f32, } impl BinsDim0Aggregator { pub fn new(range: SeriesRange, _do_time_weight: bool) -> Self { Self { range, cnt: 0, minmaxlst: None, sumc: 0, sum: 0f32, } } } impl TimeBinnableTypeAggregator for BinsDim0Aggregator { type Input = BinsDim0; type Output = BinsDim0; fn range(&self) -> &SeriesRange { &self.range } fn ingest(&mut self, item: &Self::Input) { let beg = self.range.beg_u64(); let end = self.range.end_u64(); for ((((((&ts1, &ts2), &count), min), max), &avg), lst) in item .ts1s .iter() .zip(item.ts2s.iter()) .zip(item.cnts.iter()) .zip(item.mins.iter()) .zip(item.maxs.iter()) .zip(item.avgs.iter()) .zip(item.lsts.iter()) { if ts2 <= beg { } else if ts1 >= end { } else { if let Some((cmin, cmax, clst)) = self.minmaxlst.as_mut() { if min < cmin { *cmin = min.clone(); } if max > cmax { *cmax = max.clone(); } *clst = lst.clone(); } else { self.minmaxlst = Some((min.clone(), max.clone(), lst.clone())); } self.cnt += count; // TODO this works only for equidistant bins edges. self.sumc += 1; self.sum += avg; } } } fn result_reset(&mut self, range: SeriesRange) -> Self::Output { let ret = if let Some((min, max, lst)) = self.minmaxlst.take() { self.minmaxlst = Some((lst.clone(), lst.clone(), lst.clone())); let avg = if self.sumc > 0 { self.sum / self.sumc as f32 } else { NTY::zero_b().as_prim_f32_b() }; Self::Output { ts1s: [self.range.beg_u64()].into(), ts2s: [self.range.end_u64()].into(), cnts: [self.cnt].into(), mins: [min].into(), maxs: [max].into(), avgs: [avg].into(), lsts: [lst].into(), // TODO dim0kind: None, } } else { if self.cnt != 0 { error!("result_reset non-zero cnt but no minmaxlst"); } warn!("result_reset missing minmaxlst"); Self::Output::empty() }; self.range = range; self.cnt = 0; self.sumc = 0; self.sum = 0.; ret } } macro_rules! try_to_container_bins { ($sty:ty, $avgty:ty, $this:expr) => { let this = $this; if let Some(bins) = this.as_any_ref().downcast_ref::>() { let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); let cnts = bins.cnts.iter().map(|&x| x).collect(); let mins = bins.mins.iter().map(|&x| x).collect(); let maxs = bins.maxs.iter().map(|&x| x).collect(); let avgs = bins.avgs.iter().map(|&x| x as $avgty).collect(); let lsts = bins.lsts.iter().map(|&x| x).collect(); let fnls = bins.ts1s.iter().map(|_| true).collect(); let dst = crate::binning::container_bins::ContainerBins::<$sty>::from_constituents( ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, ); return Box::new(dst); } }; } impl TimeBinnable for BinsDim0 { fn time_binner_new( &self, binrange: BinnedRangeEnum, do_time_weight: bool, emit_empty_bins: bool, ) -> Box { // TODO get rid of unwrap // TODO respect emit_empty_bins let ret = BinsDim0TimeBinner::::new(binrange, do_time_weight).unwrap(); Box::new(ret) } fn to_box_to_json_result(&self) -> Box { let k = serde_json::to_value(self).unwrap(); Box::new(k) as _ } fn to_container_bins(&self) -> Box { let this = self; try_to_container_bins!(u8, f64, self); try_to_container_bins!(u16, f64, self); try_to_container_bins!(u32, f64, self); try_to_container_bins!(u64, f64, self); try_to_container_bins!(i8, f64, self); try_to_container_bins!(i16, f64, self); try_to_container_bins!(i32, f64, self); try_to_container_bins!(i64, f64, self); try_to_container_bins!(f32, f32, self); try_to_container_bins!(f64, f64, self); if let Some(bins) = this.as_any_ref().downcast_ref::>() { let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); let cnts = bins.cnts.iter().map(|&x| x).collect(); let mins = bins.mins.iter().map(|x| x.clone()).collect(); let maxs = bins.maxs.iter().map(|x| x.clone()).collect(); let avgs = bins.avgs.iter().map(|&x| x as f64).collect(); let lsts = bins.lsts.iter().map(|&x| x).collect(); let fnls = bins.ts1s.iter().map(|_| true).collect(); let dst = crate::binning::container_bins::ContainerBins::::from_constituents( ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, ); return Box::new(dst); } if let Some(bins) = this.as_any_ref().downcast_ref::>() { let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); let cnts = bins.cnts.iter().map(|&x| x).collect(); let mins = bins.mins.iter().map(|x| x.clone()).collect(); let maxs = bins.maxs.iter().map(|x| x.clone()).collect(); let avgs = bins.avgs.iter().map(|&x| x as f64).collect(); let lsts = bins.lsts.iter().map(|x| x.clone()).collect(); let fnls = bins.ts1s.iter().map(|_| true).collect(); let dst = crate::binning::container_bins::ContainerBins::::from_constituents( ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, ); return Box::new(dst); } if let Some(bins) = this.as_any_ref().downcast_ref::>() { let ts1s = bins.ts1s.iter().map(|&x| TsNano::from_ns(x)).collect(); let ts2s = bins.ts2s.iter().map(|&x| TsNano::from_ns(x)).collect(); let cnts = bins.cnts.iter().map(|&x| x).collect(); let mins = bins.mins.iter().map(|x| x.clone()).collect(); let maxs = bins.maxs.iter().map(|x| x.clone()).collect(); let avgs = bins.avgs.iter().map(|&x| x).collect(); let lsts = bins.lsts.iter().map(|x| x.clone()).collect(); let fnls = bins.ts1s.iter().map(|_| true).collect(); let dst = crate::binning::container_bins::ContainerBins::::from_constituents( ts1s, ts2s, cnts, mins, maxs, avgs, lsts, fnls, ); return Box::new(dst); } let styn = any::type_name::(); todo!("TODO impl for {styn}"); } } #[derive(Debug)] pub struct BinsDim0TimeBinner { binrange: BinnedRangeEnum, rix: usize, rng: Option, agg: BinsDim0Aggregator, ready: Option< as TimeBinnableTypeAggregator>::Output>, range_final: bool, } impl BinsDim0TimeBinner { fn type_name() -> &'static str { any::type_name::() } fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { let rng = binrange .range_at(0) .ok_or_else(|| Error::with_msg_no_trace("empty binrange"))?; let agg = BinsDim0Aggregator::new(rng, do_time_weight); let ret = Self { binrange, rix: 0, rng: Some(agg.range().clone()), agg, ready: None, range_final: false, }; Ok(ret) } fn next_bin_range(&mut self) -> Option { self.rix += 1; if let Some(rng) = self.binrange.range_at(self.rix) { trace!("{} next_bin_range {:?}", Self::type_name(), rng); Some(rng) } else { trace!("{} next_bin_range None", Self::type_name()); None } } } impl TimeBinnerCommonV0Trait for BinsDim0TimeBinner { type Input = as TimeBinnableTypeAggregator>::Input; type Output = as TimeBinnableTypeAggregator>::Output; fn type_name() -> &'static str { Self::type_name() } fn common_bins_ready_count(&self) -> usize { match &self.ready { Some(k) => k.len(), None => 0, } } fn common_range_current(&self) -> &SeriesRange { self.agg.range() } fn common_has_more_range(&self) -> bool { self.rng.is_some() } fn common_next_bin_range(&mut self) -> Option { self.next_bin_range() } fn common_set_current_range(&mut self, range: Option) { self.rng = range; } fn common_take_or_append_all_from(&mut self, item: Self::Output) { let mut item = item; match self.ready.as_mut() { Some(ready) => { ready.append_all_from(&mut item); } None => { self.ready = Some(item); } } } fn common_result_reset(&mut self, range: Option) -> Self::Output { // TODO maybe better to wrap the aggregator in Option and remove the whole thing when no more bins? self.agg.result_reset(range.unwrap_or_else(|| { SeriesRange::TimeRange(netpod::range::evrange::NanoRange { beg: u64::MAX, end: u64::MAX, }) })) } fn common_agg_ingest(&mut self, item: &mut Self::Input) { self.agg.ingest(item) } fn common_has_lst(&self) -> bool { todo!() } fn common_feed_lst(&mut self, item: &mut Self::Input) { todo!() } } impl TimeBinner for BinsDim0TimeBinner { fn bins_ready_count(&self) -> usize { TimeBinnerCommonV0Trait::common_bins_ready_count(self) } fn bins_ready(&mut self) -> Option> { match self.ready.take() { Some(k) => Some(Box::new(k)), None => None, } } fn ingest(&mut self, item: &mut dyn TimeBinnable) { /*let self_name = any::type_name::(); if item.len() == 0 { // Return already here, RangeOverlapInfo would not give much sense. return; } if self.edges.len() < 2 { warn!("TimeBinnerDyn for {self_name} no more bin in edges A\n{:?}\n\n", item); return; } // TODO optimize by remembering at which event array index we have arrived. // That needs modified interfaces which can take and yield the start and latest index. loop { while item.starts_after(NanoRange { beg: 0, end: self.edges[1], }) { self.cycle(); if self.edges.len() < 2 { warn!("TimeBinnerDyn for {self_name} no more bin in edges B\n{:?}\n\n", item); return; } } if item.ends_before(NanoRange { beg: self.edges[0], end: u64::MAX, }) { return; } else { if self.edges.len() < 2 { warn!("TimeBinnerDyn for {self_name} edge list exhausted"); return; } else { let agg = if let Some(agg) = self.agg.as_mut() { agg } else { self.agg = Some(BinsDim0Aggregator::new( // We know here that we have enough edges for another bin. // and `next_bin_range` will pop the first edge. self.next_bin_range().unwrap(), self.do_time_weight, )); self.agg.as_mut().unwrap() }; if let Some(item) = item .as_any_ref() // TODO make statically sure that we attempt to cast to the correct type here: .downcast_ref::< as TimeBinnableTypeAggregator>::Input>() { agg.ingest(item); } else { let tyid_item = std::any::Any::type_id(item.as_any_ref()); error!("not correct item type {:?}", tyid_item); }; if item.ends_after(agg.range().clone()) { self.cycle(); if self.edges.len() < 2 { warn!("TimeBinnerDyn for {self_name} no more bin in edges C\n{:?}\n\n", item); return; } } else { break; } } } }*/ TimeBinnerCommonV0Func::ingest(self, item) } // TODO there is too much common code between implementors: fn push_in_progress(&mut self, push_empty: bool) { // TODO expand should be derived from AggKind. Is it still required after all? /*let expand = true; if let Some(agg) = self.agg.as_mut() { let dummy_range = NanoRange { beg: 4, end: 5 }; let mut bins = agg.result_reset(dummy_range, expand); self.agg = None; assert_eq!(bins.len(), 1); if push_empty || bins.counts[0] != 0 { match self.ready.as_mut() { Some(ready) => { ready.append_all_from(&mut bins); } None => { self.ready = Some(bins); } } } }*/ TimeBinnerCommonV0Func::push_in_progress(self, push_empty) } // TODO there is too much common code between implementors: fn cycle(&mut self) { /*let n = self.bins_ready_count(); self.push_in_progress(true); if self.bins_ready_count() == n { if let Some(range) = self.next_bin_range() { let mut bins = BinsDim0::::empty(); bins.append_zero(range.beg, range.end); match self.ready.as_mut() { Some(ready) => { ready.append_all_from(&mut bins); } None => { self.ready = Some(bins); } } if self.bins_ready_count() <= n { error!("failed to push a zero bin"); } } else { warn!("cycle: no in-progress bin pushed, but also no more bin to add as zero-bin"); } }*/ TimeBinnerCommonV0Func::cycle(self) } fn set_range_complete(&mut self) { self.range_final = true; } fn empty(&self) -> Box { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } fn append_empty_until_end(&mut self) { while self.common_has_more_range() { TimeBinnerCommonV0Func::push_in_progress(self, true); } } } impl TimeBinned for BinsDim0 { fn clone_box_time_binned(&self) -> Box { Box::new(self.clone()) } fn as_time_binnable_ref(&self) -> &dyn TimeBinnable { self } fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable { self } fn edges_slice(&self) -> (&[u64], &[u64]) { if self.ts1s.as_slices().1.len() != 0 { panic!(); } if self.ts2s.as_slices().1.len() != 0 { panic!(); } (&self.ts1s.as_slices().0, &self.ts2s.as_slices().0) } fn counts(&self) -> &[u64] { // TODO check for contiguous self.cnts.as_slices().0 } // TODO is Vec needed? fn mins(&self) -> Vec { self.mins.iter().map(|x| x.clone().as_prim_f32_b()).collect() } // TODO is Vec needed? fn maxs(&self) -> Vec { self.maxs.iter().map(|x| x.clone().as_prim_f32_b()).collect() } // TODO is Vec needed? fn avgs(&self) -> Vec { self.avgs.iter().map(Clone::clone).collect() } fn validate(&self) -> Result<(), String> { use fmt::Write; let mut msg = String::new(); if self.ts1s.len() != self.ts2s.len() { write!(&mut msg, "ts1s ≠ ts2s\n").unwrap(); } for (i, ((count, min), max)) in self.cnts.iter().zip(&self.mins).zip(&self.maxs).enumerate() { if min.as_prim_f32_b() < 1. && *count != 0 { write!(&mut msg, "i {} count {} min {:?} max {:?}\n", i, count, min, max).unwrap(); } } if msg.is_empty() { Ok(()) } else { Err(msg) } } fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { self } fn empty_like_self_box_time_binned(&self) -> Box { Box::new(Self::empty()) } fn to_simple_bins_f32(&mut self) -> Box { use mem::replace; let ret = BinsDim0:: { ts1s: replace(&mut self.ts1s, VecDeque::new()), ts2s: replace(&mut self.ts2s, VecDeque::new()), cnts: replace(&mut self.cnts, VecDeque::new()), mins: self.mins.iter().map(AsPrimF32::as_prim_f32_b).collect(), maxs: self.maxs.iter().map(AsPrimF32::as_prim_f32_b).collect(), avgs: replace(&mut self.avgs, VecDeque::new()), lsts: self.lsts.iter().map(AsPrimF32::as_prim_f32_b).collect(), dim0kind: None, }; Box::new(ret) } fn drain_into_tb(&mut self, dst: &mut dyn TimeBinned, range: Range) -> Result<(), Error> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future dst.ts1s.extend(self.ts1s.drain(range.clone())); dst.ts2s.extend(self.ts2s.drain(range.clone())); dst.cnts.extend(self.cnts.drain(range.clone())); dst.mins.extend(self.mins.drain(range.clone())); dst.maxs.extend(self.maxs.drain(range.clone())); dst.avgs.extend(self.avgs.drain(range.clone())); dst.lsts.extend(self.lsts.drain(range.clone())); Ok(()) } else { let type_name = any::type_name::(); error!("downcast to {} FAILED", type_name); Err(Error::with_msg_no_trace(format!("downcast to {} FAILED", type_name))) } } } #[test] fn bins_timebin_fill_empty_00() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); let do_time_weight = true; let mut binner = bins .as_time_binnable_ref() .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.append_empty_until_end(); let ready = binner.bins_ready(); let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); // Currently bins without lst can not exist. // for i in 0..5 { // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); // } assert_eq!(got, &exp); } #[test] fn bins_timebin_fill_empty_01() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); let do_time_weight = true; let mut binner = bins .as_time_binnable_ref() .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); binner.append_empty_until_end(); let ready = binner.bins_ready(); let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); // Currently bins without lst can not exist. // for i in 0..5 { // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); // } assert_eq!(got, &exp); } #[test] fn bins_timebin_push_empty_00() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); let do_time_weight = true; let mut binner = bins .as_time_binnable_ref() .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); let ready = binner.bins_ready(); let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); // Currently bins without lst can not exist. // for i in 0..1 { // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); // } assert_eq!(got, &exp); } #[test] fn bins_timebin_push_empty_01() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); let do_time_weight = true; let mut binner = bins .as_time_binnable_ref() .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); binner.push_in_progress(true); binner.push_in_progress(true); let ready = binner.bins_ready(); let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); // Currently bins without lst can not exist. // for i in 0..3 { // exp.push(SEC * 2 * (9 + i), SEC * 2 * (10 + i), 0, 0, 0, 0., None); // } assert_eq!(got, &exp); } #[test] fn bins_timebin_ingest_only_before() { let mut bins = BinsDim0::::empty(); bins.push(SEC * 2, SEC * 4, 3, 7, 9, 8.1, 8); bins.push(SEC * 4, SEC * 6, 3, 6, 9, 8.2, 8); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); let do_time_weight = true; let mut binner = bins .as_time_binnable_ref() .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); let ready = binner.bins_ready(); let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); // Currently bins without lst can not exist. // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); assert_eq!(got, &exp); } #[test] fn bins_timebin_ingest_00() { let mut bins = BinsDim0::::empty(); bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82., 80); bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86., 81); bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81., 82); let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); let do_time_weight = true; let mut binner = bins .as_time_binnable_ref() .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); binner.push_in_progress(true); let ready = binner.bins_ready(); let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); // Currently bins without lst can not exist. // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84., 82); exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81., 91); assert_eq!(got, &exp); } #[test] fn bins_timebin_ingest_continuous_00() { let binrange = BinnedRangeEnum::Time(BinnedRange { bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 20, }); let do_time_weight = true; let mut bins = BinsDim0::::empty(); bins.push(SEC * 20, SEC * 21, 3, 70, 94, 82., 80); //bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.); //bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.); let mut binner = bins .as_time_binnable_ref() .time_binner_new(binrange, do_time_weight, false); binner.ingest(&mut bins); //binner.push_in_progress(true); let ready = binner.bins_ready(); let got = ready.unwrap(); let got: &BinsDim0 = got.as_any_ref().downcast_ref().unwrap(); let mut exp = BinsDim0::empty(); // Currently bins without lst can not exist. // exp.push(SEC * 18, SEC * 20, 0, 0, 0, 0., None); exp.push(SEC * 20, SEC * 22, 8, 70, 94, 84., 82); exp.push(SEC * 22, SEC * 24, 6, 72, 92, 81., 91); assert_eq!(got, &exp); }