From 2c26689b9f69c0991e330e6f233e5eae7134b299 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 12 Nov 2024 15:14:15 +0100 Subject: [PATCH] Trim unused code --- src/binsdim0.rs | 39 ++- src/binsxbindim0.rs | 523 ---------------------------- src/channelevents.rs | 201 ++++++----- src/eventsdim1.rs | 49 +-- src/eventsxbindim0.rs | 779 ------------------------------------------ src/items_2.rs | 19 +- src/merger.rs | 49 +-- 7 files changed, 199 insertions(+), 1460 deletions(-) delete mode 100644 src/binsxbindim0.rs delete mode 100644 src/eventsxbindim0.rs diff --git a/src/binsdim0.rs b/src/binsdim0.rs index 4b3d969..3c36b0c 100644 --- a/src/binsdim0.rs +++ b/src/binsdim0.rs @@ -312,7 +312,10 @@ impl TimeBins for BinsDim0 { } fn ts_min_max(&self) -> Option<(u64, u64)> { - if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) { + if let (Some(min), Some(max)) = ( + self.ts1s.front().map(Clone::clone), + self.ts2s.back().map(Clone::clone), + ) { Some((min, max)) } else { None @@ -389,7 +392,11 @@ where type Output = BinsDim0; fn ingest(&mut self, item: &mut Self::Input) { - trace_ingest!("<{} as TimeBinnerTy>::ingest {:?}", Self::type_name(), item); + trace_ingest!( + "<{} as TimeBinnerTy>::ingest {:?}", + Self::type_name(), + item + ); let mut count_before = 0; for ((((((&ts1, &ts2), &cnt), min), max), &avg), lst) in item .ts1s @@ -407,7 +414,11 @@ where continue; } // warn!("encountered bin from time before {} {}", ts1, self.ts1now.ns()); - trace_ingest!("{} input bin before {}", Self::type_name(), TsNano::from_ns(ts1)); + trace_ingest!( + "{} input bin before {}", + Self::type_name(), + TsNano::from_ns(ts1) + ); self.min = min.clone(); self.max = max.clone(); self.lst = lst.clone(); @@ -562,7 +573,9 @@ impl TimeBinnableTy for BinsDim0 { emit_empty_bins: bool, ) -> Self::TimeBinner { match binrange { - BinnedRangeEnum::Time(binrange) => BinsDim0TimeBinnerTy::new(binrange, do_time_weight, emit_empty_bins), + BinnedRangeEnum::Time(binrange) => { + BinsDim0TimeBinnerTy::new(binrange, do_time_weight, emit_empty_bins) + } BinnedRangeEnum::Pulse(_) => todo!("TimeBinnableTy for BinsDim0 Pulse"), } } @@ -593,11 +606,23 @@ pub struct BinsDim0CollectedResult { range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, - #[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")] + #[serde( + rename = "missingBins", + default, + skip_serializing_if = "CmpZero::is_zero" + )] missing_bins: u32, - #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + #[serde( + rename = "continueAt", + default, + skip_serializing_if = "Option::is_none" + )] continue_at: Option, - #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")] + #[serde( + rename = "finishedAt", + default, + skip_serializing_if = "Option::is_none" + )] finished_at: Option, } diff --git a/src/binsxbindim0.rs b/src/binsxbindim0.rs deleted file mode 100644 index 326f69b..0000000 --- a/src/binsxbindim0.rs +++ /dev/null @@ -1,523 +0,0 @@ -use crate::ts_offs_from_abs; -use crate::ts_offs_from_abs_with_anchor; -use crate::IsoDateTime; -use daqbuf_err as err; -use err::Error; -use items_0::collect_s::CollectableDyn; -use items_0::collect_s::CollectableType; -use items_0::collect_s::CollectedDyn; -use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonResult; -use items_0::container::ByteEstimate; -use items_0::scalar_ops::AsPrimF32; -use items_0::scalar_ops::ScalarOps; -use items_0::timebin::TimeBins; -use items_0::AppendEmptyBin; -use items_0::AsAnyMut; -use items_0::AsAnyRef; -use items_0::Empty; -use items_0::Resettable; -use items_0::TypeName; -use items_0::WithLen; -use netpod::is_false; -use netpod::log::*; -use netpod::range::evrange::NanoRange; -use netpod::range::evrange::SeriesRange; -use netpod::timeunits::SEC; -use netpod::BinnedRangeEnum; -use netpod::CmpZero; -use netpod::Dim0Kind; -use serde::Deserialize; -use serde::Serialize; -use std::any; -use std::any::Any; -use std::collections::VecDeque; -use std::fmt; -use std::mem; -use std::ops::Range; - -#[allow(unused)] -macro_rules! trace4 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); -} - -#[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct BinsXbinDim0 { - ts1s: VecDeque, - ts2s: VecDeque, - counts: VecDeque, - mins: VecDeque, - maxs: VecDeque, - avgs: VecDeque, - // TODO could consider more variables: - // ts min/max, pulse min/max, avg of mins, avg of maxs, variances, etc... - dim0kind: Option, -} - -impl TypeName for BinsXbinDim0 { - fn type_name(&self) -> String { - any::type_name::().into() - } -} - -impl fmt::Debug for BinsXbinDim0 -where - NTY: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let self_name = any::type_name::(); - write!( - fmt, - "{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", - self.ts1s.len(), - self.ts1s.iter().map(|k| k / SEC).collect::>(), - self.ts2s.iter().map(|k| k / SEC).collect::>(), - self.counts, - self.mins, - self.maxs, - self.avgs, - ) - } -} - -impl BinsXbinDim0 { - pub fn from_content( - ts1s: VecDeque, - ts2s: VecDeque, - counts: VecDeque, - mins: VecDeque, - maxs: VecDeque, - avgs: VecDeque, - ) -> Self { - Self { - ts1s, - ts2s, - counts, - mins, - maxs, - avgs, - dim0kind: None, - } - } - - pub fn counts(&self) -> &VecDeque { - &self.counts - } - - pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) { - self.ts1s.push_back(ts1); - self.ts2s.push_back(ts2); - self.counts.push_back(count); - self.mins.push_back(min); - self.maxs.push_back(max); - self.avgs.push_back(avg); - } - - pub fn append_zero(&mut self, beg: u64, end: u64) { - self.ts1s.push_back(beg); - self.ts2s.push_back(end); - self.counts.push_back(0); - self.mins.push_back(NTY::zero_b()); - self.maxs.push_back(NTY::zero_b()); - self.avgs.push_back(0.); - } - - pub fn append_all_from(&mut self, src: &mut Self) { - self.ts1s.extend(src.ts1s.drain(..)); - self.ts2s.extend(src.ts2s.drain(..)); - self.counts.extend(src.counts.drain(..)); - self.mins.extend(src.mins.drain(..)); - self.maxs.extend(src.maxs.drain(..)); - self.avgs.extend(src.avgs.drain(..)); - } - - pub fn equal_slack(&self, other: &Self) -> bool { - for (&a, &b) in self.ts1s.iter().zip(other.ts1s.iter()) { - if a != b { - return false; - } - } - for (&a, &b) in self.ts2s.iter().zip(other.ts2s.iter()) { - if a != b { - return false; - } - } - for (a, b) in self.mins.iter().zip(other.mins.iter()) { - if !a.equal_slack(b) { - return false; - } - } - for (a, b) in self.maxs.iter().zip(other.maxs.iter()) { - if !a.equal_slack(b) { - return false; - } - } - for (a, b) in self.avgs.iter().zip(other.avgs.iter()) { - if !a.equal_slack(b) { - return false; - } - } - true - } -} - -impl AsAnyRef for BinsXbinDim0 -where - NTY: ScalarOps, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for BinsXbinDim0 -where - STY: ScalarOps, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl Empty for BinsXbinDim0 { - fn empty() -> Self { - Self { - ts1s: VecDeque::new(), - ts2s: VecDeque::new(), - counts: VecDeque::new(), - mins: VecDeque::new(), - maxs: VecDeque::new(), - avgs: VecDeque::new(), - dim0kind: None, - } - } -} - -impl WithLen for BinsXbinDim0 { - fn len(&self) -> usize { - self.ts1s.len() - } -} - -impl ByteEstimate for BinsXbinDim0 { - fn byte_estimate(&self) -> u64 { - // TODO - // Should use a better estimate for waveform and string types, - // or keep some aggregated byte count on push. - let n = self.len(); - if n == 0 { - 0 - } else { - // TODO use the actual size of one/some of the elements. - let i = n * 2 / 3; - let w1 = self.mins[i].byte_estimate(); - let w2 = self.maxs[i].byte_estimate(); - (n as u64 * (8 + 8 + 8 + 4 + w1 + w2)) as u64 - } - } -} - -impl Resettable for BinsXbinDim0 { - fn reset(&mut self) { - self.ts1s.clear(); - self.ts2s.clear(); - self.counts.clear(); - self.mins.clear(); - self.maxs.clear(); - self.avgs.clear(); - } -} - -impl AppendEmptyBin for BinsXbinDim0 { - fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { - self.ts1s.push_back(ts1); - self.ts2s.push_back(ts2); - self.counts.push_back(0); - self.mins.push_back(NTY::zero_b()); - self.maxs.push_back(NTY::zero_b()); - self.avgs.push_back(0.); - } -} - -impl TimeBins for BinsXbinDim0 { - fn ts_min(&self) -> Option { - self.ts1s.front().map(Clone::clone) - } - - fn ts_max(&self) -> Option { - self.ts2s.back().map(Clone::clone) - } - - fn ts_min_max(&self) -> Option<(u64, u64)> { - if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) { - Some((min, max)) - } else { - None - } - } -} - -// TODO rename to BinsDim0CollectorOutput -#[derive(Debug, Serialize, Deserialize)] -pub struct BinsXbinDim0CollectedResult { - #[serde(rename = "tsAnchor")] - ts_anchor_sec: u64, - #[serde(rename = "ts1Ms")] - ts1_off_ms: VecDeque, - #[serde(rename = "ts2Ms")] - ts2_off_ms: VecDeque, - #[serde(rename = "ts1Ns")] - ts1_off_ns: VecDeque, - #[serde(rename = "ts2Ns")] - ts2_off_ns: VecDeque, - #[serde(rename = "counts")] - counts: VecDeque, - #[serde(rename = "mins")] - mins: VecDeque, - #[serde(rename = "maxs")] - maxs: VecDeque, - #[serde(rename = "avgs")] - avgs: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] - range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] - timed_out: bool, - #[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")] - missing_bins: u32, - #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] - continue_at: Option, - #[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")] - finished_at: Option, -} - -impl AsAnyRef for BinsXbinDim0CollectedResult -where - NTY: ScalarOps, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for BinsXbinDim0CollectedResult -where - NTY: ScalarOps, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl TypeName for BinsXbinDim0CollectedResult { - fn type_name(&self) -> String { - any::type_name::().into() - } -} - -impl WithLen for BinsXbinDim0CollectedResult { - fn len(&self) -> usize { - self.mins.len() - } -} - -impl CollectedDyn for BinsXbinDim0CollectedResult {} - -impl BinsXbinDim0CollectedResult { - pub fn ts_anchor_sec(&self) -> u64 { - self.ts_anchor_sec - } - - pub fn ts1_off_ms(&self) -> &VecDeque { - &self.ts1_off_ms - } - - pub fn ts2_off_ms(&self) -> &VecDeque { - &self.ts2_off_ms - } - - pub fn counts(&self) -> &VecDeque { - &self.counts - } - - pub fn range_final(&self) -> bool { - self.range_final - } - - pub fn missing_bins(&self) -> u32 { - self.missing_bins - } - - pub fn continue_at(&self) -> Option { - self.continue_at.clone() - } - - pub fn mins(&self) -> &VecDeque { - &self.mins - } - - pub fn maxs(&self) -> &VecDeque { - &self.maxs - } -} - -impl ToJsonResult for BinsXbinDim0CollectedResult { - fn to_json_value(&self) -> Result { - serde_json::to_value(self) - } -} - -#[derive(Debug)] -pub struct BinsXbinDim0Collector { - vals: BinsXbinDim0, - timed_out: bool, - range_final: bool, -} - -impl BinsXbinDim0Collector { - pub fn self_name() -> &'static str { - any::type_name::() - } - - pub fn new() -> Self { - Self { - vals: BinsXbinDim0::empty(), - timed_out: false, - range_final: false, - } - } -} - -impl WithLen for BinsXbinDim0Collector { - fn len(&self) -> usize { - self.vals.len() - } -} - -impl ByteEstimate for BinsXbinDim0Collector { - fn byte_estimate(&self) -> u64 { - self.vals.byte_estimate() - } -} - -impl CollectorTy for BinsXbinDim0Collector { - type Input = BinsXbinDim0; - type Output = BinsXbinDim0CollectedResult; - - fn ingest(&mut self, src: &mut Self::Input) { - trace!("\n\n----------- BinsXbinDim0Collector ingest\n{:?}\n\n", src); - // TODO could be optimized by non-contiguous container. - self.vals.ts1s.append(&mut src.ts1s); - self.vals.ts2s.append(&mut src.ts2s); - self.vals.counts.append(&mut src.counts); - self.vals.mins.append(&mut src.mins); - self.vals.maxs.append(&mut src.maxs); - self.vals.avgs.append(&mut src.avgs); - } - - fn set_range_complete(&mut self) { - self.range_final = true; - } - - fn set_timed_out(&mut self) { - self.timed_out = true; - } - - fn set_continue_at_here(&mut self) { - debug!("{}::set_continue_at_here", Self::self_name()); - // TODO for bins, do nothing: either we have all bins or not. - } - - fn result( - &mut self, - _range: std::option::Option, - binrange: Option, - ) -> Result { - let bin_count_exp = if let Some(r) = &binrange { - r.bin_count() as u32 - } else { - 0 - }; - let bin_count = self.vals.ts1s.len() as u32; - let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { - match self.vals.ts2s.back() { - Some(&k) => { - let missing_bins = bin_count_exp - bin_count; - let continue_at = IsoDateTime::from_ns_u64(k); - let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; - let finished_at = IsoDateTime::from_ns_u64(u); - (missing_bins, Some(continue_at), Some(finished_at)) - } - None => { - warn!("can not determine continue-at parameters"); - (0, None, None) - } - } - } else { - (0, None, None) - }; - if self.vals.ts1s.as_slices().1.len() != 0 { - panic!(); - } - if self.vals.ts2s.as_slices().1.len() != 0 { - panic!(); - } - let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0); - let tst2 = ts_offs_from_abs_with_anchor(tst1.0, self.vals.ts2s.as_slices().0); - let counts = mem::replace(&mut self.vals.counts, VecDeque::new()); - let mins = mem::replace(&mut self.vals.mins, VecDeque::new()); - let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new()); - let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new()); - let ret = BinsXbinDim0CollectedResult:: { - ts_anchor_sec: tst1.0, - ts1_off_ms: tst1.1, - ts1_off_ns: tst1.2, - ts2_off_ms: tst2.0, - ts2_off_ns: tst2.1, - counts, - mins, - maxs, - avgs, - range_final: self.range_final, - timed_out: self.timed_out, - missing_bins, - continue_at, - finished_at, - }; - Ok(ret) - } -} - -impl CollectableType for BinsXbinDim0 { - type Collector = BinsXbinDim0Collector; - - fn new_collector() -> Self::Collector { - Self::Collector::new() - } -} - -#[derive(Debug)] -pub struct BinsXbinDim0Aggregator { - range: SeriesRange, - count: u64, - min: NTY, - max: NTY, - // Carry over to next bin: - avg: f32, - sumc: u64, - sum: f32, -} - -impl BinsXbinDim0Aggregator { - pub fn new(range: SeriesRange, _do_time_weight: bool) -> Self { - Self { - range, - count: 0, - min: NTY::zero_b(), - max: NTY::zero_b(), - avg: 0., - sumc: 0, - sum: 0f32, - } - } -} diff --git a/src/channelevents.rs b/src/channelevents.rs index 3ce2736..d696127 100644 --- a/src/channelevents.rs +++ b/src/channelevents.rs @@ -9,8 +9,6 @@ use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::isodate::IsoDateTime; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; -use items_0::timebin::TimeBinnableTy; -use items_0::timebin::TimeBinnerTy; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; @@ -27,11 +25,9 @@ use serde::Serialize; use std::any; use std::any::Any; use std::collections::VecDeque; -use std::fmt; use std::time::Duration; use std::time::SystemTime; -#[allow(unused)] macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } // TODO maybe rename to ChannelStatus? @@ -62,7 +58,11 @@ pub struct ConnStatusEvent { impl ConnStatusEvent { pub fn new(ts: u64, status: ConnStatus) -> Self { let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000); - Self { ts, datetime, status } + Self { + ts, + datetime, + status, + } } } @@ -135,7 +135,11 @@ pub struct ChannelStatusEvent { impl ChannelStatusEvent { pub fn new(ts: u64, status: ChannelStatus) -> Self { let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000); - Self { ts, datetime, status } + Self { + ts, + datetime, + status, + } } } @@ -207,7 +211,6 @@ mod serde_channel_events { use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; use crate::eventsdim1::EventsDim1; - use crate::eventsxbindim0::EventsXbinDim0; use items_0::subfr::SubFrId; use netpod::log::*; use netpod::EnumVariant; @@ -258,73 +261,90 @@ mod serde_channel_events { where A: de::SeqAccess<'de>, { - let cty: &str = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[0] cty"))?; - let nty: u32 = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[1] nty"))?; + let cty: &str = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[0] cty"))?; + let nty: u32 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[1] nty"))?; if cty == EventsDim0::::serde_id() { match nty { u8::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u16::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u32::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u64::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i8::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i16::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i32::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i64::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f32::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f64::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } bool::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } String::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } EnumVariant::SUB => { - let obj: EventsDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } _ => { @@ -335,85 +355,75 @@ mod serde_channel_events { } else if cty == EventsDim1::::serde_id() { match nty { u8::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u16::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u32::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u64::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i8::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i16::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i32::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i64::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f32::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f64::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } bool::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } String::SUB => { - let obj: EventsDim1 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; - Ok(EvBox(Box::new(obj))) - } - _ => { - error!("TODO serde cty {cty} nty {nty}"); - Err(de::Error::custom(&format!("unknown nty {nty}"))) - } - } - } else if cty == EventsXbinDim0::::serde_id() { - match nty { - f32::SUB => { - let obj: EventsXbinDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; - Ok(EvBox(Box::new(obj))) - } - f64::SUB => { - let obj: EventsXbinDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; - Ok(EvBox(Box::new(obj))) - } - bool::SUB => { - let obj: EventsXbinDim0 = - seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = seq + .next_element()? + .ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } _ => { @@ -422,7 +432,7 @@ mod serde_channel_events { } } } else { - error!("TODO serde cty {cty} nty {nty}"); + error!("unsupported serde cty {cty} nty {nty}"); Err(de::Error::custom(&format!("unknown cty {cty}"))) } } @@ -448,7 +458,9 @@ mod serde_channel_events { ChannelEvents::Events(obj) => { serializer.serialize_newtype_variant(name, 0, vars[0], &EvRef(obj.as_ref())) } - ChannelEvents::Status(val) => serializer.serialize_newtype_variant(name, 1, vars[1], val), + ChannelEvents::Status(val) => { + serializer.serialize_newtype_variant(name, 1, vars[1], val) + } } } } @@ -491,7 +503,10 @@ mod serde_channel_events { } else if val == vars[1] { Ok(VarId::Status) } else { - Err(de::Error::unknown_variant(val, ChannelEventsVis::allowed_variants())) + Err(de::Error::unknown_variant( + val, + ChannelEventsVis::allowed_variants(), + )) } } } @@ -870,7 +885,11 @@ impl Events for ChannelEvents { } } - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs( + &mut self, + dst: &mut dyn Events, + range: (usize, usize), + ) -> Result<(), MergeError> { let dst2 = if let Some(x) = dst.as_any_mut().downcast_mut::() { // debug!("unwrapped dst ChannelEvents as well"); x @@ -931,7 +950,9 @@ impl Events for ChannelEvents { fn to_min_max_avg(&mut self) -> Box { match self { - ChannelEvents::Events(item) => Box::new(ChannelEvents::Events(Events::to_min_max_avg(item))), + ChannelEvents::Events(item) => { + Box::new(ChannelEvents::Events(Events::to_min_max_avg(item))) + } ChannelEvents::Status(item) => Box::new(ChannelEvents::Status(item.take())), } } @@ -1075,9 +1096,11 @@ impl CollectorDyn for ChannelEventsCollector { if let Some(item) = item.as_any_mut().downcast_mut::() { match item { ChannelEvents::Events(item) => { - let coll = self - .coll - .get_or_insert_with(|| item.as_ref().as_collectable_with_default_ref().new_collector()); + let coll = self.coll.get_or_insert_with(|| { + item.as_ref() + .as_collectable_with_default_ref() + .new_collector() + }); coll.ingest(item.as_collectable_with_default_mut()); } ChannelEvents::Status(_) => { diff --git a/src/eventsdim1.rs b/src/eventsdim1.rs index 9c5a9b5..88a4358 100644 --- a/src/eventsdim1.rs +++ b/src/eventsdim1.rs @@ -1,5 +1,3 @@ -use crate::binsdim0::BinsDim0; -use crate::eventsxbindim0::EventsXbinDim0; use crate::IsoDateTime; use daqbuf_err as err; use err::Error; @@ -32,7 +30,6 @@ use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; -use std::marker::PhantomData; use std::mem; #[allow(unused)] @@ -239,7 +236,11 @@ pub struct EventsDim1CollectorOutput { range_final: bool, #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] timed_out: bool, - #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + #[serde( + rename = "continueAt", + default, + skip_serializing_if = "Option::is_none" + )] continue_at: Option, } @@ -517,7 +518,11 @@ impl Events for EventsDim1 { let tss = self.tss.drain(..n1).collect(); let pulses = self.pulses.drain(..n1).collect(); let values = self.values.drain(..n1).collect(); - let ret = Self { tss, pulses, values }; + let ret = Self { + tss, + pulses, + values, + }; Box::new(ret) } @@ -525,7 +530,11 @@ impl Events for EventsDim1 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs( + &mut self, + dst: &mut dyn Events, + range: (usize, usize), + ) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -609,33 +618,7 @@ impl Events for EventsDim1 { } fn to_min_max_avg(&mut self) -> Box { - let mins = self - .values - .iter() - .map(|x| STY::find_vec_min(x)) - .map(|x| x.unwrap_or_else(|| STY::zero_b())) - .collect(); - let maxs = self - .values - .iter() - .map(|x| STY::find_vec_max(x)) - .map(|x| x.unwrap_or_else(|| STY::zero_b())) - .collect(); - let avgs = self - .values - .iter() - .map(|x| STY::avg_vec(x)) - .map(|x| x.unwrap_or_else(|| STY::zero_b())) - .map(|x| x.as_prim_f32_b()) - .collect(); - let item = EventsXbinDim0 { - tss: mem::replace(&mut self.tss, VecDeque::new()), - pulses: mem::replace(&mut self.pulses, VecDeque::new()), - mins, - maxs, - avgs, - }; - Box::new(item) + panic!("discontinued support for EventsDim1") } fn to_json_string(&self) -> String { diff --git a/src/eventsxbindim0.rs b/src/eventsxbindim0.rs deleted file mode 100644 index c2256ea..0000000 --- a/src/eventsxbindim0.rs +++ /dev/null @@ -1,779 +0,0 @@ -use crate::binsxbindim0::BinsXbinDim0; -use crate::IsoDateTime; -use daqbuf_err as err; -use err::Error; -use items_0::collect_s::CollectableDyn; -use items_0::collect_s::CollectableType; -use items_0::collect_s::CollectedDyn; -use items_0::collect_s::CollectorTy; -use items_0::collect_s::ToJsonResult; -use items_0::container::ByteEstimate; -use items_0::overlap::HasTimestampDeque; -use items_0::scalar_ops::ScalarOps; -use items_0::timebin::TimeBinnerTy; -use items_0::AsAnyMut; -use items_0::AsAnyRef; -use items_0::Empty; -use items_0::Events; -use items_0::EventsNonObj; -use items_0::MergeError; -use items_0::TypeName; -use items_0::WithLen; -use netpod::is_false; -use netpod::log::*; -use netpod::range::evrange::NanoRange; -use netpod::range::evrange::SeriesRange; -use netpod::timeunits::SEC; -use netpod::BinnedRangeEnum; -use serde::Deserialize; -use serde::Serialize; -use std::any; -use std::any::Any; -use std::collections::VecDeque; -use std::fmt; -use std::mem; - -#[allow(unused)] -macro_rules! trace_ingest { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} - -#[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} - -#[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct EventsXbinDim0 { - pub tss: VecDeque, - pub pulses: VecDeque, - pub mins: VecDeque, - pub maxs: VecDeque, - pub avgs: VecDeque, - // TODO maybe add variance? -} - -impl EventsXbinDim0 { - #[inline(always)] - pub fn push(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) { - self.tss.push_back(ts); - self.pulses.push_back(pulse); - self.mins.push_back(min); - self.maxs.push_back(max); - self.avgs.push_back(avg); - } - - #[inline(always)] - pub fn push_front(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) { - self.tss.push_front(ts); - self.pulses.push_front(pulse); - self.mins.push_front(min); - self.maxs.push_front(max); - self.avgs.push_front(avg); - } - - pub fn serde_id() -> &'static str { - "EventsXbinDim0" - } -} - -impl TypeName for EventsXbinDim0 { - fn type_name(&self) -> String { - any::type_name::().into() - } -} - -impl fmt::Debug for EventsXbinDim0 -where - STY: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - if false { - write!( - fmt, - "{} {{ count {} ts {:?} vals {:?} }}", - self.type_name(), - self.tss.len(), - self.tss.iter().map(|x| x / SEC).collect::>(), - self.avgs, - ) - } else { - write!( - fmt, - "{} {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}", - self.type_name(), - self.tss.len(), - self.tss.front().map(|x| x / SEC), - self.tss.back().map(|x| x / SEC), - self.avgs.front(), - self.avgs.back(), - ) - } - } -} - -impl ByteEstimate for EventsXbinDim0 { - fn byte_estimate(&self) -> u64 { - let stylen = mem::size_of::(); - (self.len() * (8 + 8 + 2 * stylen + 4)) as u64 - } -} - -impl Empty for EventsXbinDim0 { - fn empty() -> Self { - Self { - tss: VecDeque::new(), - pulses: VecDeque::new(), - mins: VecDeque::new(), - maxs: VecDeque::new(), - avgs: VecDeque::new(), - } - } -} - -impl AsAnyRef for EventsXbinDim0 -where - STY: ScalarOps, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsXbinDim0 -where - STY: ScalarOps, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl WithLen for EventsXbinDim0 { - fn len(&self) -> usize { - self.tss.len() - } -} - -impl HasTimestampDeque for EventsXbinDim0 { - fn timestamp_min(&self) -> Option { - self.tss.front().map(|x| *x) - } - - fn timestamp_max(&self) -> Option { - self.tss.back().map(|x| *x) - } - - fn pulse_min(&self) -> Option { - self.pulses.front().map(|x| *x) - } - - fn pulse_max(&self) -> Option { - self.pulses.back().map(|x| *x) - } -} - -impl EventsNonObj for EventsXbinDim0 { - fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - info!( - "EventsXbinDim0::into_tss_pulses len {} len {}", - self.tss.len(), - self.pulses.len() - ); - (self.tss, self.pulses) - } -} - -impl Events for EventsXbinDim0 { - fn verify(&self) -> bool { - let mut good = true; - let mut ts_max = 0; - for ts in &self.tss { - let ts = *ts; - if ts < ts_max { - good = false; - error!("unordered event data ts {} ts_max {}", ts, ts_max); - } - ts_max = ts_max.max(ts); - } - good - } - - fn output_info(&self) -> String { - let n2 = self.tss.len().max(1) - 1; - format!( - "EventsXbinDim0OutputInfo {{ len {}, ts_min {}, ts_max {} }}", - self.tss.len(), - self.tss.get(0).map_or(-1i64, |&x| x as i64), - self.tss.get(n2).map_or(-1i64, |&x| x as i64), - ) - } - - fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { - self - } - - fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { - self - } - - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { - self - } - - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { - // TODO improve the search - let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); - let tss = self.tss.drain(..n1).collect(); - let pulses = self.pulses.drain(..n1).collect(); - let mins = self.mins.drain(..n1).collect(); - let maxs = self.maxs.drain(..n1).collect(); - let avgs = self.avgs.drain(..n1).collect(); - let ret = Self { - tss, - pulses, - mins, - maxs, - avgs, - }; - Box::new(ret) - } - - fn new_empty_evs(&self) -> Box { - Box::new(Self::empty()) - } - - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { - // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_any_mut().downcast_mut::() { - // TODO make it harder to forget new members when the struct may get modified in the future - let r = range.0..range.1; - dst.tss.extend(self.tss.drain(r.clone())); - dst.pulses.extend(self.pulses.drain(r.clone())); - dst.mins.extend(self.mins.drain(r.clone())); - dst.maxs.extend(self.maxs.drain(r.clone())); - dst.avgs.extend(self.avgs.drain(r.clone())); - Ok(()) - } else { - error!("downcast to {} FAILED", self.type_name()); - Err(MergeError::NotCompatible) - } - } - - fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate() { - if m > ts { - return Some(i); - } - } - None - } - - fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate() { - if m >= ts { - return Some(i); - } - } - None - } - - fn find_highest_index_lt_evs(&self, ts: u64) -> Option { - for (i, &m) in self.tss.iter().enumerate().rev() { - if m < ts { - return Some(i); - } - } - None - } - - fn ts_min(&self) -> Option { - self.tss.front().map(|&x| x) - } - - fn ts_max(&self) -> Option { - self.tss.back().map(|&x| x) - } - - fn partial_eq_dyn(&self, other: &dyn Events) -> bool { - if let Some(other) = other.as_any_ref().downcast_ref::() { - self == other - } else { - false - } - } - - fn serde_id(&self) -> &'static str { - Self::serde_id() - } - - fn nty_id(&self) -> u32 { - STY::SUB - } - - fn clone_dyn(&self) -> Box { - Box::new(self.clone()) - } - - fn tss(&self) -> &VecDeque { - &self.tss - } - - fn pulses(&self) -> &VecDeque { - &self.pulses - } - - fn frame_type_id(&self) -> u32 { - error!("TODO frame_type_id should not be called"); - // TODO make more nice - panic!() - } - - fn to_min_max_avg(&mut self) -> Box { - let dst = Self { - tss: mem::replace(&mut self.tss, Default::default()), - pulses: mem::replace(&mut self.pulses, Default::default()), - mins: mem::replace(&mut self.mins, Default::default()), - maxs: mem::replace(&mut self.maxs, Default::default()), - avgs: mem::replace(&mut self.avgs, Default::default()), - }; - Box::new(dst) - } - - fn to_json_string(&self) -> String { - todo!() - } - - fn to_json_vec_u8(&self) -> Vec { - todo!() - } - - fn to_cbor_vec_u8(&self) -> Vec { - todo!() - } - - fn clear(&mut self) { - self.tss.clear(); - self.pulses.clear(); - self.mins.clear(); - self.maxs.clear(); - self.avgs.clear(); - } - - fn to_dim0_f32_for_binning(&self) -> Box { - todo!("{}::to_dim0_f32_for_binning", self.type_name()) - } - - fn to_container_events(&self) -> Box { - todo!("{}::to_container_events", self.type_name()) - } -} - -#[derive(Debug)] -pub struct EventsXbinDim0Aggregator -where - STY: ScalarOps, -{ - range: SeriesRange, - /// Number of events which actually fall in this bin. - count: u64, - min: STY, - max: STY, - /// Number of times we accumulated to the sum of this bin. - sumc: u64, - sum: f32, - int_ts: u64, - last_ts: u64, - last_vals: Option<(STY, STY, f32)>, - did_min_max: bool, - do_time_weight: bool, - events_ignored_count: u64, -} - -impl EventsXbinDim0Aggregator -where - STY: ScalarOps, -{ - pub fn type_name() -> &'static str { - std::any::type_name::() - } - - pub fn new(range: SeriesRange, do_time_weight: bool) -> Self { - let int_ts = range.beg_u64(); - Self { - range, - did_min_max: false, - count: 0, - min: STY::zero_b(), - max: STY::zero_b(), - sumc: 0, - sum: 0f32, - int_ts, - last_ts: 0, - last_vals: None, - events_ignored_count: 0, - do_time_weight, - } - } - - fn apply_min_max(&mut self, min: &STY, max: &STY) { - if self.did_min_max != (self.sumc > 0) { - panic!("logic error apply_min_max {} {}", self.did_min_max, self.sumc); - } - if self.sumc == 0 { - self.did_min_max = true; - self.min = min.clone(); - self.max = max.clone(); - } else { - if *min < self.min { - self.min = min.clone(); - } - if *max > self.max { - self.max = max.clone(); - } - } - } - - fn apply_event_unweight(&mut self, avg: f32, min: STY, max: STY) { - //debug!("apply_event_unweight"); - self.apply_min_max(&min, &max); - self.sumc += 1; - let vf = avg; - if vf.is_nan() { - } else { - self.sum += vf; - } - } - - // Only integrate, do not count because it is used even if the event does not fall into current bin. - fn apply_event_time_weight(&mut self, px: u64) { - trace_ingest!( - "apply_event_time_weight px {} count {} sumc {} events_ignored_count {}", - px, - self.count, - self.sumc, - self.events_ignored_count - ); - if let Some((min, max, avg)) = self.last_vals.as_ref() { - let vf = *avg; - { - let min = min.clone(); - let max = max.clone(); - self.apply_min_max(&min, &max); - } - self.sumc += 1; - let w = (px - self.int_ts) as f32 * 1e-9; - if vf.is_nan() { - } else { - self.sum += vf * w; - } - self.int_ts = px; - } else { - debug!("apply_event_time_weight NO VALUE"); - } - } - - fn ingest_unweight(&mut self, item: &EventsXbinDim0) { - /*for i1 in 0..item.tss.len() { - let ts = item.tss[i1]; - let avg = item.avgs[i1]; - let min = item.mins[i1].clone(); - let max = item.maxs[i1].clone(); - if ts < self.range.beg { - } else if ts >= self.range.end { - } else { - self.apply_event_unweight(avg, min, max); - } - }*/ - todo!() - } - - fn ingest_time_weight(&mut self, item: &EventsXbinDim0) { - trace!( - "{} ingest_time_weight range {:?} last_ts {:?} int_ts {:?}", - Self::type_name(), - self.range, - self.last_ts, - self.int_ts - ); - let range_beg = self.range.beg_u64(); - let range_end = self.range.end_u64(); - for (((&ts, min), max), avg) in item - .tss - .iter() - .zip(item.mins.iter()) - .zip(item.maxs.iter()) - .zip(item.avgs.iter()) - { - if ts >= range_end { - self.events_ignored_count += 1; - // TODO break early when tests pass. - //break; - } else if ts >= range_beg { - self.apply_event_time_weight(ts); - self.count += 1; - self.last_ts = ts; - self.last_vals = Some((min.clone(), max.clone(), avg.clone())); - } else { - self.events_ignored_count += 1; - self.last_ts = ts; - self.last_vals = Some((min.clone(), max.clone(), avg.clone())); - } - } - } - - fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsXbinDim0 { - /*let avg = if self.sumc == 0 { - 0f32 - } else { - self.sum / self.sumc as f32 - }; - let ret = BinsXbinDim0::from_content( - [self.range.beg].into(), - [self.range.end].into(), - [self.count].into(), - [self.min.clone()].into(), - [self.max.clone()].into(), - [avg].into(), - ); - self.int_ts = range.beg; - self.range = range; - self.sum = 0f32; - self.sumc = 0; - self.did_min_max = false; - self.min = NTY::zero_b(); - self.max = NTY::zero_b(); - ret*/ - todo!() - } - - fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsXbinDim0 { - trace!("{} result_reset_time_weight", Self::type_name()); - // TODO check callsite for correct expand status. - if self.range.is_time() { - self.apply_event_time_weight(self.range.end_u64()); - } else { - error!("TODO result_reset_time_weight"); - err::todoval() - } - let range_beg = self.range.beg_u64(); - let range_end = self.range.end_u64(); - let (min, max, avg) = if self.sumc > 0 { - let avg = self.sum / (self.range.delta_u64() as f32 * 1e-9); - (self.min.clone(), self.max.clone(), avg) - } else { - let (min, max, avg) = match &self.last_vals { - Some((min, max, avg)) => { - warn!("\n\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! SHOULD ALWAYS HAVE ACCUMULATED IN THIS CASE"); - (min.clone(), max.clone(), avg.clone()) - } - None => (STY::zero_b(), STY::zero_b(), 0.), - }; - (min, max, avg) - }; - let ret = BinsXbinDim0::from_content( - [range_beg].into(), - [range_end].into(), - [self.count].into(), - [min.clone()].into(), - [max.clone()].into(), - [avg].into(), - ); - self.int_ts = range.beg_u64(); - self.range = range; - self.count = 0; - self.sumc = 0; - self.sum = 0.; - self.did_min_max = false; - self.min = STY::zero_b(); - self.max = STY::zero_b(); - ret - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct EventsXbinDim0CollectorOutput { - #[serde(rename = "tsAnchor")] - ts_anchor_sec: u64, - #[serde(rename = "tsMs")] - ts_off_ms: VecDeque, - #[serde(rename = "tsNs")] - ts_off_ns: VecDeque, - #[serde(rename = "pulseAnchor")] - pulse_anchor: u64, - #[serde(rename = "pulseOff")] - pulse_off: VecDeque, - #[serde(rename = "mins")] - mins: VecDeque, - #[serde(rename = "maxs")] - maxs: VecDeque, - #[serde(rename = "avgs")] - avgs: VecDeque, - #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] - range_final: bool, - #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] - timed_out: bool, - #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] - continue_at: Option, -} - -impl AsAnyRef for EventsXbinDim0CollectorOutput -where - NTY: ScalarOps, -{ - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for EventsXbinDim0CollectorOutput -where - NTY: ScalarOps, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl TypeName for EventsXbinDim0CollectorOutput { - fn type_name(&self) -> String { - any::type_name::().into() - } -} - -impl WithLen for EventsXbinDim0CollectorOutput { - fn len(&self) -> usize { - self.mins.len() - } -} - -impl ToJsonResult for EventsXbinDim0CollectorOutput -where - NTY: ScalarOps, -{ - fn to_json_value(&self) -> Result { - serde_json::to_value(self) - } -} - -impl CollectedDyn for EventsXbinDim0CollectorOutput where NTY: ScalarOps {} - -#[derive(Debug)] -pub struct EventsXbinDim0Collector { - vals: EventsXbinDim0, - range_final: bool, - timed_out: bool, - needs_continue_at: bool, -} - -impl EventsXbinDim0Collector { - pub fn self_name() -> &'static str { - any::type_name::() - } - - pub fn new() -> Self { - Self { - range_final: false, - timed_out: false, - vals: EventsXbinDim0::empty(), - needs_continue_at: false, - } - } -} - -impl WithLen for EventsXbinDim0Collector { - fn len(&self) -> usize { - WithLen::len(&self.vals) - } -} - -impl ByteEstimate for EventsXbinDim0Collector { - fn byte_estimate(&self) -> u64 { - ByteEstimate::byte_estimate(&self.vals) - } -} - -impl CollectorTy for EventsXbinDim0Collector -where - NTY: ScalarOps, -{ - type Input = EventsXbinDim0; - type Output = EventsXbinDim0CollectorOutput; - - fn ingest(&mut self, src: &mut Self::Input) { - self.vals.tss.append(&mut src.tss); - self.vals.pulses.append(&mut src.pulses); - self.vals.mins.append(&mut src.mins); - self.vals.maxs.append(&mut src.maxs); - self.vals.avgs.append(&mut src.avgs); - } - - fn set_range_complete(&mut self) { - self.range_final = true; - } - - fn set_timed_out(&mut self) { - self.timed_out = true; - } - - fn set_continue_at_here(&mut self) { - self.needs_continue_at = true; - } - - fn result( - &mut self, - range: Option, - _binrange: Option, - ) -> Result { - /*use std::mem::replace; - let continue_at = if self.timed_out { - if let Some(ts) = self.vals.tss.back() { - Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS)) - } else { - if let Some(range) = &range { - Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC)) - } else { - warn!("can not determine continue-at parameters"); - None - } - } - } else { - None - }; - let mins = replace(&mut self.vals.mins, VecDeque::new()); - let maxs = replace(&mut self.vals.maxs, VecDeque::new()); - let avgs = replace(&mut self.vals.avgs, VecDeque::new()); - self.vals.tss.make_contiguous(); - self.vals.pulses.make_contiguous(); - let tst = crate::ts_offs_from_abs(self.vals.tss.as_slices().0); - let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(&self.vals.pulses.as_slices().0); - let ret = Self::Output { - ts_anchor_sec: tst.0, - ts_off_ms: tst.1, - ts_off_ns: tst.2, - pulse_anchor, - pulse_off, - mins, - maxs, - avgs, - range_final: self.range_final, - timed_out: self.timed_out, - continue_at, - }; - Ok(ret)*/ - todo!() - } -} - -impl CollectableType for EventsXbinDim0 -where - NTY: ScalarOps, -{ - type Collector = EventsXbinDim0Collector; - - fn new_collector() -> Self::Collector { - Self::Collector::new() - } -} diff --git a/src/items_2.rs b/src/items_2.rs index 300d198..a7aedf5 100644 --- a/src/items_2.rs +++ b/src/items_2.rs @@ -1,14 +1,12 @@ pub mod accounting; pub mod binning; pub mod binsdim0; -pub mod binsxbindim0; pub mod channelevents; pub mod empty; pub mod eventfull; pub mod eventsdim0; pub mod eventsdim0enum; pub mod eventsdim1; -pub mod eventsxbindim0; pub mod framable; pub mod frame; pub mod inmem; @@ -26,11 +24,9 @@ use futures_util::Stream; use items_0::isodate::IsoDateTime; use items_0::streamitem::Sitemty; use items_0::transform::EventTransform; -use items_0::Empty; use items_0::Events; use items_0::MergeError; use merger::Mergeable; -use netpod::range::evrange::SeriesRange; use netpod::timeunits::*; use std::collections::VecDeque; use std::fmt; @@ -47,7 +43,10 @@ pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque, VecDeque) { (ts_anchor_sec, ts_off_ms, ts_off_ns) } -pub fn ts_offs_from_abs_with_anchor(ts_anchor_sec: u64, tss: &[u64]) -> (VecDeque, VecDeque) { +pub fn ts_offs_from_abs_with_anchor( + ts_anchor_sec: u64, + tss: &[u64], +) -> (VecDeque, VecDeque) { let ts_anchor_ns = ts_anchor_sec * SEC; let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); let ts_off_ns = tss @@ -173,6 +172,12 @@ impl Mergeable for Box { } } -pub trait ChannelEventsInput: Stream> + EventTransform + Send {} +pub trait ChannelEventsInput: + Stream> + EventTransform + Send +{ +} -impl ChannelEventsInput for T where T: Stream> + EventTransform + Send {} +impl ChannelEventsInput for T where + T: Stream> + EventTransform + Send +{ +} diff --git a/src/merger.rs b/src/merger.rs index 48f83a6..3c70219 100644 --- a/src/merger.rs +++ b/src/merger.rs @@ -26,23 +26,11 @@ use std::task::Poll; const OUT_MAX_BYTES: u64 = 1024 * 200; const DO_DETECT_NON_MONO: bool = true; -#[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} +macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] -macro_rules! trace3 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} +macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[allow(unused)] -macro_rules! trace4 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} +macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { fn ts_min(&self) -> Option; @@ -311,7 +299,8 @@ where } StreamItem::Stats(item) => { // TODO limit queue length - self.out_of_band_queue.push_back(Ok(StreamItem::Stats(item))); + self.out_of_band_queue + .push_back(Ok(StreamItem::Stats(item))); continue; } }, @@ -337,7 +326,10 @@ where } } - fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { + fn poll3( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> ControlFlow>>> { use ControlFlow::*; use Poll::*; trace4!("poll3"); @@ -364,9 +356,17 @@ where } } if let Some(o) = self.out.as_ref() { - if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out || last_emit { + if o.len() >= self.out_max_len + || o.byte_estimate() >= OUT_MAX_BYTES + || self.do_clear_out + || last_emit + { if o.len() > self.out_max_len { - debug!("MERGER OVERWEIGHT ITEM {} vs {}", o.len(), self.out_max_len); + debug!( + "MERGER OVERWEIGHT ITEM {} vs {}", + o.len(), + self.out_max_len + ); } trace3!("decide to output"); self.do_clear_out = false; @@ -388,7 +388,10 @@ where } } - fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow>>> { + fn poll2( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> ControlFlow>>> { use ControlFlow::*; use Poll::*; match Self::refill(Pin::new(&mut self), cx) { @@ -426,7 +429,9 @@ where self.done_range_complete = true; if self.range_complete.iter().all(|x| *x) { trace!("emit RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + )))) } else { continue; } @@ -485,7 +490,7 @@ impl EventTransform for Merger where T: Send, { - fn transform(&mut self, src: Box) -> Box { + fn transform(&mut self, _src: Box) -> Box { todo!() } }