Trim unused code
@@ -312,7 +312,10 @@ impl<NTY: ScalarOps> TimeBins for BinsDim0<NTY> {
}

fn ts_min_max(&self) -> Option<(u64, u64)> {
if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) {
if let (Some(min), Some(max)) = (
self.ts1s.front().map(Clone::clone),
self.ts2s.back().map(Clone::clone),
) {
Some((min, max))
} else {
None
@@ -389,7 +392,11 @@ where
type Output = BinsDim0<STY>;

fn ingest(&mut self, item: &mut Self::Input) {
trace_ingest!("<{} as TimeBinnerTy>::ingest {:?}", Self::type_name(), item);
trace_ingest!(
"<{} as TimeBinnerTy>::ingest {:?}",
Self::type_name(),
item
);
let mut count_before = 0;
for ((((((&ts1, &ts2), &cnt), min), max), &avg), lst) in item
.ts1s
@@ -407,7 +414,11 @@ where
continue;
}
// warn!("encountered bin from time before {} {}", ts1, self.ts1now.ns());
trace_ingest!("{} input bin before {}", Self::type_name(), TsNano::from_ns(ts1));
trace_ingest!(
"{} input bin before {}",
Self::type_name(),
TsNano::from_ns(ts1)
);
self.min = min.clone();
self.max = max.clone();
self.lst = lst.clone();
@@ -562,7 +573,9 @@ impl<STY: ScalarOps> TimeBinnableTy for BinsDim0<STY> {
emit_empty_bins: bool,
) -> Self::TimeBinner {
match binrange {
BinnedRangeEnum::Time(binrange) => BinsDim0TimeBinnerTy::new(binrange, do_time_weight, emit_empty_bins),
BinnedRangeEnum::Time(binrange) => {
BinsDim0TimeBinnerTy::new(binrange, do_time_weight, emit_empty_bins)
}
BinnedRangeEnum::Pulse(_) => todo!("TimeBinnableTy for BinsDim0 Pulse"),
}
}
@@ -593,11 +606,23 @@ pub struct BinsDim0CollectedResult<NTY> {
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")]
#[serde(
rename = "missingBins",
default,
skip_serializing_if = "CmpZero::is_zero"
)]
missing_bins: u32,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
#[serde(
rename = "continueAt",
default,
skip_serializing_if = "Option::is_none"
)]
continue_at: Option<IsoDateTime>,
#[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")]
#[serde(
rename = "finishedAt",
default,
skip_serializing_if = "Option::is_none"
)]
finished_at: Option<IsoDateTime>,
}
@@ -1,523 +0,0 @@
|
||||
use crate::ts_offs_from_abs;
|
||||
use crate::ts_offs_from_abs_with_anchor;
|
||||
use crate::IsoDateTime;
|
||||
use daqbuf_err as err;
|
||||
use err::Error;
|
||||
use items_0::collect_s::CollectableDyn;
|
||||
use items_0::collect_s::CollectableType;
|
||||
use items_0::collect_s::CollectedDyn;
|
||||
use items_0::collect_s::CollectorTy;
|
||||
use items_0::collect_s::ToJsonResult;
|
||||
use items_0::container::ByteEstimate;
|
||||
use items_0::scalar_ops::AsPrimF32;
|
||||
use items_0::scalar_ops::ScalarOps;
|
||||
use items_0::timebin::TimeBins;
|
||||
use items_0::AppendEmptyBin;
|
||||
use items_0::AsAnyMut;
|
||||
use items_0::AsAnyRef;
|
||||
use items_0::Empty;
|
||||
use items_0::Resettable;
|
||||
use items_0::TypeName;
|
||||
use items_0::WithLen;
|
||||
use netpod::is_false;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::BinnedRangeEnum;
|
||||
use netpod::CmpZero;
|
||||
use netpod::Dim0Kind;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::any;
|
||||
use std::any::Any;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::mem;
|
||||
use std::ops::Range;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace4 {
|
||||
($($arg:tt)*) => ();
|
||||
($($arg:tt)*) => (eprintln!($($arg)*));
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct BinsXbinDim0<NTY> {
|
||||
ts1s: VecDeque<u64>,
|
||||
ts2s: VecDeque<u64>,
|
||||
counts: VecDeque<u64>,
|
||||
mins: VecDeque<NTY>,
|
||||
maxs: VecDeque<NTY>,
|
||||
avgs: VecDeque<f32>,
|
||||
// TODO could consider more variables:
|
||||
// ts min/max, pulse min/max, avg of mins, avg of maxs, variances, etc...
|
||||
dim0kind: Option<Dim0Kind>,
|
||||
}
|
||||
|
||||
impl<STY> TypeName for BinsXbinDim0<STY> {
|
||||
fn type_name(&self) -> String {
|
||||
any::type_name::<Self>().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> fmt::Debug for BinsXbinDim0<NTY>
|
||||
where
|
||||
NTY: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
let self_name = any::type_name::<Self>();
|
||||
write!(
|
||||
fmt,
|
||||
"{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}",
|
||||
self.ts1s.len(),
|
||||
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
|
||||
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
|
||||
self.counts,
|
||||
self.mins,
|
||||
self.maxs,
|
||||
self.avgs,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> BinsXbinDim0<NTY> {
|
||||
pub fn from_content(
|
||||
ts1s: VecDeque<u64>,
|
||||
ts2s: VecDeque<u64>,
|
||||
counts: VecDeque<u64>,
|
||||
mins: VecDeque<NTY>,
|
||||
maxs: VecDeque<NTY>,
|
||||
avgs: VecDeque<f32>,
|
||||
) -> Self {
|
||||
Self {
|
||||
ts1s,
|
||||
ts2s,
|
||||
counts,
|
||||
mins,
|
||||
maxs,
|
||||
avgs,
|
||||
dim0kind: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn counts(&self) -> &VecDeque<u64> {
|
||||
&self.counts
|
||||
}
|
||||
|
||||
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) {
|
||||
self.ts1s.push_back(ts1);
|
||||
self.ts2s.push_back(ts2);
|
||||
self.counts.push_back(count);
|
||||
self.mins.push_back(min);
|
||||
self.maxs.push_back(max);
|
||||
self.avgs.push_back(avg);
|
||||
}
|
||||
|
||||
pub fn append_zero(&mut self, beg: u64, end: u64) {
|
||||
self.ts1s.push_back(beg);
|
||||
self.ts2s.push_back(end);
|
||||
self.counts.push_back(0);
|
||||
self.mins.push_back(NTY::zero_b());
|
||||
self.maxs.push_back(NTY::zero_b());
|
||||
self.avgs.push_back(0.);
|
||||
}
|
||||
|
||||
pub fn append_all_from(&mut self, src: &mut Self) {
|
||||
self.ts1s.extend(src.ts1s.drain(..));
|
||||
self.ts2s.extend(src.ts2s.drain(..));
|
||||
self.counts.extend(src.counts.drain(..));
|
||||
self.mins.extend(src.mins.drain(..));
|
||||
self.maxs.extend(src.maxs.drain(..));
|
||||
self.avgs.extend(src.avgs.drain(..));
|
||||
}
|
||||
|
||||
pub fn equal_slack(&self, other: &Self) -> bool {
|
||||
for (&a, &b) in self.ts1s.iter().zip(other.ts1s.iter()) {
|
||||
if a != b {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (&a, &b) in self.ts2s.iter().zip(other.ts2s.iter()) {
|
||||
if a != b {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (a, b) in self.mins.iter().zip(other.mins.iter()) {
|
||||
if !a.equal_slack(b) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (a, b) in self.maxs.iter().zip(other.maxs.iter()) {
|
||||
if !a.equal_slack(b) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (a, b) in self.avgs.iter().zip(other.avgs.iter()) {
|
||||
if !a.equal_slack(b) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> AsAnyRef for BinsXbinDim0<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
fn as_any_ref(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> AsAnyMut for BinsXbinDim0<STY>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> Empty for BinsXbinDim0<STY> {
|
||||
fn empty() -> Self {
|
||||
Self {
|
||||
ts1s: VecDeque::new(),
|
||||
ts2s: VecDeque::new(),
|
||||
counts: VecDeque::new(),
|
||||
mins: VecDeque::new(),
|
||||
maxs: VecDeque::new(),
|
||||
avgs: VecDeque::new(),
|
||||
dim0kind: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> WithLen for BinsXbinDim0<STY> {
|
||||
fn len(&self) -> usize {
|
||||
self.ts1s.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> ByteEstimate for BinsXbinDim0<STY> {
|
||||
fn byte_estimate(&self) -> u64 {
|
||||
// TODO
|
||||
// Should use a better estimate for waveform and string types,
|
||||
// or keep some aggregated byte count on push.
|
||||
let n = self.len();
|
||||
if n == 0 {
|
||||
0
|
||||
} else {
|
||||
// TODO use the actual size of one/some of the elements.
|
||||
let i = n * 2 / 3;
|
||||
let w1 = self.mins[i].byte_estimate();
|
||||
let w2 = self.maxs[i].byte_estimate();
|
||||
(n as u64 * (8 + 8 + 8 + 4 + w1 + w2)) as u64
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> Resettable for BinsXbinDim0<STY> {
|
||||
fn reset(&mut self) {
|
||||
self.ts1s.clear();
|
||||
self.ts2s.clear();
|
||||
self.counts.clear();
|
||||
self.mins.clear();
|
||||
self.maxs.clear();
|
||||
self.avgs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> AppendEmptyBin for BinsXbinDim0<NTY> {
|
||||
fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
|
||||
self.ts1s.push_back(ts1);
|
||||
self.ts2s.push_back(ts2);
|
||||
self.counts.push_back(0);
|
||||
self.mins.push_back(NTY::zero_b());
|
||||
self.maxs.push_back(NTY::zero_b());
|
||||
self.avgs.push_back(0.);
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> TimeBins for BinsXbinDim0<NTY> {
|
||||
fn ts_min(&self) -> Option<u64> {
|
||||
self.ts1s.front().map(Clone::clone)
|
||||
}
|
||||
|
||||
fn ts_max(&self) -> Option<u64> {
|
||||
self.ts2s.back().map(Clone::clone)
|
||||
}
|
||||
|
||||
fn ts_min_max(&self) -> Option<(u64, u64)> {
|
||||
if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) {
|
||||
Some((min, max))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO rename to BinsDim0CollectorOutput
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct BinsXbinDim0CollectedResult<NTY> {
|
||||
#[serde(rename = "tsAnchor")]
|
||||
ts_anchor_sec: u64,
|
||||
#[serde(rename = "ts1Ms")]
|
||||
ts1_off_ms: VecDeque<u64>,
|
||||
#[serde(rename = "ts2Ms")]
|
||||
ts2_off_ms: VecDeque<u64>,
|
||||
#[serde(rename = "ts1Ns")]
|
||||
ts1_off_ns: VecDeque<u64>,
|
||||
#[serde(rename = "ts2Ns")]
|
||||
ts2_off_ns: VecDeque<u64>,
|
||||
#[serde(rename = "counts")]
|
||||
counts: VecDeque<u64>,
|
||||
#[serde(rename = "mins")]
|
||||
mins: VecDeque<NTY>,
|
||||
#[serde(rename = "maxs")]
|
||||
maxs: VecDeque<NTY>,
|
||||
#[serde(rename = "avgs")]
|
||||
avgs: VecDeque<f32>,
|
||||
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
|
||||
range_final: bool,
|
||||
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
|
||||
timed_out: bool,
|
||||
#[serde(rename = "missingBins", default, skip_serializing_if = "CmpZero::is_zero")]
|
||||
missing_bins: u32,
|
||||
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
|
||||
continue_at: Option<IsoDateTime>,
|
||||
#[serde(rename = "finishedAt", default, skip_serializing_if = "Option::is_none")]
|
||||
finished_at: Option<IsoDateTime>,
|
||||
}
|
||||
|
||||
impl<NTY> AsAnyRef for BinsXbinDim0CollectedResult<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
fn as_any_ref(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> AsAnyMut for BinsXbinDim0CollectedResult<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> TypeName for BinsXbinDim0CollectedResult<STY> {
|
||||
fn type_name(&self) -> String {
|
||||
any::type_name::<Self>().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> WithLen for BinsXbinDim0CollectedResult<NTY> {
|
||||
fn len(&self) -> usize {
|
||||
self.mins.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> CollectedDyn for BinsXbinDim0CollectedResult<NTY> {}
|
||||
|
||||
impl<NTY> BinsXbinDim0CollectedResult<NTY> {
|
||||
pub fn ts_anchor_sec(&self) -> u64 {
|
||||
self.ts_anchor_sec
|
||||
}
|
||||
|
||||
pub fn ts1_off_ms(&self) -> &VecDeque<u64> {
|
||||
&self.ts1_off_ms
|
||||
}
|
||||
|
||||
pub fn ts2_off_ms(&self) -> &VecDeque<u64> {
|
||||
&self.ts2_off_ms
|
||||
}
|
||||
|
||||
pub fn counts(&self) -> &VecDeque<u64> {
|
||||
&self.counts
|
||||
}
|
||||
|
||||
pub fn range_final(&self) -> bool {
|
||||
self.range_final
|
||||
}
|
||||
|
||||
pub fn missing_bins(&self) -> u32 {
|
||||
self.missing_bins
|
||||
}
|
||||
|
||||
pub fn continue_at(&self) -> Option<IsoDateTime> {
|
||||
self.continue_at.clone()
|
||||
}
|
||||
|
||||
pub fn mins(&self) -> &VecDeque<NTY> {
|
||||
&self.mins
|
||||
}
|
||||
|
||||
pub fn maxs(&self) -> &VecDeque<NTY> {
|
||||
&self.maxs
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> ToJsonResult for BinsXbinDim0CollectedResult<NTY> {
|
||||
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
|
||||
serde_json::to_value(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BinsXbinDim0Collector<NTY> {
|
||||
vals: BinsXbinDim0<NTY>,
|
||||
timed_out: bool,
|
||||
range_final: bool,
|
||||
}
|
||||
|
||||
impl<NTY> BinsXbinDim0Collector<NTY> {
|
||||
pub fn self_name() -> &'static str {
|
||||
any::type_name::<Self>()
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
vals: BinsXbinDim0::empty(),
|
||||
timed_out: false,
|
||||
range_final: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> WithLen for BinsXbinDim0Collector<NTY> {
|
||||
fn len(&self) -> usize {
|
||||
self.vals.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> ByteEstimate for BinsXbinDim0Collector<STY> {
|
||||
fn byte_estimate(&self) -> u64 {
|
||||
self.vals.byte_estimate()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> CollectorTy for BinsXbinDim0Collector<NTY> {
|
||||
type Input = BinsXbinDim0<NTY>;
|
||||
type Output = BinsXbinDim0CollectedResult<NTY>;
|
||||
|
||||
fn ingest(&mut self, src: &mut Self::Input) {
|
||||
trace!("\n\n----------- BinsXbinDim0Collector ingest\n{:?}\n\n", src);
|
||||
// TODO could be optimized by non-contiguous container.
|
||||
self.vals.ts1s.append(&mut src.ts1s);
|
||||
self.vals.ts2s.append(&mut src.ts2s);
|
||||
self.vals.counts.append(&mut src.counts);
|
||||
self.vals.mins.append(&mut src.mins);
|
||||
self.vals.maxs.append(&mut src.maxs);
|
||||
self.vals.avgs.append(&mut src.avgs);
|
||||
}
|
||||
|
||||
fn set_range_complete(&mut self) {
|
||||
self.range_final = true;
|
||||
}
|
||||
|
||||
fn set_timed_out(&mut self) {
|
||||
self.timed_out = true;
|
||||
}
|
||||
|
||||
fn set_continue_at_here(&mut self) {
|
||||
debug!("{}::set_continue_at_here", Self::self_name());
|
||||
// TODO for bins, do nothing: either we have all bins or not.
|
||||
}
|
||||
|
||||
fn result(
|
||||
&mut self,
|
||||
_range: std::option::Option<SeriesRange>,
|
||||
binrange: Option<BinnedRangeEnum>,
|
||||
) -> Result<Self::Output, Error> {
|
||||
let bin_count_exp = if let Some(r) = &binrange {
|
||||
r.bin_count() as u32
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let bin_count = self.vals.ts1s.len() as u32;
|
||||
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
|
||||
match self.vals.ts2s.back() {
|
||||
Some(&k) => {
|
||||
let missing_bins = bin_count_exp - bin_count;
|
||||
let continue_at = IsoDateTime::from_ns_u64(k);
|
||||
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
|
||||
let finished_at = IsoDateTime::from_ns_u64(u);
|
||||
(missing_bins, Some(continue_at), Some(finished_at))
|
||||
}
|
||||
None => {
|
||||
warn!("can not determine continue-at parameters");
|
||||
(0, None, None)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
(0, None, None)
|
||||
};
|
||||
if self.vals.ts1s.as_slices().1.len() != 0 {
|
||||
panic!();
|
||||
}
|
||||
if self.vals.ts2s.as_slices().1.len() != 0 {
|
||||
panic!();
|
||||
}
|
||||
let tst1 = ts_offs_from_abs(self.vals.ts1s.as_slices().0);
|
||||
let tst2 = ts_offs_from_abs_with_anchor(tst1.0, self.vals.ts2s.as_slices().0);
|
||||
let counts = mem::replace(&mut self.vals.counts, VecDeque::new());
|
||||
let mins = mem::replace(&mut self.vals.mins, VecDeque::new());
|
||||
let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new());
|
||||
let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new());
|
||||
let ret = BinsXbinDim0CollectedResult::<NTY> {
|
||||
ts_anchor_sec: tst1.0,
|
||||
ts1_off_ms: tst1.1,
|
||||
ts1_off_ns: tst1.2,
|
||||
ts2_off_ms: tst2.0,
|
||||
ts2_off_ns: tst2.1,
|
||||
counts,
|
||||
mins,
|
||||
maxs,
|
||||
avgs,
|
||||
range_final: self.range_final,
|
||||
timed_out: self.timed_out,
|
||||
missing_bins,
|
||||
continue_at,
|
||||
finished_at,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> CollectableType for BinsXbinDim0<NTY> {
|
||||
type Collector = BinsXbinDim0Collector<NTY>;
|
||||
|
||||
fn new_collector() -> Self::Collector {
|
||||
Self::Collector::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BinsXbinDim0Aggregator<NTY> {
|
||||
range: SeriesRange,
|
||||
count: u64,
|
||||
min: NTY,
|
||||
max: NTY,
|
||||
// Carry over to next bin:
|
||||
avg: f32,
|
||||
sumc: u64,
|
||||
sum: f32,
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> BinsXbinDim0Aggregator<NTY> {
|
||||
pub fn new(range: SeriesRange, _do_time_weight: bool) -> Self {
|
||||
Self {
|
||||
range,
|
||||
count: 0,
|
||||
min: NTY::zero_b(),
|
||||
max: NTY::zero_b(),
|
||||
avg: 0.,
|
||||
sumc: 0,
|
||||
sum: 0f32,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,8 +9,6 @@ use items_0::container::ByteEstimate;
use items_0::framable::FrameTypeInnerStatic;
use items_0::isodate::IsoDateTime;
use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
use items_0::timebin::TimeBinnableTy;
use items_0::timebin::TimeBinnerTy;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
@@ -27,11 +25,9 @@ use serde::Serialize;
use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
use std::time::SystemTime;

#[allow(unused)]
macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }

// TODO maybe rename to ChannelStatus?
@@ -62,7 +58,11 @@ pub struct ConnStatusEvent {
impl ConnStatusEvent {
pub fn new(ts: u64, status: ConnStatus) -> Self {
let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000);
Self { ts, datetime, status }
Self {
ts,
datetime,
status,
}
}
}

@@ -135,7 +135,11 @@ pub struct ChannelStatusEvent {
impl ChannelStatusEvent {
pub fn new(ts: u64, status: ChannelStatus) -> Self {
let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000);
Self { ts, datetime, status }
Self {
ts,
datetime,
status,
}
}
}

@@ -207,7 +211,6 @@ mod serde_channel_events {
use crate::channelevents::ConnStatusEvent;
use crate::eventsdim0::EventsDim0;
use crate::eventsdim1::EventsDim1;
use crate::eventsxbindim0::EventsXbinDim0;
use items_0::subfr::SubFrId;
use netpod::log::*;
use netpod::EnumVariant;
@@ -258,73 +261,90 @@ mod serde_channel_events {
where
A: de::SeqAccess<'de>,
{
let cty: &str = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[0] cty"))?;
let nty: u32 = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[1] nty"))?;
let cty: &str = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[0] cty"))?;
let nty: u32 = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[1] nty"))?;
if cty == EventsDim0::<u8>::serde_id() {
match nty {
u8::SUB => {
let obj: EventsDim0<u8> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<u8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u16::SUB => {
let obj: EventsDim0<u16> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<u16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u32::SUB => {
let obj: EventsDim0<u32> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<u32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u64::SUB => {
let obj: EventsDim0<u64> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<u64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i8::SUB => {
let obj: EventsDim0<i8> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<i8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i16::SUB => {
let obj: EventsDim0<i16> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<i16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i32::SUB => {
let obj: EventsDim0<i32> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<i32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i64::SUB => {
let obj: EventsDim0<i64> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<i64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f32::SUB => {
let obj: EventsDim0<f32> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<f32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f64::SUB => {
let obj: EventsDim0<f64> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<f64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
bool::SUB => {
let obj: EventsDim0<bool> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<bool> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
String::SUB => {
let obj: EventsDim0<String> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<String> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
EnumVariant::SUB => {
let obj: EventsDim0<EnumVariant> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim0<EnumVariant> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => {
@@ -335,85 +355,75 @@ mod serde_channel_events {
} else if cty == EventsDim1::<u8>::serde_id() {
match nty {
u8::SUB => {
let obj: EventsDim1<u8> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<u8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u16::SUB => {
let obj: EventsDim1<u16> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<u16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u32::SUB => {
let obj: EventsDim1<u32> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<u32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
u64::SUB => {
let obj: EventsDim1<u64> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<u64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i8::SUB => {
let obj: EventsDim1<i8> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<i8> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i16::SUB => {
let obj: EventsDim1<i16> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<i16> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i32::SUB => {
let obj: EventsDim1<i32> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<i32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
i64::SUB => {
let obj: EventsDim1<i64> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<i64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f32::SUB => {
let obj: EventsDim1<f32> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<f32> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f64::SUB => {
let obj: EventsDim1<f64> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<f64> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
bool::SUB => {
let obj: EventsDim1<bool> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<bool> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
String::SUB => {
let obj: EventsDim1<String> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => {
error!("TODO serde cty {cty} nty {nty}");
Err(de::Error::custom(&format!("unknown nty {nty}")))
}
}
} else if cty == EventsXbinDim0::<u8>::serde_id() {
match nty {
f32::SUB => {
let obj: EventsXbinDim0<f32> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
f64::SUB => {
let obj: EventsXbinDim0<f64> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
bool::SUB => {
let obj: EventsXbinDim0<bool> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
let obj: EventsDim1<String> = seq
.next_element()?
.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => {
@@ -422,7 +432,7 @@ mod serde_channel_events {
}
}
} else {
error!("TODO serde cty {cty} nty {nty}");
error!("unsupported serde cty {cty} nty {nty}");
Err(de::Error::custom(&format!("unknown cty {cty}")))
}
}
@@ -448,7 +458,9 @@ mod serde_channel_events {
ChannelEvents::Events(obj) => {
serializer.serialize_newtype_variant(name, 0, vars[0], &EvRef(obj.as_ref()))
}
ChannelEvents::Status(val) => serializer.serialize_newtype_variant(name, 1, vars[1], val),
ChannelEvents::Status(val) => {
serializer.serialize_newtype_variant(name, 1, vars[1], val)
}
}
}
}
@@ -491,7 +503,10 @@ mod serde_channel_events {
} else if val == vars[1] {
Ok(VarId::Status)
} else {
Err(de::Error::unknown_variant(val, ChannelEventsVis::allowed_variants()))
Err(de::Error::unknown_variant(
val,
ChannelEventsVis::allowed_variants(),
))
}
}
}
@@ -870,7 +885,11 @@ impl Events for ChannelEvents {
}
}

fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
fn drain_into_evs(
&mut self,
dst: &mut dyn Events,
range: (usize, usize),
) -> Result<(), MergeError> {
let dst2 = if let Some(x) = dst.as_any_mut().downcast_mut::<Self>() {
// debug!("unwrapped dst ChannelEvents as well");
x
@@ -931,7 +950,9 @@ impl Events for ChannelEvents {

fn to_min_max_avg(&mut self) -> Box<dyn Events> {
match self {
ChannelEvents::Events(item) => Box::new(ChannelEvents::Events(Events::to_min_max_avg(item))),
ChannelEvents::Events(item) => {
Box::new(ChannelEvents::Events(Events::to_min_max_avg(item)))
}
ChannelEvents::Status(item) => Box::new(ChannelEvents::Status(item.take())),
}
}
@@ -1075,9 +1096,11 @@ impl CollectorDyn for ChannelEventsCollector {
if let Some(item) = item.as_any_mut().downcast_mut::<ChannelEvents>() {
match item {
ChannelEvents::Events(item) => {
let coll = self
.coll
.get_or_insert_with(|| item.as_ref().as_collectable_with_default_ref().new_collector());
let coll = self.coll.get_or_insert_with(|| {
item.as_ref()
.as_collectable_with_default_ref()
.new_collector()
});
coll.ingest(item.as_collectable_with_default_mut());
}
ChannelEvents::Status(_) => {
@@ -1,5 +1,3 @@
use crate::binsdim0::BinsDim0;
use crate::eventsxbindim0::EventsXbinDim0;
use crate::IsoDateTime;
use daqbuf_err as err;
use err::Error;
@@ -32,7 +30,6 @@ use std::any;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::mem;

#[allow(unused)]
@@ -239,7 +236,11 @@ pub struct EventsDim1CollectorOutput<STY> {
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
#[serde(
rename = "continueAt",
default,
skip_serializing_if = "Option::is_none"
)]
continue_at: Option<IsoDateTime>,
}
@@ -517,7 +518,11 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
let tss = self.tss.drain(..n1).collect();
let pulses = self.pulses.drain(..n1).collect();
let values = self.values.drain(..n1).collect();
let ret = Self { tss, pulses, values };
let ret = Self {
tss,
pulses,
values,
};
Box::new(ret)
}

@@ -525,7 +530,11 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
Box::new(Self::empty())
}

fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
fn drain_into_evs(
&mut self,
dst: &mut dyn Events,
range: (usize, usize),
) -> Result<(), MergeError> {
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
// TODO make it harder to forget new members when the struct may get modified in the future
@@ -609,33 +618,7 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
}

fn to_min_max_avg(&mut self) -> Box<dyn Events> {
let mins = self
.values
.iter()
.map(|x| STY::find_vec_min(x))
.map(|x| x.unwrap_or_else(|| STY::zero_b()))
.collect();
let maxs = self
.values
.iter()
.map(|x| STY::find_vec_max(x))
.map(|x| x.unwrap_or_else(|| STY::zero_b()))
.collect();
let avgs = self
.values
.iter()
.map(|x| STY::avg_vec(x))
.map(|x| x.unwrap_or_else(|| STY::zero_b()))
.map(|x| x.as_prim_f32_b())
.collect();
let item = EventsXbinDim0 {
tss: mem::replace(&mut self.tss, VecDeque::new()),
pulses: mem::replace(&mut self.pulses, VecDeque::new()),
mins,
maxs,
avgs,
};
Box::new(item)
panic!("discontinued support for EventsDim1")
}

fn to_json_string(&self) -> String {
@@ -1,779 +0,0 @@
|
||||
use crate::binsxbindim0::BinsXbinDim0;
|
||||
use crate::IsoDateTime;
|
||||
use daqbuf_err as err;
|
||||
use err::Error;
|
||||
use items_0::collect_s::CollectableDyn;
|
||||
use items_0::collect_s::CollectableType;
|
||||
use items_0::collect_s::CollectedDyn;
|
||||
use items_0::collect_s::CollectorTy;
|
||||
use items_0::collect_s::ToJsonResult;
|
||||
use items_0::container::ByteEstimate;
|
||||
use items_0::overlap::HasTimestampDeque;
|
||||
use items_0::scalar_ops::ScalarOps;
|
||||
use items_0::timebin::TimeBinnerTy;
|
||||
use items_0::AsAnyMut;
|
||||
use items_0::AsAnyRef;
|
||||
use items_0::Empty;
|
||||
use items_0::Events;
|
||||
use items_0::EventsNonObj;
|
||||
use items_0::MergeError;
|
||||
use items_0::TypeName;
|
||||
use items_0::WithLen;
|
||||
use netpod::is_false;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::BinnedRangeEnum;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::any;
|
||||
use std::any::Any;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::mem;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest {
|
||||
($($arg:tt)*) => {};
|
||||
($($arg:tt)*) => { trace!($($arg)*) };
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 {
|
||||
($($arg:tt)*) => {};
|
||||
($($arg:tt)*) => { trace!($($arg)*) };
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct EventsXbinDim0<NTY> {
|
||||
pub tss: VecDeque<u64>,
|
||||
pub pulses: VecDeque<u64>,
|
||||
pub mins: VecDeque<NTY>,
|
||||
pub maxs: VecDeque<NTY>,
|
||||
pub avgs: VecDeque<f32>,
|
||||
// TODO maybe add variance?
|
||||
}
|
||||
|
||||
impl<NTY> EventsXbinDim0<NTY> {
|
||||
#[inline(always)]
|
||||
pub fn push(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) {
|
||||
self.tss.push_back(ts);
|
||||
self.pulses.push_back(pulse);
|
||||
self.mins.push_back(min);
|
||||
self.maxs.push_back(max);
|
||||
self.avgs.push_back(avg);
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn push_front(&mut self, ts: u64, pulse: u64, min: NTY, max: NTY, avg: f32) {
|
||||
self.tss.push_front(ts);
|
||||
self.pulses.push_front(pulse);
|
||||
self.mins.push_front(min);
|
||||
self.maxs.push_front(max);
|
||||
self.avgs.push_front(avg);
|
||||
}
|
||||
|
||||
pub fn serde_id() -> &'static str {
|
||||
"EventsXbinDim0"
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> TypeName for EventsXbinDim0<STY> {
|
||||
fn type_name(&self) -> String {
|
||||
any::type_name::<Self>().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> fmt::Debug for EventsXbinDim0<STY>
|
||||
where
|
||||
STY: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
if false {
|
||||
write!(
|
||||
fmt,
|
||||
"{} {{ count {} ts {:?} vals {:?} }}",
|
||||
self.type_name(),
|
||||
self.tss.len(),
|
||||
self.tss.iter().map(|x| x / SEC).collect::<Vec<_>>(),
|
||||
self.avgs,
|
||||
)
|
||||
} else {
|
||||
write!(
|
||||
fmt,
|
||||
"{} {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}",
|
||||
self.type_name(),
|
||||
self.tss.len(),
|
||||
self.tss.front().map(|x| x / SEC),
|
||||
self.tss.back().map(|x| x / SEC),
|
||||
self.avgs.front(),
|
||||
self.avgs.back(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> ByteEstimate for EventsXbinDim0<STY> {
|
||||
fn byte_estimate(&self) -> u64 {
|
||||
let stylen = mem::size_of::<STY>();
|
||||
(self.len() * (8 + 8 + 2 * stylen + 4)) as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> Empty for EventsXbinDim0<STY> {
|
||||
fn empty() -> Self {
|
||||
Self {
|
||||
tss: VecDeque::new(),
|
||||
pulses: VecDeque::new(),
|
||||
mins: VecDeque::new(),
|
||||
maxs: VecDeque::new(),
|
||||
avgs: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> AsAnyRef for EventsXbinDim0<STY>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
fn as_any_ref(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> AsAnyMut for EventsXbinDim0<STY>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> WithLen for EventsXbinDim0<STY> {
|
||||
fn len(&self) -> usize {
|
||||
self.tss.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> HasTimestampDeque for EventsXbinDim0<STY> {
|
||||
fn timestamp_min(&self) -> Option<u64> {
|
||||
self.tss.front().map(|x| *x)
|
||||
}
|
||||
|
||||
fn timestamp_max(&self) -> Option<u64> {
|
||||
self.tss.back().map(|x| *x)
|
||||
}
|
||||
|
||||
fn pulse_min(&self) -> Option<u64> {
|
||||
self.pulses.front().map(|x| *x)
|
||||
}
|
||||
|
||||
fn pulse_max(&self) -> Option<u64> {
|
||||
self.pulses.back().map(|x| *x)
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> EventsNonObj for EventsXbinDim0<STY> {
|
||||
fn into_tss_pulses(self: Box<Self>) -> (VecDeque<u64>, VecDeque<u64>) {
|
||||
info!(
|
||||
"EventsXbinDim0::into_tss_pulses len {} len {}",
|
||||
self.tss.len(),
|
||||
self.pulses.len()
|
||||
);
|
||||
(self.tss, self.pulses)
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> Events for EventsXbinDim0<STY> {
|
||||
fn verify(&self) -> bool {
|
||||
let mut good = true;
|
||||
let mut ts_max = 0;
|
||||
for ts in &self.tss {
|
||||
let ts = *ts;
|
||||
if ts < ts_max {
|
||||
good = false;
|
||||
error!("unordered event data ts {} ts_max {}", ts, ts_max);
|
||||
}
|
||||
ts_max = ts_max.max(ts);
|
||||
}
|
||||
good
|
||||
}
|
||||
|
||||
fn output_info(&self) -> String {
|
||||
let n2 = self.tss.len().max(1) - 1;
|
||||
format!(
|
||||
"EventsXbinDim0OutputInfo {{ len {}, ts_min {}, ts_max {} }}",
|
||||
self.tss.len(),
|
||||
self.tss.get(0).map_or(-1i64, |&x| x as i64),
|
||||
self.tss.get(n2).map_or(-1i64, |&x| x as i64),
|
||||
)
|
||||
}
|
||||
|
||||
fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn {
|
||||
self
|
||||
}
|
||||
|
||||
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events> {
|
||||
// TODO improve the search
|
||||
let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count();
|
||||
let tss = self.tss.drain(..n1).collect();
|
||||
let pulses = self.pulses.drain(..n1).collect();
|
||||
let mins = self.mins.drain(..n1).collect();
|
||||
let maxs = self.maxs.drain(..n1).collect();
|
||||
let avgs = self.avgs.drain(..n1).collect();
|
||||
let ret = Self {
|
||||
tss,
|
||||
pulses,
|
||||
mins,
|
||||
maxs,
|
||||
avgs,
|
||||
};
|
||||
Box::new(ret)
|
||||
}
|
||||
|
||||
fn new_empty_evs(&self) -> Box<dyn Events> {
|
||||
Box::new(Self::empty())
|
||||
}
|
||||
|
||||
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
|
||||
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
|
||||
// TODO make it harder to forget new members when the struct may get modified in the future
|
||||
let r = range.0..range.1;
|
||||
dst.tss.extend(self.tss.drain(r.clone()));
|
||||
dst.pulses.extend(self.pulses.drain(r.clone()));
|
||||
dst.mins.extend(self.mins.drain(r.clone()));
|
||||
dst.maxs.extend(self.maxs.drain(r.clone()));
|
||||
dst.avgs.extend(self.avgs.drain(r.clone()));
|
||||
Ok(())
|
||||
} else {
|
||||
error!("downcast to {} FAILED", self.type_name());
|
||||
Err(MergeError::NotCompatible)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_lowest_index_gt_evs(&self, ts: u64) -> Option<usize> {
|
||||
for (i, &m) in self.tss.iter().enumerate() {
|
||||
if m > ts {
|
||||
return Some(i);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn find_lowest_index_ge_evs(&self, ts: u64) -> Option<usize> {
|
||||
for (i, &m) in self.tss.iter().enumerate() {
|
||||
if m >= ts {
|
||||
return Some(i);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn find_highest_index_lt_evs(&self, ts: u64) -> Option<usize> {
|
||||
for (i, &m) in self.tss.iter().enumerate().rev() {
|
||||
if m < ts {
|
||||
return Some(i);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn ts_min(&self) -> Option<u64> {
|
||||
self.tss.front().map(|&x| x)
|
||||
}
|
||||
|
||||
fn ts_max(&self) -> Option<u64> {
|
||||
self.tss.back().map(|&x| x)
|
||||
}
|
||||
|
||||
fn partial_eq_dyn(&self, other: &dyn Events) -> bool {
|
||||
if let Some(other) = other.as_any_ref().downcast_ref::<Self>() {
|
||||
self == other
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn serde_id(&self) -> &'static str {
|
||||
Self::serde_id()
|
||||
}
|
||||
|
||||
fn nty_id(&self) -> u32 {
|
||||
STY::SUB
|
||||
}
|
||||
|
||||
fn clone_dyn(&self) -> Box<dyn Events> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
fn tss(&self) -> &VecDeque<u64> {
|
||||
&self.tss
|
||||
}
|
||||
|
||||
fn pulses(&self) -> &VecDeque<u64> {
|
||||
&self.pulses
|
||||
}
|
||||
|
||||
fn frame_type_id(&self) -> u32 {
|
||||
error!("TODO frame_type_id should not be called");
|
||||
// TODO make more nice
|
||||
panic!()
|
||||
}
|
||||
|
||||
fn to_min_max_avg(&mut self) -> Box<dyn Events> {
|
||||
let dst = Self {
|
||||
tss: mem::replace(&mut self.tss, Default::default()),
|
||||
pulses: mem::replace(&mut self.pulses, Default::default()),
|
||||
mins: mem::replace(&mut self.mins, Default::default()),
|
||||
maxs: mem::replace(&mut self.maxs, Default::default()),
|
||||
avgs: mem::replace(&mut self.avgs, Default::default()),
|
||||
};
|
||||
Box::new(dst)
|
||||
}
|
||||
|
||||
fn to_json_string(&self) -> String {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn to_json_vec_u8(&self) -> Vec<u8> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn to_cbor_vec_u8(&self) -> Vec<u8> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.tss.clear();
|
||||
self.pulses.clear();
|
||||
self.mins.clear();
|
||||
self.maxs.clear();
|
||||
self.avgs.clear();
|
||||
}
|
||||
|
||||
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
|
||||
todo!("{}::to_dim0_f32_for_binning", self.type_name())
|
||||
}
|
||||
|
||||
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
|
||||
todo!("{}::to_container_events", self.type_name())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EventsXbinDim0Aggregator<STY>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
range: SeriesRange,
|
||||
/// Number of events which actually fall in this bin.
|
||||
count: u64,
|
||||
min: STY,
|
||||
max: STY,
|
||||
/// Number of times we accumulated to the sum of this bin.
|
||||
sumc: u64,
|
||||
sum: f32,
|
||||
int_ts: u64,
|
||||
last_ts: u64,
|
||||
last_vals: Option<(STY, STY, f32)>,
|
||||
did_min_max: bool,
|
||||
do_time_weight: bool,
|
||||
events_ignored_count: u64,
|
||||
}
|
||||
|
||||
impl<STY> EventsXbinDim0Aggregator<STY>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
pub fn type_name() -> &'static str {
|
||||
std::any::type_name::<Self>()
|
||||
}
|
||||
|
||||
pub fn new(range: SeriesRange, do_time_weight: bool) -> Self {
|
||||
let int_ts = range.beg_u64();
|
||||
Self {
|
||||
range,
|
||||
did_min_max: false,
|
||||
count: 0,
|
||||
min: STY::zero_b(),
|
||||
max: STY::zero_b(),
|
||||
sumc: 0,
|
||||
sum: 0f32,
|
||||
int_ts,
|
||||
last_ts: 0,
|
||||
last_vals: None,
|
||||
events_ignored_count: 0,
|
||||
do_time_weight,
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_min_max(&mut self, min: &STY, max: &STY) {
|
||||
if self.did_min_max != (self.sumc > 0) {
|
||||
panic!("logic error apply_min_max {} {}", self.did_min_max, self.sumc);
|
||||
}
|
||||
if self.sumc == 0 {
|
||||
self.did_min_max = true;
|
||||
self.min = min.clone();
|
||||
self.max = max.clone();
|
||||
} else {
|
||||
if *min < self.min {
|
||||
self.min = min.clone();
|
||||
}
|
||||
if *max > self.max {
|
||||
self.max = max.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_event_unweight(&mut self, avg: f32, min: STY, max: STY) {
|
||||
//debug!("apply_event_unweight");
|
||||
self.apply_min_max(&min, &max);
|
||||
self.sumc += 1;
|
||||
let vf = avg;
|
||||
if vf.is_nan() {
|
||||
} else {
|
||||
self.sum += vf;
|
||||
}
|
||||
}
|
||||
|
||||
// Only integrate, do not count because it is used even if the event does not fall into current bin.
|
||||
fn apply_event_time_weight(&mut self, px: u64) {
|
||||
trace_ingest!(
|
||||
"apply_event_time_weight px {} count {} sumc {} events_ignored_count {}",
|
||||
px,
|
||||
self.count,
|
||||
self.sumc,
|
||||
self.events_ignored_count
|
||||
);
|
||||
if let Some((min, max, avg)) = self.last_vals.as_ref() {
|
||||
let vf = *avg;
|
||||
{
|
||||
let min = min.clone();
|
||||
let max = max.clone();
|
||||
self.apply_min_max(&min, &max);
|
||||
}
|
||||
self.sumc += 1;
|
||||
let w = (px - self.int_ts) as f32 * 1e-9;
|
||||
if vf.is_nan() {
|
||||
} else {
|
||||
self.sum += vf * w;
|
||||
}
|
||||
self.int_ts = px;
|
||||
} else {
|
||||
debug!("apply_event_time_weight NO VALUE");
|
||||
}
|
||||
}
|
||||
|
||||
fn ingest_unweight(&mut self, item: &EventsXbinDim0<STY>) {
|
||||
/*for i1 in 0..item.tss.len() {
|
||||
let ts = item.tss[i1];
|
||||
let avg = item.avgs[i1];
|
||||
let min = item.mins[i1].clone();
|
||||
let max = item.maxs[i1].clone();
|
||||
if ts < self.range.beg {
|
||||
} else if ts >= self.range.end {
|
||||
} else {
|
||||
self.apply_event_unweight(avg, min, max);
|
||||
}
|
||||
}*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ingest_time_weight(&mut self, item: &EventsXbinDim0<STY>) {
|
||||
trace!(
|
||||
"{} ingest_time_weight range {:?} last_ts {:?} int_ts {:?}",
|
||||
Self::type_name(),
|
||||
self.range,
|
||||
self.last_ts,
|
||||
self.int_ts
|
||||
);
|
||||
let range_beg = self.range.beg_u64();
|
||||
let range_end = self.range.end_u64();
|
||||
for (((&ts, min), max), avg) in item
|
||||
.tss
|
||||
.iter()
|
||||
.zip(item.mins.iter())
|
||||
.zip(item.maxs.iter())
|
||||
.zip(item.avgs.iter())
|
||||
{
|
||||
if ts >= range_end {
|
||||
self.events_ignored_count += 1;
|
||||
// TODO break early when tests pass.
|
||||
//break;
|
||||
} else if ts >= range_beg {
|
||||
self.apply_event_time_weight(ts);
|
||||
self.count += 1;
|
||||
self.last_ts = ts;
|
||||
self.last_vals = Some((min.clone(), max.clone(), avg.clone()));
|
||||
} else {
|
||||
self.events_ignored_count += 1;
|
||||
self.last_ts = ts;
|
||||
self.last_vals = Some((min.clone(), max.clone(), avg.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn result_reset_unweight(&mut self, range: SeriesRange) -> BinsXbinDim0<STY> {
|
||||
/*let avg = if self.sumc == 0 {
|
||||
0f32
|
||||
} else {
|
||||
self.sum / self.sumc as f32
|
||||
};
|
||||
let ret = BinsXbinDim0::from_content(
|
||||
[self.range.beg].into(),
|
||||
[self.range.end].into(),
|
||||
[self.count].into(),
|
||||
[self.min.clone()].into(),
|
||||
[self.max.clone()].into(),
|
||||
[avg].into(),
|
||||
);
|
||||
self.int_ts = range.beg;
|
||||
self.range = range;
|
||||
self.sum = 0f32;
|
||||
self.sumc = 0;
|
||||
self.did_min_max = false;
|
||||
self.min = NTY::zero_b();
|
||||
self.max = NTY::zero_b();
|
||||
ret*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsXbinDim0<STY> {
|
||||
trace!("{} result_reset_time_weight", Self::type_name());
|
||||
// TODO check callsite for correct expand status.
|
||||
if self.range.is_time() {
|
||||
self.apply_event_time_weight(self.range.end_u64());
|
||||
} else {
|
||||
error!("TODO result_reset_time_weight");
|
||||
err::todoval()
|
||||
}
|
||||
let range_beg = self.range.beg_u64();
|
||||
let range_end = self.range.end_u64();
|
||||
let (min, max, avg) = if self.sumc > 0 {
|
||||
let avg = self.sum / (self.range.delta_u64() as f32 * 1e-9);
|
||||
(self.min.clone(), self.max.clone(), avg)
|
||||
} else {
|
||||
let (min, max, avg) = match &self.last_vals {
|
||||
Some((min, max, avg)) => {
|
||||
warn!("\n\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! SHOULD ALWAYS HAVE ACCUMULATED IN THIS CASE");
|
||||
(min.clone(), max.clone(), avg.clone())
|
||||
}
|
||||
None => (STY::zero_b(), STY::zero_b(), 0.),
|
||||
};
|
||||
(min, max, avg)
|
||||
};
|
||||
let ret = BinsXbinDim0::from_content(
|
||||
[range_beg].into(),
|
||||
[range_end].into(),
|
||||
[self.count].into(),
|
||||
[min.clone()].into(),
|
||||
[max.clone()].into(),
|
||||
[avg].into(),
|
||||
);
|
||||
self.int_ts = range.beg_u64();
|
||||
self.range = range;
|
||||
self.count = 0;
|
||||
self.sumc = 0;
|
||||
self.sum = 0.;
|
||||
self.did_min_max = false;
|
||||
self.min = STY::zero_b();
|
||||
self.max = STY::zero_b();
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EventsXbinDim0CollectorOutput<NTY> {
|
||||
#[serde(rename = "tsAnchor")]
|
||||
ts_anchor_sec: u64,
|
||||
#[serde(rename = "tsMs")]
|
||||
ts_off_ms: VecDeque<u64>,
|
||||
#[serde(rename = "tsNs")]
|
||||
ts_off_ns: VecDeque<u64>,
|
||||
#[serde(rename = "pulseAnchor")]
|
||||
pulse_anchor: u64,
|
||||
#[serde(rename = "pulseOff")]
|
||||
pulse_off: VecDeque<u64>,
|
||||
#[serde(rename = "mins")]
|
||||
mins: VecDeque<NTY>,
|
||||
#[serde(rename = "maxs")]
|
||||
maxs: VecDeque<NTY>,
|
||||
#[serde(rename = "avgs")]
|
||||
avgs: VecDeque<f32>,
|
||||
#[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
|
||||
range_final: bool,
|
||||
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
|
||||
timed_out: bool,
|
||||
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
|
||||
continue_at: Option<IsoDateTime>,
|
||||
}
|
||||
|
||||
impl<NTY> AsAnyRef for EventsXbinDim0CollectorOutput<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
fn as_any_ref(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> AsAnyMut for EventsXbinDim0CollectorOutput<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> TypeName for EventsXbinDim0CollectorOutput<STY> {
|
||||
fn type_name(&self) -> String {
|
||||
any::type_name::<Self>().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> WithLen for EventsXbinDim0CollectorOutput<NTY> {
|
||||
fn len(&self) -> usize {
|
||||
self.mins.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> ToJsonResult for EventsXbinDim0CollectorOutput<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
fn to_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
|
||||
serde_json::to_value(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> CollectedDyn for EventsXbinDim0CollectorOutput<NTY> where NTY: ScalarOps {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EventsXbinDim0Collector<NTY> {
|
||||
vals: EventsXbinDim0<NTY>,
|
||||
range_final: bool,
|
||||
timed_out: bool,
|
||||
needs_continue_at: bool,
|
||||
}
|
||||
|
||||
impl<NTY> EventsXbinDim0Collector<NTY> {
|
||||
pub fn self_name() -> &'static str {
|
||||
any::type_name::<Self>()
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
range_final: false,
|
||||
timed_out: false,
|
||||
vals: EventsXbinDim0::empty(),
|
||||
needs_continue_at: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> WithLen for EventsXbinDim0Collector<NTY> {
|
||||
fn len(&self) -> usize {
|
||||
WithLen::len(&self.vals)
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY> ByteEstimate for EventsXbinDim0Collector<STY> {
|
||||
fn byte_estimate(&self) -> u64 {
|
||||
ByteEstimate::byte_estimate(&self.vals)
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> CollectorTy for EventsXbinDim0Collector<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
type Input = EventsXbinDim0<NTY>;
|
||||
type Output = EventsXbinDim0CollectorOutput<NTY>;
|
||||
|
||||
fn ingest(&mut self, src: &mut Self::Input) {
|
||||
self.vals.tss.append(&mut src.tss);
|
||||
self.vals.pulses.append(&mut src.pulses);
|
||||
self.vals.mins.append(&mut src.mins);
|
||||
self.vals.maxs.append(&mut src.maxs);
|
||||
self.vals.avgs.append(&mut src.avgs);
|
||||
}
|
||||
|
||||
fn set_range_complete(&mut self) {
|
||||
self.range_final = true;
|
||||
}
|
||||
|
||||
fn set_timed_out(&mut self) {
|
||||
self.timed_out = true;
|
||||
}
|
||||
|
||||
fn set_continue_at_here(&mut self) {
|
||||
self.needs_continue_at = true;
|
||||
}
|
||||
|
||||
fn result(
|
||||
&mut self,
|
||||
range: Option<SeriesRange>,
|
||||
_binrange: Option<BinnedRangeEnum>,
|
||||
) -> Result<Self::Output, Error> {
|
||||
/*use std::mem::replace;
|
||||
let continue_at = if self.timed_out {
|
||||
if let Some(ts) = self.vals.tss.back() {
|
||||
Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
|
||||
} else {
|
||||
if let Some(range) = &range {
|
||||
Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
|
||||
} else {
|
||||
warn!("can not determine continue-at parameters");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mins = replace(&mut self.vals.mins, VecDeque::new());
|
||||
let maxs = replace(&mut self.vals.maxs, VecDeque::new());
|
||||
let avgs = replace(&mut self.vals.avgs, VecDeque::new());
|
||||
self.vals.tss.make_contiguous();
|
||||
self.vals.pulses.make_contiguous();
|
||||
let tst = crate::ts_offs_from_abs(self.vals.tss.as_slices().0);
|
||||
let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(&self.vals.pulses.as_slices().0);
|
||||
let ret = Self::Output {
|
||||
ts_anchor_sec: tst.0,
|
||||
ts_off_ms: tst.1,
|
||||
ts_off_ns: tst.2,
|
||||
pulse_anchor,
|
||||
pulse_off,
|
||||
mins,
|
||||
maxs,
|
||||
avgs,
|
||||
range_final: self.range_final,
|
||||
timed_out: self.timed_out,
|
||||
continue_at,
|
||||
};
|
||||
Ok(ret)*/
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> CollectableType for EventsXbinDim0<NTY>
|
||||
where
|
||||
NTY: ScalarOps,
|
||||
{
|
||||
type Collector = EventsXbinDim0Collector<NTY>;
|
||||
|
||||
fn new_collector() -> Self::Collector {
|
||||
Self::Collector::new()
|
||||
}
|
||||
}
|
||||
@@ -1,14 +1,12 @@
pub mod accounting;
pub mod binning;
pub mod binsdim0;
pub mod binsxbindim0;
pub mod channelevents;
pub mod empty;
pub mod eventfull;
pub mod eventsdim0;
pub mod eventsdim0enum;
pub mod eventsdim1;
pub mod eventsxbindim0;
pub mod framable;
pub mod frame;
pub mod inmem;
@@ -26,11 +24,9 @@ use futures_util::Stream;
use items_0::isodate::IsoDateTime;
use items_0::streamitem::Sitemty;
use items_0::transform::EventTransform;
use items_0::Empty;
use items_0::Events;
use items_0::MergeError;
use merger::Mergeable;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::*;
use std::collections::VecDeque;
use std::fmt;
@@ -47,7 +43,10 @@ pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
(ts_anchor_sec, ts_off_ms, ts_off_ns)
}

pub fn ts_offs_from_abs_with_anchor(ts_anchor_sec: u64, tss: &[u64]) -> (VecDeque<u64>, VecDeque<u64>) {
pub fn ts_offs_from_abs_with_anchor(
ts_anchor_sec: u64,
tss: &[u64],
) -> (VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_ns = ts_anchor_sec * SEC;
let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
let ts_off_ns = tss
@@ -173,6 +172,12 @@ impl Mergeable for Box<dyn Events> {
}
}

pub trait ChannelEventsInput: Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send {}
pub trait ChannelEventsInput:
Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send
{
}

impl<T> ChannelEventsInput for T where T: Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send {}
impl<T> ChannelEventsInput for T where
T: Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send
{
}
@@ -26,23 +26,11 @@ use std::task::Poll;
const OUT_MAX_BYTES: u64 = 1024 * 200;
const DO_DETECT_NON_MONO: bool = true;

#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*) };
}
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }

#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*) };
}
macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }

#[allow(unused)]
macro_rules! trace4 {
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*) };
}
macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }

pub trait Mergeable<Rhs = Self>: fmt::Debug + WithLen + ByteEstimate + Unpin {
fn ts_min(&self) -> Option<u64>;
@@ -311,7 +299,8 @@ where
}
StreamItem::Stats(item) => {
// TODO limit queue length
self.out_of_band_queue.push_back(Ok(StreamItem::Stats(item)));
self.out_of_band_queue
.push_back(Ok(StreamItem::Stats(item)));
continue;
}
},
@@ -337,7 +326,10 @@ where
}
}

fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
fn poll3(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
use ControlFlow::*;
use Poll::*;
trace4!("poll3");
@@ -364,9 +356,17 @@ where
}
}
if let Some(o) = self.out.as_ref() {
if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out || last_emit {
if o.len() >= self.out_max_len
|| o.byte_estimate() >= OUT_MAX_BYTES
|| self.do_clear_out
|| last_emit
{
if o.len() > self.out_max_len {
debug!("MERGER OVERWEIGHT ITEM {} vs {}", o.len(), self.out_max_len);
debug!(
"MERGER OVERWEIGHT ITEM {} vs {}",
o.len(),
self.out_max_len
);
}
trace3!("decide to output");
self.do_clear_out = false;
@@ -388,7 +388,10 @@ where
}
}
fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
fn poll2(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
use ControlFlow::*;
use Poll::*;
match Self::refill(Pin::new(&mut self), cx) {
@@ -426,7 +429,9 @@ where
self.done_range_complete = true;
if self.range_complete.iter().all(|x| *x) {
trace!("emit RangeComplete");
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else {
continue;
}
@@ -485,7 +490,7 @@ impl<T> EventTransform for Merger<T>
where
T: Send,
{
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
fn transform(&mut self, _src: Box<dyn Events>) -> Box<dyn Events> {
todo!()
}
}