WIP before remove of function bodies

This commit is contained in:
Dominik Werder
2023-03-09 13:58:13 +01:00
parent 3993fc0432
commit f262f7e9df
4 changed files with 73 additions and 40 deletions

View File

@@ -21,12 +21,13 @@ use items_0::TimeBinned;
use items_0::TimeBinner;
use items_0::TimeBins;
use items_0::WithLen;
use netpod::SeriesRange;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::Dim0Kind;
use netpod::NanoRange;
use netpod::SeriesRange;
use num_traits::Zero;
use serde::Deserialize;
use serde::Serialize;
@@ -50,6 +51,7 @@ pub struct BinsDim0<NTY> {
pub mins: VecDeque<NTY>,
pub maxs: VecDeque<NTY>,
pub avgs: VecDeque<f32>,
pub dim0kind: Dim0Kind,
}
impl<NTY> fmt::Debug for BinsDim0<NTY>
@@ -163,7 +165,7 @@ where
}
impl<NTY> Empty for BinsDim0<NTY> {
fn empty() -> Self {
fn empty(dim0kind: Dim0Kind) -> Self {
Self {
ts1s: VecDeque::new(),
ts2s: VecDeque::new(),
@@ -171,6 +173,7 @@ impl<NTY> Empty for BinsDim0<NTY> {
mins: VecDeque::new(),
maxs: VecDeque::new(),
avgs: VecDeque::new(),
dim0kind,
}
}
}
@@ -190,7 +193,8 @@ impl<NTY> RangeOverlapInfo for BinsDim0<NTY> {
true
}
} else if range.is_pulse() {
if let Some(&max) = self.pulses.back() {
// TODO for the time being, the ts represent either ts or pulse
if let Some(&max) = self.ts2s.back() {
max <= range.beg_u64()
} else {
true
@@ -209,7 +213,7 @@ impl<NTY> RangeOverlapInfo for BinsDim0<NTY> {
true
}
} else if range.is_pulse() {
if let Some(&max) = self.pulses.back() {
if let Some(&max) = self.ts2s.back() {
max > range.end_u64()
} else {
true
@@ -228,7 +232,7 @@ impl<NTY> RangeOverlapInfo for BinsDim0<NTY> {
true
}
} else if range.is_pulse() {
if let Some(&min) = self.pulses.front() {
if let Some(&min) = self.ts1s.front() {
min >= range.end_u64()
} else {
true
@@ -391,9 +395,9 @@ impl<NTY: ScalarOps> ToJsonResult for BinsDim0CollectedResult<NTY> {
#[derive(Debug)]
pub struct BinsDim0Collector<NTY> {
vals: Option<BinsDim0<NTY>>,
timed_out: bool,
range_final: bool,
vals: BinsDim0<NTY>,
}
impl<NTY> BinsDim0Collector<NTY> {
@@ -401,7 +405,7 @@ impl<NTY> BinsDim0Collector<NTY> {
Self {
timed_out: false,
range_final: false,
vals: BinsDim0::<NTY>::empty(),
vals: None,
}
}
}

View File

@@ -17,6 +17,7 @@ use items_0::WithLen;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use netpod::Dim0Kind;
use netpod::NanoRange;
use netpod::SeriesRange;
use serde::Deserialize;
@@ -38,6 +39,7 @@ pub struct EventsDim0<NTY> {
pub tss: VecDeque<u64>,
pub pulses: VecDeque<u64>,
pub values: VecDeque<NTY>,
pub dim0kind: Dim0Kind,
}
impl<NTY> EventsDim0<NTY> {
@@ -83,11 +85,12 @@ where
}
impl<NTY> Empty for EventsDim0<NTY> {
fn empty() -> Self {
fn empty(dim0kind: Dim0Kind) -> Self {
Self {
tss: VecDeque::new(),
pulses: VecDeque::new(),
values: VecDeque::new(),
dim0kind,
}
}
}
@@ -203,7 +206,7 @@ where
#[derive(Debug)]
pub struct EventsDim0Collector<NTY> {
vals: EventsDim0<NTY>,
vals: Option<EventsDim0<NTY>>,
range_final: bool,
timed_out: bool,
}
@@ -211,7 +214,7 @@ pub struct EventsDim0Collector<NTY> {
impl<NTY> EventsDim0Collector<NTY> {
pub fn new() -> Self {
Self {
vals: EventsDim0::empty(),
vals: None,
range_final: false,
timed_out: false,
}
@@ -220,7 +223,7 @@ impl<NTY> EventsDim0Collector<NTY> {
impl<NTY> WithLen for EventsDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.tss.len()
self.vals.map_or(0, |x| x.tss.len())
}
}
@@ -340,25 +343,13 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
type Output = EventsDim0CollectorOutput<NTY>;
fn ingest(&mut self, src: &mut Self::Input) {
self.vals.tss.append(&mut src.tss);
self.vals.pulses.append(&mut src.pulses);
self.vals.values.append(&mut src.values);
if self.len() >= 2 {
let mut print = false;
let c = self.vals.tss.len();
if self.vals.tss[c - 2] + 1000000000 <= self.vals.tss[c - 1] {
print = true;
}
let c = self.vals.pulses.len();
if self.vals.pulses[c - 2] + 1000 <= self.vals.pulses[c - 1] {
print = true;
}
if print {
error!("gap detected\n{self:?}");
let bt = std::backtrace::Backtrace::capture();
error!("{bt}");
}
if self.vals.is_none() {
self.vals = Some(EventsDim0::empty(src.dim0kind.clone()));
}
let vals = self.vals.as_mut().unwrap();
vals.tss.append(&mut src.tss);
vals.pulses.append(&mut src.pulses);
vals.values.append(&mut src.values);
}
fn set_range_complete(&mut self) {
@@ -370,13 +361,19 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
}
fn result(&mut self, range: Option<NanoRange>, _binrange: Option<BinnedRangeEnum>) -> Result<Self::Output, Error> {
let self_name = any::type_name::<Self>();
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
// The amount of the delta must take into account what kind of timestamp precision the client
// can parse and handle.
let vals = if let Some(x) = &mut self.vals {
x
} else {
return Err(Error::with_msg_no_trace(format!("{self_name} no vals")));
};
let continue_at = if self.timed_out {
if let Some(ts) = self.vals.tss.back() {
if let Some(ts) = vals.tss.back() {
Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
} else {
if let Some(range) = &range {
@@ -389,11 +386,11 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
} else {
None
};
let tss_sl = self.vals.tss.make_contiguous();
let pulses_sl = self.vals.pulses.make_contiguous();
let tss_sl = vals.tss.make_contiguous();
let pulses_sl = vals.pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl);
let values = mem::replace(&mut self.vals.values, VecDeque::new());
let values = mem::replace(&mut vals.values, VecDeque::new());
if ts_off_ms.len() != ts_off_ns.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
}
@@ -431,7 +428,7 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectableType for EventsDim0<NTY> {
impl<NTY: ScalarOps> items_0::collect_c::Collector for EventsDim0Collector<NTY> {
fn len(&self) -> usize {
self.vals.len()
self.vals.map_or(0, |x| x.len())
}
fn ingest(&mut self, item: &mut dyn items_0::collect_c::Collectable) {
@@ -490,6 +487,10 @@ impl<NTY> Drop for EventsDim0Aggregator<NTY> {
}
impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
fn self_name() -> String {
format!("{}<{}>", any::type_name::<Self>(), any::type_name::<NTY>())
}
pub fn new(binrange: SeriesRange, do_time_weight: bool) -> Self {
let int_ts = binrange.beg_u64();
Self {
@@ -657,6 +658,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
mins: [min].into(),
maxs: [max].into(),
avgs: [avg].into(),
dim0kind: self.range.dim0kind(),
}
} else {
error!("TODO result_reset_unweight");
@@ -698,6 +700,7 @@ impl<NTY: ScalarOps> EventsDim0Aggregator<NTY> {
mins: [min].into(),
maxs: [max].into(),
avgs: [avg].into(),
dim0kind: self.range.dim0kind(),
}
} else {
error!("TODO result_reset_time_weight");
@@ -838,12 +841,17 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
let tss = self.tss.drain(..n1).collect();
let pulses = self.pulses.drain(..n1).collect();
let values = self.values.drain(..n1).collect();
let ret = Self { tss, pulses, values };
let ret = Self {
tss,
pulses,
values,
dim0kind: self.dim0kind.clone(),
};
Box::new(ret)
}
fn new_empty(&self) -> Box<dyn Events> {
Box::new(Self::empty())
Box::new(Self::empty(self.dim0kind.clone()))
}
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), items_0::MergeError> {
@@ -1088,7 +1096,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
let range_next = self.next_bin_range();
self.rng = range_next.clone();
if let Some(range) = range_next {
let mut bins = BinsDim0::<NTY>::empty();
let mut bins = BinsDim0::empty(self.binrange.dim0kind());
if range.is_time() {
bins.append_zero(range.beg_u64(), range.end_u64());
} else {
@@ -1116,7 +1124,7 @@ impl<NTY: ScalarOps> TimeBinner for EventsDim0TimeBinner<NTY> {
}
fn empty(&self) -> Box<dyn items_0::TimeBinned> {
let ret = <EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty();
let ret = <EventsDim0Aggregator<NTY> as TimeBinnableTypeAggregator>::Output::empty(self.binrange.dim0kind());
Box::new(ret)
}
}
@@ -1229,7 +1237,7 @@ mod test_frame {
fn events_bincode() {
taskrun::tracing_init().unwrap();
// core::result::Result<items::StreamItem<items::RangeCompletableItem<items_2::channelevents::ChannelEvents>>, err::Error>
let mut events = EventsDim0::empty();
let mut events = EventsDim0::empty(Dim0Kind::Time);
events.push(123, 234, 55f32);
let events = events;
let events: Box<dyn Events> = Box::new(events);