Merger for events and channel status
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -4,4 +4,5 @@
|
||||
/.idea
|
||||
/.vscode
|
||||
/tmpdata
|
||||
/tmpdoc
|
||||
/docs
|
||||
|
||||
@@ -17,6 +17,7 @@ num-traits = "0.2.15"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
crc32fast = "1.3.2"
|
||||
futures-util = "0.3.24"
|
||||
tokio = { version = "1.20", features = ["rt-multi-thread", "sync"] }
|
||||
err = { path = "../err" }
|
||||
items_proc = { path = "../items_proc" }
|
||||
netpod = { path = "../netpod" }
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use crate::streams::{CollectableType, CollectorType, ToJsonResult};
|
||||
use crate::{
|
||||
ts_offs_from_abs, AppendEmptyBin, Empty, IsoDateTime, ScalarOps, TimeBinnable, TimeBinnableType,
|
||||
TimeBinnableTypeAggregator, TimeBinned, TimeBinner, TimeSeries, WithLen,
|
||||
};
|
||||
use crate::{ts_offs_from_abs, AppendEmptyBin, Empty, IsoDateTime, RangeOverlapInfo, ScalarOps, TimeBins, WithLen};
|
||||
use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBinner};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
@@ -17,12 +15,12 @@ use std::{fmt, mem};
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct MinMaxAvgDim0Bins<NTY> {
|
||||
pub ts1s: Vec<u64>,
|
||||
pub ts2s: Vec<u64>,
|
||||
pub counts: Vec<u64>,
|
||||
pub mins: Vec<NTY>,
|
||||
pub maxs: Vec<NTY>,
|
||||
pub avgs: Vec<f32>,
|
||||
pub ts1s: VecDeque<u64>,
|
||||
pub ts2s: VecDeque<u64>,
|
||||
pub counts: VecDeque<u64>,
|
||||
pub mins: VecDeque<NTY>,
|
||||
pub maxs: VecDeque<NTY>,
|
||||
pub avgs: VecDeque<f32>,
|
||||
}
|
||||
|
||||
impl<NTY> fmt::Debug for MinMaxAvgDim0Bins<NTY>
|
||||
@@ -48,12 +46,12 @@ where
|
||||
impl<NTY> MinMaxAvgDim0Bins<NTY> {
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
ts1s: vec![],
|
||||
ts2s: vec![],
|
||||
counts: vec![],
|
||||
mins: vec![],
|
||||
maxs: vec![],
|
||||
avgs: vec![],
|
||||
ts1s: VecDeque::new(),
|
||||
ts2s: VecDeque::new(),
|
||||
counts: VecDeque::new(),
|
||||
mins: VecDeque::new(),
|
||||
maxs: VecDeque::new(),
|
||||
avgs: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -64,41 +62,71 @@ impl<NTY> WithLen for MinMaxAvgDim0Bins<NTY> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> RangeOverlapInfo for MinMaxAvgDim0Bins<NTY> {
|
||||
fn ends_before(&self, range: NanoRange) -> bool {
|
||||
if let Some(&max) = self.ts2s.back() {
|
||||
max <= range.beg
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn ends_after(&self, range: NanoRange) -> bool {
|
||||
if let Some(&max) = self.ts2s.back() {
|
||||
max > range.end
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn starts_after(&self, range: NanoRange) -> bool {
|
||||
if let Some(&min) = self.ts1s.front() {
|
||||
min >= range.end
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> Empty for MinMaxAvgDim0Bins<NTY> {
|
||||
fn empty() -> Self {
|
||||
Self {
|
||||
ts1s: Vec::new(),
|
||||
ts2s: Vec::new(),
|
||||
counts: Vec::new(),
|
||||
mins: Vec::new(),
|
||||
maxs: Vec::new(),
|
||||
avgs: Vec::new(),
|
||||
ts1s: Default::default(),
|
||||
ts2s: Default::default(),
|
||||
counts: Default::default(),
|
||||
mins: Default::default(),
|
||||
maxs: Default::default(),
|
||||
avgs: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> AppendEmptyBin for MinMaxAvgDim0Bins<NTY> {
|
||||
fn append_empty_bin(&mut self, ts1: u64, ts2: u64) {
|
||||
self.ts1s.push(ts1);
|
||||
self.ts2s.push(ts2);
|
||||
self.counts.push(0);
|
||||
self.mins.push(NTY::zero());
|
||||
self.maxs.push(NTY::zero());
|
||||
self.avgs.push(0.);
|
||||
self.ts1s.push_back(ts1);
|
||||
self.ts2s.push_back(ts2);
|
||||
self.counts.push_back(0);
|
||||
self.mins.push_back(NTY::zero());
|
||||
self.maxs.push_back(NTY::zero());
|
||||
self.avgs.push_back(0.);
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> TimeSeries for MinMaxAvgDim0Bins<NTY> {
|
||||
impl<NTY: ScalarOps> TimeBins for MinMaxAvgDim0Bins<NTY> {
|
||||
fn ts_min(&self) -> Option<u64> {
|
||||
todo!("collection of bins can not be TimeSeries")
|
||||
self.ts1s.front().map(Clone::clone)
|
||||
}
|
||||
|
||||
fn ts_max(&self) -> Option<u64> {
|
||||
todo!("collection of bins can not be TimeSeries")
|
||||
self.ts2s.back().map(Clone::clone)
|
||||
}
|
||||
|
||||
fn ts_min_max(&self) -> Option<(u64, u64)> {
|
||||
todo!("collection of bins can not be TimeSeries")
|
||||
if let (Some(min), Some(max)) = (self.ts1s.front().map(Clone::clone), self.ts2s.back().map(Clone::clone)) {
|
||||
Some((min, max))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,13 +159,13 @@ pub struct MinMaxAvgBinsCollectedResult<NTY> {
|
||||
#[serde(rename = "tsAnchor")]
|
||||
ts_anchor_sec: u64,
|
||||
#[serde(rename = "tsMs")]
|
||||
ts_off_ms: Vec<u64>,
|
||||
ts_off_ms: VecDeque<u64>,
|
||||
#[serde(rename = "tsNs")]
|
||||
ts_off_ns: Vec<u64>,
|
||||
counts: Vec<u64>,
|
||||
mins: Vec<NTY>,
|
||||
maxs: Vec<NTY>,
|
||||
avgs: Vec<f32>,
|
||||
ts_off_ns: VecDeque<u64>,
|
||||
counts: VecDeque<u64>,
|
||||
mins: VecDeque<NTY>,
|
||||
maxs: VecDeque<NTY>,
|
||||
avgs: VecDeque<f32>,
|
||||
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
|
||||
finalised_range: bool,
|
||||
#[serde(skip_serializing_if = "Zero::is_zero", rename = "missingBins")]
|
||||
@@ -196,12 +224,12 @@ impl<NTY: ScalarOps> CollectorType for MinMaxAvgBinsCollector<NTY> {
|
||||
// TODO could save the copy:
|
||||
let mut ts_all = self.vals.ts1s.clone();
|
||||
if self.vals.ts2s.len() > 0 {
|
||||
ts_all.push(*self.vals.ts2s.last().unwrap());
|
||||
ts_all.push_back(*self.vals.ts2s.back().unwrap());
|
||||
}
|
||||
info!("TODO return proper continueAt");
|
||||
let bin_count_exp = 100 as u32;
|
||||
let continue_at = if self.vals.ts1s.len() < bin_count_exp as usize {
|
||||
match ts_all.last() {
|
||||
match ts_all.back() {
|
||||
Some(&k) => {
|
||||
let iso = IsoDateTime(Utc.timestamp_nanos(k as i64));
|
||||
Some(iso)
|
||||
@@ -211,11 +239,14 @@ impl<NTY: ScalarOps> CollectorType for MinMaxAvgBinsCollector<NTY> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let tst = ts_offs_from_abs(&ts_all);
|
||||
let counts = mem::replace(&mut self.vals.counts, Vec::new());
|
||||
let mins = mem::replace(&mut self.vals.mins, Vec::new());
|
||||
let maxs = mem::replace(&mut self.vals.maxs, Vec::new());
|
||||
let avgs = mem::replace(&mut self.vals.avgs, Vec::new());
|
||||
if ts_all.as_slices().1.len() != 0 {
|
||||
panic!();
|
||||
}
|
||||
let tst = ts_offs_from_abs(ts_all.as_slices().0);
|
||||
let counts = mem::replace(&mut self.vals.counts, VecDeque::new());
|
||||
let mins = mem::replace(&mut self.vals.mins, VecDeque::new());
|
||||
let maxs = mem::replace(&mut self.vals.maxs, VecDeque::new());
|
||||
let avgs = mem::replace(&mut self.vals.avgs, VecDeque::new());
|
||||
let ret = MinMaxAvgBinsCollectedResult::<NTY> {
|
||||
ts_anchor_sec: tst.0,
|
||||
ts_off_ms: tst.1,
|
||||
@@ -302,12 +333,12 @@ impl<NTY: ScalarOps> TimeBinnableTypeAggregator for MinMaxAvgDim0BinsAggregator<
|
||||
self.avg = self.sum / self.sumc as f32;
|
||||
}
|
||||
let ret = Self::Output {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
mins: vec![self.min.clone()],
|
||||
maxs: vec![self.max.clone()],
|
||||
avgs: vec![self.avg],
|
||||
ts1s: [self.range.beg].into(),
|
||||
ts2s: [self.range.end].into(),
|
||||
counts: [self.count].into(),
|
||||
mins: [self.min.clone()].into(),
|
||||
maxs: [self.max.clone()].into(),
|
||||
avgs: [self.avg].into(),
|
||||
};
|
||||
self.range = range;
|
||||
self.count = 0;
|
||||
@@ -499,23 +530,33 @@ impl<NTY: ScalarOps> TimeBinned for MinMaxAvgDim0Bins<NTY> {
|
||||
}
|
||||
|
||||
fn edges_slice(&self) -> (&[u64], &[u64]) {
|
||||
(&self.ts1s[..], &self.ts2s[..])
|
||||
if self.ts1s.as_slices().1.len() != 0 {
|
||||
panic!();
|
||||
}
|
||||
if self.ts2s.as_slices().1.len() != 0 {
|
||||
panic!();
|
||||
}
|
||||
(&self.ts1s.as_slices().0, &self.ts2s.as_slices().0)
|
||||
}
|
||||
|
||||
fn counts(&self) -> &[u64] {
|
||||
&self.counts[..]
|
||||
// TODO check for contiguous
|
||||
self.counts.as_slices().0
|
||||
}
|
||||
|
||||
// TODO is Vec needed?
|
||||
fn mins(&self) -> Vec<f32> {
|
||||
self.mins.iter().map(|x| x.clone().as_prim_f32()).collect()
|
||||
}
|
||||
|
||||
// TODO is Vec needed?
|
||||
fn maxs(&self) -> Vec<f32> {
|
||||
self.maxs.iter().map(|x| x.clone().as_prim_f32()).collect()
|
||||
}
|
||||
|
||||
// TODO is Vec needed?
|
||||
fn avgs(&self) -> Vec<f32> {
|
||||
self.avgs.clone()
|
||||
self.avgs.iter().map(Clone::clone).collect()
|
||||
}
|
||||
|
||||
fn validate(&self) -> Result<(), String> {
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
use crate::binsdim0::MinMaxAvgDim0Bins;
|
||||
use crate::streams::{CollectableType, CollectorType, ToJsonResult};
|
||||
use crate::{pulse_offs_from_abs, ts_offs_from_abs};
|
||||
use crate::{
|
||||
Empty, Events, ScalarOps, TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner, TimeSeries,
|
||||
WithLen,
|
||||
};
|
||||
use crate::{pulse_offs_from_abs, ts_offs_from_abs, RangeOverlapInfo};
|
||||
use crate::{Empty, Events, ScalarOps, WithLen};
|
||||
use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner};
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::NanoRange;
|
||||
@@ -13,30 +11,28 @@ use std::any::Any;
|
||||
use std::collections::VecDeque;
|
||||
use std::{fmt, mem};
|
||||
|
||||
// TODO in this module reduce clones.
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct EventsDim0<NTY> {
|
||||
pub tss: Vec<u64>,
|
||||
pub pulses: Vec<u64>,
|
||||
pub values: Vec<NTY>,
|
||||
pub tss: VecDeque<u64>,
|
||||
pub pulses: VecDeque<u64>,
|
||||
pub values: VecDeque<NTY>,
|
||||
}
|
||||
|
||||
impl<NTY> EventsDim0<NTY> {
|
||||
#[inline(always)]
|
||||
pub fn push(&mut self, ts: u64, pulse: u64, value: NTY) {
|
||||
self.tss.push(ts);
|
||||
self.pulses.push(pulse);
|
||||
self.values.push(value);
|
||||
self.tss.push_back(ts);
|
||||
self.pulses.push_back(pulse);
|
||||
self.values.push_back(value);
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> Empty for EventsDim0<NTY> {
|
||||
fn empty() -> Self {
|
||||
Self {
|
||||
tss: vec![],
|
||||
pulses: vec![],
|
||||
values: vec![],
|
||||
tss: VecDeque::new(),
|
||||
pulses: VecDeque::new(),
|
||||
values: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -50,10 +46,10 @@ where
|
||||
fmt,
|
||||
"count {} ts {:?} .. {:?} vals {:?} .. {:?}",
|
||||
self.tss.len(),
|
||||
self.tss.first(),
|
||||
self.tss.last(),
|
||||
self.values.first(),
|
||||
self.values.last(),
|
||||
self.tss.front(),
|
||||
self.tss.back(),
|
||||
self.values.front(),
|
||||
self.values.back(),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -64,20 +60,28 @@ impl<NTY> WithLen for EventsDim0<NTY> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY> TimeSeries for EventsDim0<NTY> {
|
||||
fn ts_min(&self) -> Option<u64> {
|
||||
self.tss.first().map(Clone::clone)
|
||||
}
|
||||
|
||||
fn ts_max(&self) -> Option<u64> {
|
||||
self.tss.last().map(Clone::clone)
|
||||
}
|
||||
|
||||
fn ts_min_max(&self) -> Option<(u64, u64)> {
|
||||
if self.tss.len() == 0 {
|
||||
None
|
||||
impl<NTY: ScalarOps> RangeOverlapInfo for EventsDim0<NTY> {
|
||||
fn ends_before(&self, range: NanoRange) -> bool {
|
||||
if let Some(&max) = self.tss.back() {
|
||||
max < range.beg
|
||||
} else {
|
||||
Some((self.tss.first().unwrap().clone(), self.tss.last().unwrap().clone()))
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn ends_after(&self, range: NanoRange) -> bool {
|
||||
if let Some(&max) = self.tss.back() {
|
||||
max >= range.end
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn starts_after(&self, range: NanoRange) -> bool {
|
||||
if let Some(&min) = self.tss.front() {
|
||||
min >= range.end
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,14 +127,14 @@ pub struct EventValuesCollectorOutput<NTY> {
|
||||
#[serde(rename = "tsAnchor")]
|
||||
ts_anchor_sec: u64,
|
||||
#[serde(rename = "tsMs")]
|
||||
ts_off_ms: Vec<u64>,
|
||||
ts_off_ms: VecDeque<u64>,
|
||||
#[serde(rename = "tsNs")]
|
||||
ts_off_ns: Vec<u64>,
|
||||
ts_off_ns: VecDeque<u64>,
|
||||
#[serde(rename = "pulseAnchor")]
|
||||
pulse_anchor: u64,
|
||||
#[serde(rename = "pulseOff")]
|
||||
pulse_off: Vec<u64>,
|
||||
values: Vec<NTY>,
|
||||
pulse_off: VecDeque<u64>,
|
||||
values: VecDeque<NTY>,
|
||||
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")]
|
||||
range_complete: bool,
|
||||
#[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")]
|
||||
@@ -164,15 +168,16 @@ impl<NTY: ScalarOps> CollectorType for EventValuesCollector<NTY> {
|
||||
}
|
||||
|
||||
fn result(&mut self) -> Result<Self::Output, Error> {
|
||||
let tst = ts_offs_from_abs(&self.vals.tss);
|
||||
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses);
|
||||
// TODO require contiguous slices
|
||||
let tst = ts_offs_from_abs(&self.vals.tss.as_slices().0);
|
||||
let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses.as_slices().0);
|
||||
let ret = Self::Output {
|
||||
ts_anchor_sec: tst.0,
|
||||
ts_off_ms: tst.1,
|
||||
ts_off_ns: tst.2,
|
||||
pulse_anchor,
|
||||
pulse_off,
|
||||
values: mem::replace(&mut self.vals.values, Vec::new()),
|
||||
pulse_off: pulse_off,
|
||||
values: mem::replace(&mut self.vals.values, VecDeque::new()),
|
||||
range_complete: self.range_complete,
|
||||
timed_out: self.timed_out,
|
||||
};
|
||||
@@ -345,12 +350,12 @@ impl<NTY: ScalarOps> EventValuesAggregator<NTY> {
|
||||
(g.clone(), g.clone(), g.as_prim_f32())
|
||||
};
|
||||
let ret = MinMaxAvgDim0Bins {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
mins: vec![min],
|
||||
maxs: vec![max],
|
||||
avgs: vec![avg],
|
||||
ts1s: [self.range.beg].into(),
|
||||
ts2s: [self.range.end].into(),
|
||||
counts: [self.count].into(),
|
||||
mins: [min].into(),
|
||||
maxs: [max].into(),
|
||||
avgs: [avg].into(),
|
||||
};
|
||||
self.int_ts = range.beg;
|
||||
self.range = range;
|
||||
@@ -379,12 +384,12 @@ impl<NTY: ScalarOps> EventValuesAggregator<NTY> {
|
||||
(g.clone(), g.clone(), g.as_prim_f32())
|
||||
};
|
||||
let ret = MinMaxAvgDim0Bins {
|
||||
ts1s: vec![self.range.beg],
|
||||
ts2s: vec![self.range.end],
|
||||
counts: vec![self.count],
|
||||
mins: vec![min],
|
||||
maxs: vec![max],
|
||||
avgs: vec![avg],
|
||||
ts1s: [self.range.beg].into(),
|
||||
ts2s: [self.range.end].into(),
|
||||
counts: [self.count].into(),
|
||||
mins: [min].into(),
|
||||
maxs: [max].into(),
|
||||
avgs: [avg].into(),
|
||||
};
|
||||
self.int_ts = range.beg;
|
||||
self.range = range;
|
||||
@@ -474,6 +479,27 @@ impl<NTY: ScalarOps> Events for EventsDim0<NTY> {
|
||||
fn as_collectable_mut(&mut self) -> &mut dyn crate::streams::Collectable {
|
||||
self
|
||||
}
|
||||
|
||||
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events> {
|
||||
let n1 = self.tss.iter().take_while(|&&x| x < ts_end).count();
|
||||
let tss = self.tss.drain(..n1).collect();
|
||||
let pulses = self.pulses.drain(..n1).collect();
|
||||
let values = self.values.drain(..n1).collect();
|
||||
let ret = Self { tss, pulses, values };
|
||||
Box::new(ret)
|
||||
}
|
||||
|
||||
fn ts_min(&self) -> Option<u64> {
|
||||
self.tss.front().map(|&x| x)
|
||||
}
|
||||
|
||||
fn partial_eq_dyn(&self, other: &dyn Events) -> bool {
|
||||
if let Some(other) = other.as_any().downcast_ref::<Self>() {
|
||||
self == other
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ScalarEventsTimeBinner<NTY: ScalarOps> {
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
pub mod binsdim0;
|
||||
pub mod eventsdim0;
|
||||
pub mod streams;
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
|
||||
use chrono::{DateTime, TimeZone, Utc};
|
||||
use futures_util::Stream;
|
||||
use netpod::log::error;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{AggKind, NanoRange, ScalarType, Shape};
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::any::Any;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::ops::ControlFlow;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use streams::Collectable;
|
||||
@@ -18,10 +22,11 @@ pub fn bool_is_false(x: &bool) -> bool {
|
||||
*x == false
|
||||
}
|
||||
|
||||
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec<u64>, Vec<u64>) {
|
||||
// TODO take iterator instead of slice, because a VecDeque can't produce a slice in general.
|
||||
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
|
||||
let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
|
||||
let ts_anchor_ns = ts_anchor_sec * SEC;
|
||||
let ts_off_ms: Vec<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
|
||||
let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
|
||||
let ts_off_ns = tss
|
||||
.iter()
|
||||
.zip(ts_off_ms.iter().map(|&k| k * MS))
|
||||
@@ -30,9 +35,10 @@ pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec<u64>, Vec<u64>) {
|
||||
(ts_anchor_sec, ts_off_ms, ts_off_ns)
|
||||
}
|
||||
|
||||
pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, Vec<u64>) {
|
||||
// TODO take iterator instead of slice, because a VecDeque can't produce a slice in general.
|
||||
pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque<u64>) {
|
||||
let pulse_anchor = pulse.first().map_or(0, |k| *k);
|
||||
let pulse_off: Vec<_> = pulse.iter().map(|k| *k - pulse_anchor).collect();
|
||||
let pulse_off = pulse.iter().map(|k| *k - pulse_anchor).collect();
|
||||
(pulse_anchor, pulse_off)
|
||||
}
|
||||
|
||||
@@ -104,7 +110,14 @@ impl_num_ops!(f64, 0.);
|
||||
#[allow(unused)]
|
||||
struct Ts(u64);
|
||||
|
||||
struct Error {
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ErrorKind {
|
||||
#[allow(unused)]
|
||||
MismatchedType,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct Error {
|
||||
#[allow(unused)]
|
||||
kind: ErrorKind,
|
||||
}
|
||||
@@ -115,16 +128,12 @@ impl From<ErrorKind> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
enum ErrorKind {
|
||||
#[allow(unused)]
|
||||
MismatchedType,
|
||||
}
|
||||
|
||||
pub trait WithLen {
|
||||
fn len(&self) -> usize;
|
||||
}
|
||||
|
||||
pub trait TimeSeries {
|
||||
// TODO can probably be removed.
|
||||
pub trait TimeBins {
|
||||
fn ts_min(&self) -> Option<u64>;
|
||||
fn ts_max(&self) -> Option<u64>;
|
||||
fn ts_min_max(&self) -> Option<(u64, u64)>;
|
||||
@@ -140,71 +149,12 @@ pub enum Fits {
|
||||
PartlyLowerAndGreater,
|
||||
}
|
||||
|
||||
// TODO can this be removed?
|
||||
pub trait FitsInside {
|
||||
fn fits_inside(&self, range: NanoRange) -> Fits;
|
||||
}
|
||||
|
||||
impl<T> FitsInside for T
|
||||
where
|
||||
T: TimeSeries,
|
||||
{
|
||||
fn fits_inside(&self, range: NanoRange) -> Fits {
|
||||
if let Some((min, max)) = self.ts_min_max() {
|
||||
if max <= range.beg {
|
||||
Fits::Lower
|
||||
} else if min >= range.end {
|
||||
Fits::Greater
|
||||
} else if min < range.beg && max > range.end {
|
||||
Fits::PartlyLowerAndGreater
|
||||
} else if min < range.beg {
|
||||
Fits::PartlyLower
|
||||
} else if max > range.end {
|
||||
Fits::PartlyGreater
|
||||
} else {
|
||||
Fits::Inside
|
||||
}
|
||||
} else {
|
||||
Fits::Empty
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait RangeOverlapInfo {
|
||||
fn ends_before(&self, range: NanoRange) -> bool;
|
||||
fn ends_after(&self, range: NanoRange) -> bool;
|
||||
fn starts_after(&self, range: NanoRange) -> bool;
|
||||
}
|
||||
|
||||
impl<T> RangeOverlapInfo for T
|
||||
where
|
||||
T: TimeSeries,
|
||||
{
|
||||
fn ends_before(&self, range: NanoRange) -> bool {
|
||||
if let Some(max) = self.ts_max() {
|
||||
max <= range.beg
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn ends_after(&self, range: NanoRange) -> bool {
|
||||
if let Some(max) = self.ts_max() {
|
||||
max > range.end
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn starts_after(&self, range: NanoRange) -> bool {
|
||||
if let Some(min) = self.ts_min() {
|
||||
min >= range.end
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait EmptyForScalarTypeShape {
|
||||
fn empty(scalar_type: ScalarType, shape: Shape) -> Self;
|
||||
}
|
||||
@@ -262,11 +212,20 @@ pub trait TimeBinnable: WithLen + RangeOverlapInfo + Any + Send {
|
||||
}
|
||||
|
||||
/// Container of some form of events, for use as trait object.
|
||||
pub trait Events: Collectable + TimeBinnable {
|
||||
pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable {
|
||||
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
|
||||
fn verify(&self);
|
||||
fn output_info(&self);
|
||||
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
|
||||
fn ts_min(&self) -> Option<u64>;
|
||||
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events>;
|
||||
fn partial_eq_dyn(&self, other: &dyn Events) -> bool;
|
||||
}
|
||||
|
||||
impl PartialEq for Box<dyn Events> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
Events::partial_eq_dyn(self.as_ref(), other.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
/// Data in time-binned form.
|
||||
@@ -387,60 +346,329 @@ pub fn empty_binned_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ConnStatus {}
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum ConnStatus {
|
||||
Connect,
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct ConnStatusEvent {
|
||||
pub ts: u64,
|
||||
pub status: ConnStatus,
|
||||
}
|
||||
|
||||
trait MergableEvents: Any {
|
||||
fn len(&self) -> usize;
|
||||
fn ts_min(&self) -> Option<u64>;
|
||||
fn ts_max(&self) -> Option<u64>;
|
||||
fn take_from(&mut self, src: &mut dyn MergableEvents, ts_end: u64) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
impl<T: MergableEvents> MergableEvents for Box<T> {
|
||||
fn ts_min(&self) -> Option<u64> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ts_max(&self) -> Option<u64> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ChannelEvents {
|
||||
Events(Box<dyn Events>),
|
||||
Status(ConnStatusEvent),
|
||||
RangeComplete,
|
||||
}
|
||||
|
||||
impl MergableEvents for ChannelEvents {
|
||||
fn len(&self) -> usize {
|
||||
error!("TODO MergableEvents");
|
||||
todo!()
|
||||
impl PartialEq for ChannelEvents {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Self::Events(l0), Self::Events(r0)) => l0 == r0,
|
||||
(Self::Status(l0), Self::Status(r0)) => l0 == r0,
|
||||
_ => core::mem::discriminant(self) == core::mem::discriminant(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChannelEvents {
|
||||
fn forget_after_merge_process(&self) -> bool {
|
||||
use ChannelEvents::*;
|
||||
match self {
|
||||
Events(k) => k.len() == 0,
|
||||
Status(_) => true,
|
||||
RangeComplete => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MergableEvents for ChannelEvents {
|
||||
fn ts_min(&self) -> Option<u64> {
|
||||
error!("TODO MergableEvents");
|
||||
todo!()
|
||||
use ChannelEvents::*;
|
||||
match self {
|
||||
Events(k) => k.ts_min(),
|
||||
Status(k) => Some(k.ts),
|
||||
RangeComplete => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn ts_max(&self) -> Option<u64> {
|
||||
error!("TODO MergableEvents");
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
fn take_from(&mut self, _src: &mut dyn MergableEvents, _ts_end: u64) -> Result<(), Error> {
|
||||
error!("TODO MergableEvents");
|
||||
todo!()
|
||||
pub struct ChannelEventsMerger {
|
||||
inp1: Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>>>>,
|
||||
inp2: Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>>>>,
|
||||
inp1_done: bool,
|
||||
inp2_done: bool,
|
||||
inp1_item: Option<ChannelEvents>,
|
||||
inp2_item: Option<ChannelEvents>,
|
||||
out: Option<ChannelEvents>,
|
||||
range_complete: bool,
|
||||
done: bool,
|
||||
complete: bool,
|
||||
}
|
||||
|
||||
impl ChannelEventsMerger {
|
||||
pub fn new(
|
||||
inp1: Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>>>>,
|
||||
inp2: Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>>>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
done: false,
|
||||
complete: false,
|
||||
inp1,
|
||||
inp2,
|
||||
inp1_done: false,
|
||||
inp2_done: false,
|
||||
inp1_item: None,
|
||||
inp2_item: None,
|
||||
out: None,
|
||||
range_complete: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_no_ts_event(item: &mut Option<ChannelEvents>, range_complete: &mut bool) -> bool {
|
||||
match item {
|
||||
Some(k) => match k {
|
||||
ChannelEvents::Events(_) => false,
|
||||
ChannelEvents::Status(_) => false,
|
||||
ChannelEvents::RangeComplete => {
|
||||
*range_complete = true;
|
||||
item.take();
|
||||
true
|
||||
}
|
||||
},
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
|
||||
eprintln!("process {self:?}");
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
// Some event types have no timestamp.
|
||||
let gg: &mut ChannelEventsMerger = &mut self;
|
||||
let &mut ChannelEventsMerger {
|
||||
inp1_item: ref mut item,
|
||||
range_complete: ref mut raco,
|
||||
..
|
||||
} = gg;
|
||||
if Self::handle_no_ts_event(item, raco) {
|
||||
return Continue(());
|
||||
}
|
||||
let &mut ChannelEventsMerger {
|
||||
inp2_item: ref mut item,
|
||||
range_complete: ref mut raco,
|
||||
..
|
||||
} = gg;
|
||||
if Self::handle_no_ts_event(item, raco) {
|
||||
return Continue(());
|
||||
}
|
||||
|
||||
// Find the two lowest ts.
|
||||
let mut tsj = [None, None];
|
||||
for (i1, item) in [&self.inp1_item, &self.inp2_item].into_iter().enumerate() {
|
||||
if let Some(a) = &item {
|
||||
if let Some(tsmin) = a.ts_min() {
|
||||
if let Some((_, k)) = tsj[0] {
|
||||
if tsmin < k {
|
||||
tsj[1] = tsj[0];
|
||||
tsj[0] = Some((i1, tsmin));
|
||||
} else {
|
||||
if let Some((_, k)) = tsj[1] {
|
||||
if tsmin < k {
|
||||
tsj[1] = Some((i1, tsmin));
|
||||
} else {
|
||||
}
|
||||
} else {
|
||||
tsj[1] = Some((i1, tsmin));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tsj[0] = Some((i1, tsmin));
|
||||
}
|
||||
} else {
|
||||
// TODO design such that this can't occur.
|
||||
warn!("found input item without timestamp");
|
||||
//Break(Ready(Some(Err(Error::))))
|
||||
}
|
||||
}
|
||||
}
|
||||
eprintln!("---------- Found lowest: {tsj:?}");
|
||||
|
||||
if tsj[0].is_none() {
|
||||
Continue(())
|
||||
} else if tsj[1].is_none() {
|
||||
let (_ts_min, itemref) = if tsj[0].as_mut().unwrap().0 == 0 {
|
||||
(tsj[0].as_ref().unwrap().1, self.inp1_item.as_mut())
|
||||
} else {
|
||||
(tsj[0].as_ref().unwrap().1, self.inp2_item.as_mut())
|
||||
};
|
||||
if itemref.is_none() {
|
||||
panic!("logic error");
|
||||
}
|
||||
// Precondition: at least one event is before the requested range.
|
||||
use ChannelEvents::*;
|
||||
let itemout = match itemref {
|
||||
Some(Events(k)) => Events(k.take_new_events_until_ts(u64::MAX)),
|
||||
Some(Status(k)) => Status(k.clone()),
|
||||
Some(RangeComplete) => RangeComplete,
|
||||
None => panic!(),
|
||||
};
|
||||
{
|
||||
// TODO refactor
|
||||
if tsj[0].as_mut().unwrap().0 == 0 {
|
||||
if let Some(item) = self.inp1_item.as_ref() {
|
||||
if item.forget_after_merge_process() {
|
||||
self.inp1_item.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
if tsj[0].as_mut().unwrap().0 == 1 {
|
||||
if let Some(item) = self.inp2_item.as_ref() {
|
||||
if item.forget_after_merge_process() {
|
||||
self.inp2_item.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Break(Ready(Some(Ok(itemout))))
|
||||
} else {
|
||||
let (ts_end, itemref) = if tsj[0].as_mut().unwrap().0 == 0 {
|
||||
(tsj[1].as_ref().unwrap().1, self.inp1_item.as_mut())
|
||||
} else {
|
||||
(tsj[1].as_ref().unwrap().1, self.inp2_item.as_mut())
|
||||
};
|
||||
if itemref.is_none() {
|
||||
panic!("logic error");
|
||||
}
|
||||
// Precondition: at least one event is before the requested range.
|
||||
use ChannelEvents::*;
|
||||
let itemout = match itemref {
|
||||
Some(Events(k)) => Events(k.take_new_events_until_ts(ts_end)),
|
||||
Some(Status(k)) => Status(k.clone()),
|
||||
Some(RangeComplete) => RangeComplete,
|
||||
None => panic!(),
|
||||
};
|
||||
{
|
||||
// TODO refactor
|
||||
if tsj[0].as_mut().unwrap().0 == 0 {
|
||||
if let Some(item) = self.inp1_item.as_ref() {
|
||||
if item.forget_after_merge_process() {
|
||||
self.inp1_item.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
if tsj[0].as_mut().unwrap().0 == 1 {
|
||||
if let Some(item) = self.inp2_item.as_ref() {
|
||||
if item.forget_after_merge_process() {
|
||||
self.inp2_item.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Break(Ready(Some(Ok(itemout))))
|
||||
}
|
||||
}
|
||||
|
||||
fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
if self.inp1_item.is_none() && !self.inp1_done {
|
||||
match Pin::new(&mut self.inp1).poll_next(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
self.inp1_item = Some(k);
|
||||
Continue(())
|
||||
}
|
||||
Ready(Some(Err(e))) => Break(Ready(Some(Err(e)))),
|
||||
Ready(None) => {
|
||||
self.inp1_done = true;
|
||||
Continue(())
|
||||
}
|
||||
Pending => Break(Pending),
|
||||
}
|
||||
} else if self.inp2_item.is_none() && !self.inp2_done {
|
||||
match Pin::new(&mut self.inp2).poll_next(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
self.inp2_item = Some(k);
|
||||
Continue(())
|
||||
}
|
||||
Ready(Some(Err(e))) => Break(Ready(Some(Err(e)))),
|
||||
Ready(None) => {
|
||||
self.inp2_done = true;
|
||||
Continue(())
|
||||
}
|
||||
Pending => Break(Pending),
|
||||
}
|
||||
} else if self.inp1_item.is_some() || self.inp2_item.is_some() {
|
||||
let process_res = Self::process(self.as_mut(), cx);
|
||||
match process_res {
|
||||
Continue(()) => Continue(()),
|
||||
Break(k) => Break(k),
|
||||
}
|
||||
} else {
|
||||
self.done = true;
|
||||
Continue(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ChannelEventsMerger {
|
||||
_inp_1: Pin<Box<dyn Stream<Item = Result<Box<dyn MergableEvents>, Error>>>>,
|
||||
_inp_2: Pin<Box<dyn Stream<Item = Result<Box<dyn MergableEvents>, Error>>>>,
|
||||
impl fmt::Debug for ChannelEventsMerger {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct(std::any::type_name::<Self>())
|
||||
.field("inp1_done", &self.inp1_done)
|
||||
.field("inp2_done", &self.inp2_done)
|
||||
.field("inp1_item", &self.inp1_item)
|
||||
.field("inp2_item", &self.inp2_item)
|
||||
.field("out", &self.out)
|
||||
.field("range_complete", &self.range_complete)
|
||||
.field("done", &self.done)
|
||||
.field("complete", &self.complete)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for ChannelEventsMerger {
|
||||
type Item = Result<ChannelEvents, Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
//use Poll::*;
|
||||
error!("TODO ChannelEventsMerger");
|
||||
err::todoval()
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
eprintln!("ChannelEventsMerger poll_next");
|
||||
loop {
|
||||
break if self.complete {
|
||||
panic!("poll after complete");
|
||||
} else if self.done {
|
||||
self.complete = true;
|
||||
Ready(None)
|
||||
} else {
|
||||
match Self::poll2(self.as_mut(), cx) {
|
||||
ControlFlow::Continue(()) => continue,
|
||||
ControlFlow::Break(k) => break k,
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
137
items_2/src/test.rs
Normal file
137
items_2/src/test.rs
Normal file
@@ -0,0 +1,137 @@
|
||||
use crate::eventsdim0::EventsDim0;
|
||||
use crate::{ChannelEvents, ChannelEventsMerger, ConnStatus, Empty};
|
||||
use crate::{ConnStatusEvent, Error};
|
||||
use futures_util::StreamExt;
|
||||
|
||||
/// Merging a single-batch event stream with an empty stream must yield
/// exactly that one batch and then end.
#[test]
fn merge01() {
    let fut = async {
        // One batch of ten scalar events; keep an identical copy around
        // as the expected output for the comparison below.
        let mut batch = EventsDim0::empty();
        for i in 0..10 {
            batch.push(i * 100, i, i as f32 * 100.);
        }
        let input_items = vec![Ok(ChannelEvents::Events(Box::new(batch.clone())))];
        let expected = vec![Ok(ChannelEvents::Events(Box::new(batch.clone())))];
        let stream_a = Box::pin(futures_util::stream::iter(input_items));
        let empty_items: Vec<Result<ChannelEvents, Error>> = Vec::new();
        let stream_b = Box::pin(futures_util::stream::iter(empty_items));
        let mut merger = ChannelEventsMerger::new(stream_a, stream_b);
        // First poll delivers the batch, second poll signals end-of-stream.
        let item = merger.next().await;
        assert_eq!(item.as_ref(), expected.get(0));
        let item = merger.next().await;
        assert_eq!(item.as_ref(), None);
    };
    tokio::runtime::Runtime::new().unwrap().block_on(fut);
}
|
||||
|
||||
/// Merging a two-batch event stream with an empty stream must yield the
/// two batches in order and then end.
#[test]
fn merge02() {
    let fut = async {
        let mut input_items = Vec::new();
        let mut expected = Vec::new();
        // Two consecutive batches: events 0..10 and 10..20. Each batch is
        // pushed to the input and, as an identical clone, to the expected
        // output.
        for range in vec![0..10, 10..20] {
            let mut batch = EventsDim0::empty();
            for i in range {
                batch.push(i * 100, i, i as f32 * 100.);
            }
            input_items.push(Ok(ChannelEvents::Events(Box::new(batch.clone()))));
            expected.push(Ok(ChannelEvents::Events(Box::new(batch.clone()))));
        }
        let stream_a = Box::pin(futures_util::stream::iter(input_items));
        let empty_items: Vec<Result<ChannelEvents, Error>> = Vec::new();
        let stream_b = Box::pin(futures_util::stream::iter(empty_items));
        let mut merger = ChannelEventsMerger::new(stream_a, stream_b);
        let item = merger.next().await;
        assert_eq!(item.as_ref(), expected.get(0));
        let item = merger.next().await;
        assert_eq!(item.as_ref(), expected.get(1));
        let item = merger.next().await;
        assert_eq!(item.as_ref(), None);
    };
    tokio::runtime::Runtime::new().unwrap().block_on(fut);
}
|
||||
|
||||
/// Merge an event stream with a connection-status stream: the status event
/// (ts 1199) must be emitted between the event batches, and the second
/// input batch (events 10..20, ts 1000..1900) is expected to be split at
/// the status timestamp into 10..12 and 12..20.
#[test]
fn merge03() {
    let fut = async {
        // Input on channel 1: two batches, events 0..10 and 10..20.
        // (The redundant trailing `.clone()` calls of the original were
        // dropped: each buffer is moved on its last use.)
        let mut events_vec1 = Vec::new();
        {
            let mut events = EventsDim0::empty();
            for i in 0..10 {
                events.push(i * 100, i, i as f32 * 100.);
            }
            events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
            let mut events = EventsDim0::empty();
            for i in 10..20 {
                events.push(i * 100, i, i as f32 * 100.);
            }
            events_vec1.push(Ok(ChannelEvents::Events(Box::new(events))));
        }
        let events_vec1 = events_vec1;
        // Expected event output: 0..10, then 10..12 (ts <= 1199), then
        // 12..20 (ts > 1199).
        let mut events_vec2 = Vec::new();
        {
            let mut events = EventsDim0::empty();
            for i in 0..10 {
                events.push(i * 100, i, i as f32 * 100.);
            }
            events_vec2.push(Ok(ChannelEvents::Events(Box::new(events))));
            let mut events = EventsDim0::empty();
            for i in 10..12 {
                events.push(i * 100, i, i as f32 * 100.);
            }
            events_vec2.push(Ok(ChannelEvents::Events(Box::new(events))));
            let mut events = EventsDim0::empty();
            for i in 12..20 {
                events.push(i * 100, i, i as f32 * 100.);
            }
            events_vec2.push(Ok(ChannelEvents::Events(Box::new(events))));
        }
        let events_vec2 = events_vec2;

        // Two identical copies of the status event: `a` is consumed by the
        // merger input, `b` is kept for the assertion below.
        let inp2_events_a: Vec<Result<_, Error>> = vec![Ok(ChannelEvents::Status(ConnStatusEvent {
            ts: 1199,
            status: ConnStatus::Disconnect,
        }))];
        let inp2_events_b: Vec<Result<_, Error>> = vec![Ok(ChannelEvents::Status(ConnStatusEvent {
            ts: 1199,
            status: ConnStatus::Disconnect,
        }))];

        let inp1 = futures_util::stream::iter(events_vec1);
        let inp1 = Box::pin(inp1);
        let inp2: Vec<Result<ChannelEvents, Error>> = inp2_events_a;
        let inp2 = futures_util::stream::iter(inp2);
        let inp2 = Box::pin(inp2);
        let mut merger = ChannelEventsMerger::new(inp1, inp2);
        let item = merger.next().await;
        assert_eq!(item.as_ref(), events_vec2.get(0));
        let item = merger.next().await;
        assert_eq!(item.as_ref(), events_vec2.get(1));
        let item = merger.next().await;
        assert_eq!(item.as_ref(), inp2_events_b.get(0));
        let item = merger.next().await;
        assert_eq!(item.as_ref(), events_vec2.get(2));
        let item = merger.next().await;
        assert_eq!(item.as_ref(), None);
    };
    tokio::runtime::Runtime::new().unwrap().block_on(fut);
}
|
||||
@@ -10,6 +10,7 @@ use netpod::timeunits::*;
|
||||
use netpod::{AggKind, ChannelTyped, ScalarType, Shape};
|
||||
use netpod::{PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange};
|
||||
use scylla::Session as ScySession;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
@@ -49,8 +50,8 @@ pub async fn read_cached_scylla(
|
||||
if counts.len() != avgs.len() {
|
||||
counts_mismatch = true;
|
||||
}
|
||||
let ts1s = edges[..(edges.len() - 1).min(edges.len())].to_vec();
|
||||
let ts2s = edges[1.min(edges.len())..].to_vec();
|
||||
let ts1s: VecDeque<_> = edges[..(edges.len() - 1).min(edges.len())].iter().map(|&x| x).collect();
|
||||
let ts2s: VecDeque<_> = edges[1.min(edges.len())..].iter().map(|&x| x).collect();
|
||||
if ts1s.len() != ts2s.len() {
|
||||
error!("ts1s vs ts2s mismatch");
|
||||
counts_mismatch = true;
|
||||
@@ -58,9 +59,9 @@ pub async fn read_cached_scylla(
|
||||
if ts1s.len() != counts.len() {
|
||||
counts_mismatch = true;
|
||||
}
|
||||
let avgs = avgs.into_iter().map(|x| x).collect::<Vec<_>>();
|
||||
let mins = mins.into_iter().map(|x| x as _).collect::<Vec<_>>();
|
||||
let maxs = maxs.into_iter().map(|x| x as _).collect::<Vec<_>>();
|
||||
let avgs: VecDeque<_> = avgs.into_iter().map(|x| x).collect();
|
||||
let mins: VecDeque<_> = mins.into_iter().map(|x| x as _).collect();
|
||||
let maxs: VecDeque<_> = maxs.into_iter().map(|x| x as _).collect();
|
||||
if counts_mismatch {
|
||||
error!(
|
||||
"mismatch: edges {} ts1s {} ts2s {} counts {} avgs {} mins {} maxs {}",
|
||||
@@ -73,7 +74,7 @@ pub async fn read_cached_scylla(
|
||||
maxs.len(),
|
||||
);
|
||||
}
|
||||
let counts: Vec<_> = counts.into_iter().map(|x| x as u64).collect();
|
||||
let counts: VecDeque<_> = counts.into_iter().map(|x| x as u64).collect();
|
||||
// TODO construct a dyn TimeBinned using the scalar type and shape information.
|
||||
// TODO place the values with little copying into the TimeBinned.
|
||||
use ScalarType::*;
|
||||
|
||||
Reference in New Issue
Block a user