Fixes and refactoring

This commit is contained in:
Dominik Werder
2024-11-14 16:01:57 +01:00
parent bc7eb345a6
commit baf2c7f2d1
23 changed files with 696 additions and 659 deletions

View File

@@ -1,13 +1,9 @@
[package]
name = "daqbuf-items-2"
version = "0.0.3"
version = "0.0.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/items_2.rs"
doctest = false
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View File

@@ -7,5 +7,3 @@ pub mod valuetype;
#[cfg(test)]
mod test;
use super::binning as ___;

View File

@@ -2,7 +2,7 @@ pub trait BinnedValueType {}
pub struct BinnedNumericValue<EVT> {
avg: f32,
_t: Option<EVT>,
t: Option<EVT>,
}
impl<EVT> BinnedValueType for BinnedNumericValue<EVT> {}

View File

@@ -1,9 +1,6 @@
use super::aggregator::AggregatorNumeric;
use super::aggregator::AggregatorTimeWeight;
use super::container_events::EventValueType;
use super::___;
use crate::ts_offs_from_abs;
use crate::ts_offs_from_abs_with_anchor;
use crate::offsets::ts_offs_from_abs;
use crate::offsets::ts_offs_from_abs_with_anchor;
use core::fmt;
use daqbuf_err as err;
use err::thiserror;
@@ -19,7 +16,6 @@ use items_0::AsAnyRef;
use items_0::TypeName;
use items_0::WithLen;
use netpod::log::*;
use netpod::EnumVariant;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
@@ -285,21 +281,6 @@ where
pp
}
pub fn pop_front(&mut self) -> Option<BinSingle<EVT>> {
todo!("pop_front");
let ts1 = if let Some(x) = self.ts1s.pop_front() {
x
} else {
return None;
};
let ts2 = if let Some(x) = self.ts2s.pop_front() {
x
} else {
return None;
};
todo!()
}
pub fn push_back(
&mut self,
ts1: TsNano,
@@ -562,8 +543,8 @@ where
fn result(
&mut self,
range: Option<netpod::range::evrange::SeriesRange>,
binrange: Option<netpod::BinnedRangeEnum>,
_range: Option<netpod::range::evrange::SeriesRange>,
_binrange: Option<netpod::BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_s::CollectedDyn>, err::Error> {
// TODO do we need to set timeout, continueAt or anything?
let bins = mem::replace(&mut self.bins, ContainerBins::new());
@@ -633,7 +614,7 @@ where
}
fn fix_numerics(&mut self) {
for ((min, max), avg) in self
for ((_min, _max), _avg) in self
.mins
.iter_mut()
.zip(self.maxs.iter_mut())
@@ -675,17 +656,4 @@ where
pub fn len(&self) -> usize {
self.len
}
pub fn pop_front(&mut self) -> Option<BinSingle<EVT>> {
if self.len != 0 {
if let Some(ev) = self.evs.pop_front() {
self.len -= 1;
Some(ev)
} else {
None
}
} else {
None
}
}
}

View File

@@ -24,25 +24,31 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[cstm(name = "ValueContainerError")]
pub enum ValueContainerError {}
pub trait Container<EVT>: fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> {
pub trait Container<EVT>:
fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a>
where
EVT: EventValueType,
{
fn new() -> Self;
// fn verify(&self) -> Result<(), ValueContainerError>;
fn push_back(&mut self, val: EVT);
fn pop_front(&mut self) -> Option<EVT>;
fn get_iter_ty_1(&self, pos: usize) -> Option<EVT::IterTy1<'_>>;
}
pub trait PartialOrdEvtA<EVT> {
fn cmp_a(&self, other: &EVT) -> Option<std::cmp::Ordering>;
}
pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize {
type Container: Container<Self>;
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
// fn identity_sum() -> Self;
// fn add_weighted(&self, add: &Self, f: f32) -> Self;
type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA<Self> + Into<Self>;
}
impl<EVT> Container<EVT> for VecDeque<EVT>
where
EVT: EventValueType + Serialize + for<'a> Deserialize<'a>,
EVT: for<'a> EventValueType<IterTy1<'a> = EVT> + Serialize + for<'a> Deserialize<'a>,
{
fn new() -> Self {
VecDeque::new()
@@ -55,6 +61,28 @@ where
fn pop_front(&mut self) -> Option<EVT> {
self.pop_front()
}
fn get_iter_ty_1(&self, pos: usize) -> Option<EVT::IterTy1<'_>> {
self.get(pos).map(|x| x.clone())
}
}
impl Container<String> for VecDeque<String> {
fn new() -> Self {
VecDeque::new()
}
fn push_back(&mut self, val: String) {
self.push_back(val);
}
fn pop_front(&mut self) -> Option<String> {
self.pop_front()
}
fn get_iter_ty_1(&self, pos: usize) -> Option<&str> {
todo!()
}
}
macro_rules! impl_event_value_type {
@@ -63,6 +91,13 @@ macro_rules! impl_event_value_type {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = $evt;
}
impl PartialOrdEvtA<$evt> for $evt {
fn cmp_a(&self, other: &$evt) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
}
}
};
}
@@ -78,28 +113,74 @@ impl_event_value_type!(i64);
// impl_event_value_type!(f32);
// impl_event_value_type!(f64);
impl PartialOrdEvtA<f32> for f32 {
fn cmp_a(&self, other: &f32) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
}
}
impl PartialOrdEvtA<f64> for f64 {
fn cmp_a(&self, other: &f64) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
}
}
impl PartialOrdEvtA<bool> for bool {
fn cmp_a(&self, other: &bool) -> Option<std::cmp::Ordering> {
self.partial_cmp(other)
}
}
impl PartialOrdEvtA<String> for &str {
fn cmp_a(&self, other: &String) -> Option<std::cmp::Ordering> {
(*self).partial_cmp(other.as_str())
}
}
impl EventValueType for f32 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = f32;
}
impl EventValueType for f64 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = f64;
}
impl EventValueType for bool {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = bool;
}
impl EventValueType for String {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
type IterTy1<'a> = &'a str;
}
#[derive(Debug, Clone)]
pub struct EventSingleRef<'a, EVT>
where
EVT: EventValueType,
{
pub ts: TsNano,
pub val: EVT::IterTy1<'a>,
}
impl<'a, EVT> EventSingleRef<'a, EVT>
where
EVT: EventValueType,
{
pub fn to_owned(&self) {
todo!()
}
}
#[derive(Debug, Clone)]
@@ -108,6 +189,30 @@ pub struct EventSingle<EVT> {
pub val: EVT,
}
impl<'a, EVT> From<EventSingleRef<'a, EVT>> for EventSingle<EVT>
where
EVT: EventValueType,
{
fn from(value: EventSingleRef<'a, EVT>) -> Self {
Self {
ts: value.ts,
val: value.val.into(),
}
}
}
impl<'a, EVT> From<&EventSingleRef<'a, EVT>> for EventSingle<EVT>
where
EVT: EventValueType,
{
fn from(value: &EventSingleRef<'a, EVT>) -> Self {
Self {
ts: value.ts,
val: value.val.clone().into(),
}
}
}
#[derive(Debug, ThisError)]
#[cstm(name = "EventsContainerError")]
pub enum EventsContainerError {
@@ -127,7 +232,10 @@ impl<EVT> ContainerEvents<EVT>
where
EVT: EventValueType,
{
pub fn from_constituents(tss: VecDeque<TsNano>, vals: <EVT as EventValueType>::Container) -> Self {
pub fn from_constituents(
tss: VecDeque<TsNano>,
vals: <EVT as EventValueType>::Container,
) -> Self {
Self { tss, vals }
}
@@ -147,7 +255,12 @@ where
}
pub fn verify(&self) -> Result<(), EventsContainerError> {
if self.tss.iter().zip(self.tss.iter().skip(1)).any(|(&a, &b)| a > b) {
if self
.tss
.iter()
.zip(self.tss.iter().skip(1))
.any(|(&a, &b)| a > b)
{
return Err(EventsContainerError::Unordered);
}
Ok(())
@@ -161,18 +274,15 @@ where
self.tss.back().map(|&x| x)
}
pub fn len_before(&self, end: TsNano) -> usize {
let pp = self.tss.partition_point(|&x| x < end);
assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len());
fn _len_before(&self, end: TsNano) -> usize {
let tss = &self.tss;
let pp = tss.partition_point(|&x| x < end);
assert!(pp <= tss.len(), "len_before pp {} len {}", pp, tss.len());
pp
}
pub fn pop_front(&mut self) -> Option<EventSingle<EVT>> {
if let (Some(ts), Some(val)) = (self.tss.pop_front(), self.vals.pop_front()) {
Some(EventSingle { ts, val })
} else {
None
}
fn _pop_front(&mut self) -> Option<EventSingleRef<EVT>> {
todo!()
}
pub fn push_back(&mut self, ts: TsNano, val: EVT) {
@@ -210,40 +320,58 @@ pub struct ContainerEventsTakeUpTo<'a, EVT>
where
EVT: EventValueType,
{
evs: &'a mut ContainerEvents<EVT>,
len: usize,
evs: &'a ContainerEvents<EVT>,
beg: usize,
end: usize,
pos: usize,
}
impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT>
where
EVT: EventValueType,
{
pub fn new(evs: &'a mut ContainerEvents<EVT>, len: usize) -> Self {
let len = len.min(evs.len());
Self { evs, len }
}
}
impl<'a, EVT> ContainerEventsTakeUpTo<'a, EVT>
where
EVT: EventValueType,
{
pub fn ts_first(&self) -> Option<TsNano> {
self.evs.ts_first()
pub fn new(evs: &'a ContainerEvents<EVT>) -> Self {
Self {
evs,
beg: 0,
end: evs.len(),
pos: 0,
}
}
pub fn ts_last(&self) -> Option<TsNano> {
self.evs.ts_last()
pub fn constrain_up_to_ts(&mut self, end: TsNano) {
let tss = &self.evs.tss;
let pp = tss.partition_point(|&x| x < end);
let pp = pp.max(self.pos);
assert!(pp <= tss.len(), "len_before pp {} len {}", pp, tss.len());
assert!(pp >= self.pos);
self.end = pp;
}
pub fn extend_to_all(&mut self) {
self.end = self.evs.len();
}
pub fn len(&self) -> usize {
self.len
self.end - self.pos
}
pub fn pop_front(&mut self) -> Option<EventSingle<EVT>> {
if self.len != 0 {
if let Some(ev) = self.evs.pop_front() {
self.len -= 1;
pub fn pos(&self) -> usize {
self.pos
}
pub fn ts_first(&self) -> Option<TsNano> {
self.evs.tss.get(self.pos).cloned()
}
pub fn next(&mut self) -> Option<EventSingleRef<EVT>> {
let evs = &self.evs;
if self.pos < self.end {
if let (Some(&ts), Some(val)) =
(evs.tss.get(self.pos), evs.vals.get_iter_ty_1(self.pos))
{
self.pos += 1;
let ev = EventSingleRef { ts, val };
Some(ev)
} else {
None

View File

@@ -1,7 +1,6 @@
mod events00;
use super::container_events::ContainerEvents;
use super::___;
use netpod::log::*;
use std::any;
#[test]

View File

@@ -41,7 +41,9 @@ trait IntoVecDequeU64 {
impl IntoVecDequeU64 for &str {
fn into_vec_deque_u64(self) -> VecDeque<u64> {
self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect()
self.split_ascii_whitespace()
.map(|x| x.parse().unwrap())
.collect()
}
}
trait IntoVecDequeF32 {
@@ -50,7 +52,9 @@ trait IntoVecDequeF32 {
impl IntoVecDequeF32 for &str {
fn into_vec_deque_f32(self) -> VecDeque<f32> {
self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect()
self.split_ascii_whitespace()
.map(|x| x.parse().unwrap())
.collect()
}
}
@@ -70,7 +74,10 @@ fn exp_u64<'a>(
}
if let (Some(&val), Some(&exp)) = (a, b) {
if val != exp {
return Err(Error::AssertMsg(format!("{tag} val {} exp {} i {}", val, exp, i)));
return Err(Error::AssertMsg(format!(
"{tag} val {} exp {} i {}",
val, exp, i
)));
}
} else {
return Err(Error::AssertMsg(format!("{tag} len mismatch")));
@@ -96,7 +103,10 @@ fn exp_f32<'a>(
}
if let (Some(&val), Some(&exp)) = (a, b) {
if netpod::f32_close(val, exp) == false {
return Err(Error::AssertMsg(format!("{tag} val {} exp {} i {}", val, exp, i)));
return Err(Error::AssertMsg(format!(
"{tag} val {} exp {} i {}",
val, exp, i
)));
}
} else {
return Err(Error::AssertMsg(format!("{tag} len mismatch")));
@@ -108,17 +118,29 @@ fn exp_f32<'a>(
#[cfg(test)]
fn exp_cnts(bins: &ContainerBins<f32>, exps: impl IntoVecDequeU64) -> Result<(), Error> {
exp_u64(bins.cnts_iter(), exps.into_vec_deque_u64().iter(), "exp_cnts")
exp_u64(
bins.cnts_iter(),
exps.into_vec_deque_u64().iter(),
"exp_cnts",
)
}
#[cfg(test)]
fn exp_mins(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
exp_f32(bins.mins_iter(), exps.into_vec_deque_f32().iter(), "exp_mins")
exp_f32(
bins.mins_iter(),
exps.into_vec_deque_f32().iter(),
"exp_mins",
)
}
#[cfg(test)]
fn exp_maxs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
exp_f32(bins.maxs_iter(), exps.into_vec_deque_f32().iter(), "exp_maxs")
exp_f32(
bins.maxs_iter(),
exps.into_vec_deque_f32().iter(),
"exp_maxs",
)
}
fn exp_avgs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
@@ -135,7 +157,10 @@ fn exp_avgs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(),
if let (Some(a), Some(&exp)) = (a, b) {
let val = *a.avg as f32;
if netpod::f32_close(val, exp) == false {
return Err(Error::AssertMsg(format!("exp_avgs val {} exp {} i {}", val, exp, i)));
return Err(Error::AssertMsg(format!(
"exp_avgs val {} exp {} i {}",
val, exp, i
)));
}
} else {
return Err(Error::AssertMsg(format!(
@@ -161,7 +186,7 @@ fn test_bin_events_f32_simple_with_before_00() -> Result<(), Error> {
let mut binner = BinnedEventsTimeweight::new(range);
let mut evs = ContainerEvents::<f32>::new();
evs.push_back(TsNano::from_ms(103), 2.0);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
exp_cnts(&bins, "0")?;
@@ -186,7 +211,7 @@ fn test_bin_events_f32_simple_with_before_01_range_final() -> Result<(), Error>
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 103, 2.0);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
exp_cnts(&bins, "0 0")?;
@@ -212,13 +237,13 @@ fn test_bin_events_f32_simple_00() -> Result<(), Error> {
let em = &mut evs;
pu(em, 100, 2.0);
pu(em, 104, 2.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 111, 1.0);
pu(em, 112, 1.2);
pu(em, 113, 1.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_open()?;
let bins = binner.output();
for b in bins.iter_debug() {
@@ -247,13 +272,13 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> {
let em = &mut evs;
pu(em, 102, 2.0);
pu(em, 104, 2.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 111, 1.0);
pu(em, 112, 1.2);
pu(em, 113, 1.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_open()?;
let bins = binner.output();
for b in bins.iter_debug() {
@@ -283,13 +308,13 @@ fn test_bin_events_f32_small_range_final() -> Result<(), Error> {
let em = &mut evs;
pu(em, 102, 2.0);
pu(em, 104, 2.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 111, 1.0);
pu(em, 112, 1.2);
pu(em, 113, 1.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
@@ -319,12 +344,12 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err
let em = &mut evs;
pu(em, 102, 2.0);
pu(em, 104, 2.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 111, 1.0);
pu(em, 112, 1.2);
binner.ingest(evs)?;
binner.ingest(&evs)?;
// TODO take bins already here and assert.
// TODO combine all bins together for combined assert.
let mut evs = ContainerEvents::<f32>::new();
@@ -332,7 +357,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err
pu(em, 113, 1.4);
pu(em, 146, 1.3);
pu(em, 148, 1.2);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_open()?;
let bins = binner.output();
for b in bins.iter_debug() {
@@ -352,22 +377,23 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err
fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Error> {
let beg = TsNano::from_ms(100);
let end = TsNano::from_ms(150);
let bin_len = DtMs::from_ms_u64(10);
let nano_range = NanoRange {
beg: beg.ns(),
end: end.ns(),
};
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
let range = BinnedRange::from_nano_range(nano_range, bin_len);
let mut binner = BinnedEventsTimeweight::new(range);
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 102, 2.0);
pu(em, 104, 2.4);
binner.ingest(evs)?;
binner.ingest(&evs)?;
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 111, 1.0);
pu(em, 112, 1.2);
binner.ingest(evs)?;
binner.ingest(&evs)?;
// TODO take bins already here and assert.
// TODO combine all bins together for combined assert.
let mut evs = ContainerEvents::<f32>::new();
@@ -375,7 +401,7 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er
pu(em, 113, 1.4);
pu(em, 146, 1.3);
pu(em, 148, 1.2);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
@@ -391,7 +417,8 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er
}
#[test]
fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -> Result<(), Error> {
fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -> Result<(), Error>
{
let beg = TsNano::from_ms(110);
let end = TsNano::from_ms(120);
let nano_range = NanoRange {
@@ -403,7 +430,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 109, 50.);
binner.ingest(evs)?;
binner.ingest(&evs)?;
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 111, 40.);
@@ -415,7 +442,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -
// pu(em, 120, 1.4);
// pu(em, 146, 1.3);
// pu(em, 148, 1.2);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
@@ -442,7 +469,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_edge_range_final() -> R
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 109, 50.);
binner.ingest(evs)?;
binner.ingest(&evs)?;
let mut evs = ContainerEvents::<f32>::new();
let em = &mut evs;
pu(em, 110, 40.);
@@ -454,7 +481,7 @@ fn test_bin_events_f32_small_intermittent_silence_minmax_edge_range_final() -> R
// pu(em, 120, 1.4);
// pu(em, 146, 1.3);
// pu(em, 148, 1.2);
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
for b in bins.iter_debug() {
@@ -481,7 +508,7 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> {
let mut evs = ContainerEvents::new();
evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one"));
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
binner.ingest(evs)?;
binner.ingest(&evs)?;
binner.input_done_range_final()?;
let bins = binner.output();
Ok(())

View File

@@ -3,7 +3,6 @@ pub mod timeweight_bins_dyn;
pub mod timeweight_events;
pub mod timeweight_events_dyn;
use super::___;
use netpod::log::*;
#[allow(unused)]

View File

@@ -1,4 +1,3 @@
use super::___;
use netpod::log::*;
#[allow(unused)]

View File

@@ -4,6 +4,8 @@ use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::ContainerEventsTakeUpTo;
use crate::binning::container_events::EventSingle;
use crate::binning::container_events::EventSingleRef;
use crate::binning::container_events::PartialOrdEvtA;
use core::fmt;
use daqbuf_err as err;
use err::thiserror;
@@ -14,41 +16,25 @@ use netpod::DtNano;
use netpod::TsNano;
use std::mem;
#[allow(unused)]
macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_cycle { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_event_next { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_fill_until { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
macro_rules! trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
#[cold]
#[inline]
@@ -98,7 +84,11 @@ where
EVT: EventValueType,
{
// NOTE that this is also used during bin-cycle.
fn ingest_event_with_lst_gt_range_beg_agg(&mut self, ev: EventSingle<EVT>, lst: LstRef<EVT>) {
fn ingest_event_with_lst_gt_range_beg_agg(
&mut self,
ev: EventSingleRef<EVT>,
lst: LstRef<EVT>,
) {
let selfname = "ingest_event_with_lst_gt_range_beg_agg";
trace_ingest_event!("{selfname} {:?}", ev);
if DEBUG_CHECKS {
@@ -120,7 +110,11 @@ where
self.filled_until = ev.ts;
}
fn ingest_event_with_lst_gt_range_beg_2(&mut self, ev: EventSingle<EVT>, lst: LstMut<EVT>) -> Result<(), Error> {
fn ingest_event_with_lst_gt_range_beg_2(
&mut self,
ev: EventSingleRef<EVT>,
lst: LstMut<EVT>,
) -> Result<(), Error> {
let selfname = "ingest_event_with_lst_gt_range_beg_2";
trace_ingest_event!("{selfname}");
self.ingest_event_with_lst_gt_range_beg_agg(ev.clone(), LstRef(lst.0));
@@ -131,7 +125,7 @@ where
fn ingest_event_with_lst_gt_range_beg(
&mut self,
ev: EventSingle<EVT>,
ev: EventSingleRef<EVT>,
lst: LstMut<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
@@ -146,7 +140,7 @@ where
fn ingest_event_with_lst_eq_range_beg(
&mut self,
ev: EventSingle<EVT>,
ev: EventSingleRef<EVT>,
lst: LstMut<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
@@ -161,13 +155,13 @@ where
fn ingest_with_lst_gt_range_beg(
&mut self,
mut evs: ContainerEventsTakeUpTo<EVT>,
evs: &mut ContainerEventsTakeUpTo<EVT>,
lst: LstMut<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
let selfname = "ingest_with_lst_gt_range_beg";
trace_ingest_event!("{selfname}");
while let Some(ev) = evs.pop_front() {
trace_ingest_event!("{selfname} len {}", evs.len());
while let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
if ev.ts <= self.active_beg {
panic!("should never get here");
@@ -183,41 +177,42 @@ where
fn ingest_with_lst_ge_range_beg(
&mut self,
mut evs: ContainerEventsTakeUpTo<EVT>,
evs: &mut ContainerEventsTakeUpTo<EVT>,
lst: LstMut<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
let selfname = "ingest_with_lst_ge_range_beg";
trace_ingest_event!("{selfname}");
while let Some(ev) = evs.pop_front() {
trace_ingest_event!("{selfname} len {}", evs.len());
while let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
if ev.ts < self.active_beg {
panic!("should never get here");
}
if ev.ts >= self.active_end {
panic!("should never get here");
}
assert!(ev.ts >= self.active_beg);
assert!(ev.ts < self.active_end);
if ev.ts == self.active_beg {
trace_ingest_event!("{selfname} ts == active_beg");
self.ingest_event_with_lst_eq_range_beg(ev, LstMut(lst.0), minmax)?;
self.cnt += 1;
} else {
trace_ingest_event!("{selfname} ts != active_beg");
self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?;
self.cnt += 1;
trace_ingest_event!("{selfname} now calling ingest_with_lst_gt_range_beg");
return self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax);
break;
}
}
Ok(())
trace_ingest_event!(
"{selfname} defer remainder to ingest_with_lst_gt_range_beg len {}",
evs.len()
);
self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax)
}
fn ingest_with_lst_minmax(
&mut self,
evs: ContainerEventsTakeUpTo<EVT>,
evs: &mut ContainerEventsTakeUpTo<EVT>,
lst: LstMut<EVT>,
minmax: &mut MinMax<EVT>,
) -> Result<(), Error> {
let selfname = "ingest_with_lst_minmax";
trace_ingest_event!("{selfname}");
trace_ingest_event!("{selfname} len {}", evs.len());
// TODO how to handle the min max? I don't take event data yet out of the container.
if let Some(ts0) = evs.ts_first() {
trace_ingest_event!("EVENT POP FRONT {selfname}");
@@ -260,38 +255,50 @@ impl<EVT> InnerA<EVT>
where
EVT: EventValueType,
{
fn apply_min_max(ev: &EventSingle<EVT>, minmax: &mut MinMax<EVT>) {
if ev.val < minmax.0.val {
minmax.0 = ev.clone();
fn apply_min_max(ev: &EventSingleRef<EVT>, minmax: &mut MinMax<EVT>) {
if let Some(std::cmp::Ordering::Less) = ev.val.cmp_a(&minmax.0.val) {
minmax.0 = ev.into();
}
if ev.val > minmax.1.val {
minmax.1 = ev.clone();
if let Some(std::cmp::Ordering::Greater) = ev.val.cmp_a(&minmax.1.val) {
minmax.1 = ev.into();
}
// if ev.val < minmax.0.val {
// minmax.0 = ev.into();
// }
// if ev.val > minmax.1.val {
// minmax.1 = ev.into();
// }
}
fn apply_lst_after_event_handled(ev: EventSingle<EVT>, lst: LstMut<EVT>) {
*lst.0 = ev;
fn apply_lst_after_event_handled(ev: EventSingleRef<EVT>, lst: LstMut<EVT>) {
*lst.0 = ev.into();
}
fn init_minmax(&mut self, ev: &EventSingle<EVT>) {
fn init_minmax(&mut self, ev: &EventSingleRef<EVT>) {
trace_ingest_minmax!("init_minmax {:?}", ev);
self.minmax = Some((ev.clone(), ev.clone()));
self.minmax = Some((ev.into(), ev.into()));
}
fn init_minmax_with_lst(&mut self, ev: &EventSingle<EVT>, lst: LstRef<EVT>) {
fn init_minmax_with_lst(&mut self, ev: &EventSingleRef<EVT>, lst: LstRef<EVT>) {
trace_ingest_minmax!("init_minmax_with_lst {:?} {:?}", ev, lst.0);
let minmax = self.minmax.insert((lst.0.clone(), lst.0.clone()));
Self::apply_min_max(ev, minmax);
}
fn ingest_with_lst(&mut self, mut evs: ContainerEventsTakeUpTo<EVT>, lst: LstMut<EVT>) -> Result<(), Error> {
fn ingest_with_lst(
&mut self,
evs: &mut ContainerEventsTakeUpTo<EVT>,
lst: LstMut<EVT>,
) -> Result<(), Error> {
let selfname = "ingest_with_lst";
trace_ingest_container!("{selfname} evs len {}", evs.len());
trace_ingest_container!("{selfname} len {}", evs.len());
let b = &mut self.inner_b;
if let Some(minmax) = self.minmax.as_mut() {
b.ingest_with_lst_minmax(evs, lst, minmax)
} else {
if let Some(ev) = evs.pop_front() {
let mut run_ingest_with_lst_minmax = false;
let _ = run_ingest_with_lst_minmax;
if let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {selfname:30}", ev);
let beg = b.active_beg;
let end = b.active_end;
@@ -305,23 +312,31 @@ where
InnerA::apply_lst_after_event_handled(ev, lst);
let b = &mut self.inner_b;
b.cnt += 1;
Ok(())
return Ok(());
} else {
self.init_minmax_with_lst(&ev, LstRef(lst.0));
let b = &mut self.inner_b;
if let Some(minmax) = self.minmax.as_mut() {
{
if ev.ts == beg {
panic!("logic error, is handled before");
} else {
b.ingest_event_with_lst_gt_range_beg_2(ev, LstMut(lst.0))?;
}
b.cnt += 1;
b.ingest_with_lst_minmax(evs, lst, minmax)
} else {
Err(Error::NoMinMaxAfterInit)
run_ingest_with_lst_minmax = true;
}
}
}
} else {
return Ok(());
}
if run_ingest_with_lst_minmax {
if let Some(minmax) = self.minmax.as_mut() {
let b = &mut self.inner_b;
b.ingest_with_lst_minmax(evs, lst, minmax)
} else {
return Err(Error::NoMinMaxAfterInit);
}
} else {
Ok(())
}
@@ -348,7 +363,12 @@ where
self.minmax = Some((lst.0.clone(), lst.0.clone()));
}
fn push_out_and_reset(&mut self, lst: LstRef<EVT>, range_final: bool, out: &mut ContainerBins<EVT>) {
fn push_out_and_reset(
&mut self,
lst: LstRef<EVT>,
range_final: bool,
out: &mut ContainerBins<EVT>,
) {
let selfname = "push_out_and_reset";
// TODO there is not always good enough input to produce a meaningful bin.
// TODO can we always reset, and what exactly does reset mean here?
@@ -406,6 +426,7 @@ where
EVT: EventValueType,
{
pub fn new(range: BinnedRange<TsNano>) -> Self {
trace_init!("BinnedEventsTimeweight::new {}", range);
let active_beg = range.nano_beg();
let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano());
let active_len = active_end.delta(active_beg);
@@ -419,7 +440,9 @@ where
active_len,
filled_until: active_beg,
filled_width: DtNano::from_ns(0),
agg: <<EVT as EventValueType>::AggregatorTimeWeight as AggregatorTimeWeight<EVT>>::new(),
agg: <<EVT as EventValueType>::AggregatorTimeWeight as AggregatorTimeWeight<
EVT,
>>::new(),
},
minmax: None,
},
@@ -435,14 +458,14 @@ where
ret
}
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
fn ingest_event_without_lst(&mut self, ev: EventSingleRef<EVT>) -> Result<(), Error> {
let selfname = "ingest_event_without_lst";
let b = &self.inner_a.inner_b;
if ev.ts >= b.active_end {
panic!("{selfname} should never get here");
} else {
trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev);
self.lst = Some(ev.clone());
self.lst = Some((&ev).into());
if ev.ts >= b.active_beg {
trace_ingest_minmax!("ingest_event_without_lst");
self.inner_a.init_minmax(&ev);
@@ -454,19 +477,24 @@ where
}
}
fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
fn ingest_without_lst(&mut self, evs: &mut ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
let selfname = "ingest_without_lst";
if let Some(ev) = evs.pop_front() {
trace_ingest_container!("{selfname} len {}", evs.len());
let mut run_ingest_with_lst = false;
let _ = run_ingest_with_lst;
if let Some(ev) = evs.next() {
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
if ev.ts >= self.inner_a.inner_b.active_end {
panic!("{selfname} should never get here");
assert!(ev.ts < self.inner_a.inner_b.active_end);
self.ingest_event_without_lst(ev)?;
run_ingest_with_lst = true;
} else {
return Ok(());
}
if run_ingest_with_lst {
if let Some(lst) = self.lst.as_mut() {
self.inner_a.ingest_with_lst(evs, LstMut(lst))
} else {
self.ingest_event_without_lst(ev)?;
if let Some(lst) = self.lst.as_mut() {
self.inner_a.ingest_with_lst(evs, LstMut(lst))
} else {
Err(Error::NoLstAfterFirst)
}
Err(Error::NoLstAfterFirst)
}
} else {
Ok(())
@@ -475,7 +503,12 @@ where
// Caller asserts that evs is ordered within the current container
// and with respect to the last container, if any.
fn ingest_ordered(&mut self, evs: ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
fn ingest_ordered(&mut self, evs: &mut ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
let selfname = "ingest_ordered";
trace_ingest_container!(
"------------------------------------\n{selfname} len {}",
evs.len()
);
if let Some(lst) = self.lst.as_mut() {
self.inner_a.ingest_with_lst(evs, LstMut(lst))
} else {
@@ -508,7 +541,8 @@ where
if b.filled_until < b.active_end {
self.inner_a.inner_b.fill_until(b.active_end, lst.clone());
}
self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out);
self.inner_a
.push_out_and_reset(lst.clone(), true, &mut self.out);
} else {
self.inner_a.inner_b.fill_until(ts, lst.clone());
}
@@ -523,7 +557,8 @@ where
if b.filled_until < b.active_end {
self.inner_a.inner_b.fill_until(b.active_end, lst.clone());
}
self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out);
self.inner_a
.push_out_and_reset(lst.clone(), true, &mut self.out);
} else {
// TODO should not hit this case. Prove it, assert it.
self.inner_a.inner_b.fill_until(ts, lst.clone());
@@ -572,7 +607,7 @@ where
}
}
pub fn ingest(&mut self, mut evs_all: ContainerEvents<EVT>) -> Result<(), Error> {
pub fn ingest(&mut self, evs: &ContainerEvents<EVT>) -> Result<(), Error> {
// It is this type's task to find and store the one-before event.
// We then pass it to the aggregation.
// AggregatorTimeWeight needs a function for that.
@@ -583,41 +618,60 @@ where
// ALSO: need to keep track of the "lst". Probably best done in this type as well?
// TODO should rely on external stream adapter for verification to not duplicate things.
evs_all.verify()?;
evs.verify()?;
let mut evs = ContainerEventsTakeUpTo::new(evs);
loop {
break if let Some(ts) = evs_all.ts_first() {
trace_ingest_container!(
"main-ingest-loop UNCONSTRAINED len {} pos {}",
evs.len(),
evs.pos()
);
break if let Some(ts) = evs.ts_first() {
trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest", ts);
let b = &mut self.inner_a.inner_b;
if ts >= self.range.nano_end() {
return Err(Error::EventAfterRange);
}
if ts >= b.active_end {
assert!(b.filled_until < b.active_end, "{} < {}", b.filled_until, b.active_end);
assert!(
b.filled_until < b.active_end,
"{} < {}",
b.filled_until,
b.active_end
);
self.cycle_01(ts);
}
let n1 = evs_all.len();
let len_before = evs_all.len_before(self.inner_a.inner_b.active_end);
let evs = ContainerEventsTakeUpTo::new(&mut evs_all, len_before);
if let Some(lst) = self.lst.as_ref() {
if ts < lst.ts {
return Err(Error::Unordered);
let n1 = evs.len();
evs.constrain_up_to_ts(self.inner_a.inner_b.active_end);
{
trace_ingest_container!(
"main-ingest-loop len {} pos {}",
evs.len(),
evs.pos()
);
if let Some(lst) = self.lst.as_ref() {
if ts < lst.ts {
return Err(Error::Unordered);
} else {
self.ingest_ordered(&mut evs)?
}
} else {
self.ingest_ordered(evs)?
}
self.ingest_ordered(&mut evs)?
};
trace_ingest_container_2!("ingest after still left len evs {}", evs.len());
}
evs.extend_to_all();
let n2 = evs.len();
if n2 == 0 {
// done
} else {
self.ingest_ordered(evs)?
};
trace_ingest_container_2!("ingest after still left len evs {}", evs_all.len());
let n2 = evs_all.len();
if n2 != 0 {
if n2 == n1 {
panic!("no progress");
}
assert!(n2 < n1, "no progress");
continue;
}
} else {
()
// done
};
}
Ok(())

View File

@@ -56,22 +56,32 @@ impl<EVT> BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox<EVT>
where
EVT: EventValueType,
{
fn ingest(&mut self, mut evs: EventsBoxed) -> Result<(), BinningggError> {
fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> {
// let a = (&evs as &dyn any::Any).downcast_ref::<String>();
// evs.downcast::<String>();
// evs.as_anybox().downcast::<ContainerEvents<f64>>();
match evs.to_anybox().downcast::<ContainerEvents<EVT>>() {
Ok(evs) => {
let evs = {
let a = evs;
*a
// match evs.to_anybox().downcast::<ContainerEvents<EVT>>() {
// Ok(evs) => {
// let evs = {
// let a = evs;
// *a
// };
// Ok(self.binner.ingest(evs)?)
// }
// Err(_) => Err(BinningggError::TypeMismatch {
// have: evs.type_name().into(),
// expect: std::any::type_name::<ContainerEvents<EVT>>().into(),
// }),
// }
match evs.as_any_ref().downcast_ref::<ContainerEvents<EVT>>() {
Some(evs) => Ok(self.binner.ingest(evs)?),
None => {
let e = BinningggError::TypeMismatch {
have: evs.type_name().into(),
expect: std::any::type_name::<ContainerEvents<EVT>>().into(),
};
Ok(self.binner.ingest(evs)?)
Err(e)
}
Err(_) => Err(BinningggError::TypeMismatch {
have: evs.type_name().into(),
expect: std::any::type_name::<ContainerEvents<EVT>>().into(),
}),
}
}
@@ -109,10 +119,10 @@ impl BinnedEventsTimeweightLazy {
}
impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError> {
fn ingest(&mut self, evs: &EventsBoxed) -> Result<(), BinningggError> {
self.binned_events
.get_or_insert_with(|| evs_all.binned_events_timeweight_traitobj(self.range.clone()))
.ingest(evs_all)
.get_or_insert_with(|| evs.binned_events_timeweight_traitobj(self.range.clone()))
.ingest(evs)
}
fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
@@ -133,7 +143,10 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
}
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError> {
self.binned_events.as_mut().map(|x| x.output()).unwrap_or(Ok(None))
self.binned_events
.as_mut()
.map(|x| x.output())
.unwrap_or(Ok(None))
}
}
@@ -151,7 +164,10 @@ pub struct BinnedEventsTimeweightStream {
}
impl BinnedEventsTimeweightStream {
pub fn new(range: BinnedRange<TsNano>, inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>) -> Self {
pub fn new(
range: BinnedRange<TsNano>,
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
) -> Self {
Self {
state: StreamState::Reading,
inp,
@@ -173,7 +189,10 @@ impl BinnedEventsTimeweightStream {
Ok(x) => match x {
DataItem(x) => match x {
Data(x) => match x {
ChannelEvents::Events(evs) => match self.binned_events.ingest(evs.to_container_events()) {
ChannelEvents::Events(evs) => match self
.binned_events
.ingest(&evs.to_container_events())
{
Ok(()) => {
match self.binned_events.output() {
Ok(Some(x)) => {
@@ -210,7 +229,10 @@ impl BinnedEventsTimeweightStream {
}
}
fn handle_eos(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
fn handle_eos(
mut self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
trace_input_container!("handle_eos");
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
@@ -225,7 +247,11 @@ impl BinnedEventsTimeweightStream {
.input_done_range_open()
.map_err(err::Error::from_string)?;
}
match self.binned_events.output().map_err(err::Error::from_string)? {
match self
.binned_events
.output()
.map_err(err::Error::from_string)?
{
Some(x) => {
trace_emit!("seeing ready bins {:?}", x);
Ready(Some(Ok(DataItem(Data(x)))))
@@ -237,7 +263,10 @@ impl BinnedEventsTimeweightStream {
}
}
fn handle_main(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
fn handle_main(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
use ControlFlow::*;
use Poll::*;
let ret = match &self.state {

View File

@@ -1,10 +1,12 @@
use super::aggregator::AggregatorTimeWeight;
use super::container_events::Container;
use super::container_events::EventValueType;
use super::container_events::PartialOrdEvtA;
use core::fmt;
use items_0::vecpreview::PreviewRange;
use netpod::DtNano;
use netpod::EnumVariant;
use netpod::EnumVariantRef;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
@@ -46,6 +48,18 @@ impl Container<EnumVariant> for EnumVariantContainer {
None
}
}
/// Borrowing view of the enum variant stored at `pos`, or `None` when
/// `pos` is out of range for either parallel container.
fn get_iter_ty_1(&self, pos: usize) -> Option<<EnumVariant as EventValueType>::IterTy1<'_>> {
    // Both parallel deques must hold an entry at `pos`.
    let ix = *self.ixs.get(pos)?;
    let name = self.names.get(pos)?;
    Some(EnumVariantRef {
        ix,
        name: name.as_str(),
    })
}
}
#[derive(Debug)]
@@ -78,8 +92,26 @@ impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
}
}
impl<'a> PartialOrdEvtA<EnumVariant> for EnumVariantRef<'a> {
    /// Compare this borrowed variant against an owned one: order by the
    /// numeric index first, then use the name as the tie-breaker.
    fn cmp_a(&self, other: &EnumVariant) -> Option<std::cmp::Ordering> {
        use std::cmp::Ordering::Equal;
        match self.ix.partial_cmp(&other.ix()) {
            // Indices tie: fall through to the name comparison.
            Some(Equal) => self.name.partial_cmp(other.name()),
            non_eq => non_eq,
        }
    }
}
impl EventValueType for EnumVariant {
type Container = EnumVariantContainer;
type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight;
type AggTimeWeightOutputAvg = f32;
type IterTy1<'a> = EnumVariantRef<'a>;
}

View File

@@ -28,8 +28,6 @@ use std::collections::VecDeque;
use std::time::Duration;
use std::time::SystemTime;
macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
// TODO maybe rename to ChannelStatus?
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ConnStatus {

View File

@@ -279,7 +279,11 @@ pub struct EventsDim0CollectorOutput<STY> {
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
#[serde(
rename = "continueAt",
default,
skip_serializing_if = "Option::is_none"
)]
continue_at: Option<IsoDateTime>,
}
@@ -440,8 +444,8 @@ impl<STY: ScalarOps> CollectorTy for EventsDim0Collector<STY> {
};
let tss_sl = vals.tss.make_contiguous();
let pulses_sl = vals.pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl);
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl);
let values = mem::replace(&mut vals.values, VecDeque::new());
if ts_off_ms.len() != ts_off_ns.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));
@@ -478,27 +482,6 @@ impl<STY: ScalarOps> items_0::collect_s::CollectableType for EventsDim0<STY> {
}
}
#[derive(Debug)]
pub struct EventsDim0Aggregator<STY> {
range: SeriesRange,
count: u64,
minmaxlst: Option<(STY, STY, STY)>,
sumc: u64,
sum: f32,
int_ts: u64,
last_ts: u64,
do_time_weight: bool,
events_ignored_count: u64,
items_seen: usize,
}
impl<STY> Drop for EventsDim0Aggregator<STY> {
fn drop(&mut self) {
// TODO collect as stats for the request context:
trace!("count {} ignored {}", self.count, self.events_ignored_count);
}
}
impl<STY> TypeName for EventsDim0<STY> {
fn type_name(&self) -> String {
let self_name = any::type_name::<Self>();
@@ -583,7 +566,11 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
let tss = self.tss.drain(..n1).collect();
let pulses = self.pulses.drain(..n1).collect();
let values = self.values.drain(..n1).collect();
let ret = Self { tss, pulses, values };
let ret = Self {
tss,
pulses,
values,
};
Box::new(ret)
}
@@ -591,7 +578,11 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
Box::new(Self::empty())
}
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
fn drain_into_evs(
&mut self,
dst: &mut dyn Events,
range: (usize, usize),
) -> Result<(), MergeError> {
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
// TODO make it harder to forget new members when the struct may get modified in the future
@@ -607,7 +598,7 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
dst.type_name()
);
panic!();
Err(MergeError::NotCompatible)
// Err(MergeError::NotCompatible)
}
}
@@ -696,8 +687,8 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
let mut values = self.values.clone();
let tss_sl = tss.make_contiguous();
let pulses_sl = pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl);
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl);
let values = mem::replace(&mut values, VecDeque::new());
let ret = EventsDim0CollectorOutput {
ts_anchor_sec,
@@ -759,7 +750,10 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
try_to_container_events!(bool, self);
try_to_container_events!(String, self);
let this = self;
if let Some(evs) = self.as_any_ref().downcast_ref::<EventsDim0<netpod::EnumVariant>>() {
if let Some(evs) = self
.as_any_ref()
.downcast_ref::<EventsDim0<netpod::EnumVariant>>()
{
use crate::binning::container_events::ContainerEvents;
let tss = this.tss.iter().map(|&x| TsNano::from_ns(x)).collect();
use crate::binning::container_events::Container;

View File

@@ -4,7 +4,6 @@ use items_0::collect_s::CollectableDyn;
use items_0::collect_s::CollectedDyn;
use items_0::collect_s::CollectorDyn;
use items_0::collect_s::CollectorTy;
use items_0::collect_s::ToJsonBytes;
use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::isodate::IsoDateTime;
@@ -88,11 +87,19 @@ pub struct EventsDim0EnumCollectorOutput {
vals: VecDeque<u16>,
#[serde(rename = "valuestrings")]
valstrs: VecDeque<String>,
#[serde(rename = "rangeFinal", default, skip_serializing_if = "netpod::is_false")]
#[serde(
rename = "rangeFinal",
default,
skip_serializing_if = "netpod::is_false"
)]
range_final: bool,
#[serde(rename = "timedOut", default, skip_serializing_if = "netpod::is_false")]
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
#[serde(
rename = "continueAt",
default,
skip_serializing_if = "Option::is_none"
)]
continue_at: Option<IsoDateTime>,
}
@@ -154,7 +161,7 @@ impl CollectorTy for EventsDim0EnumCollector {
fn result(
&mut self,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<EventsDim0EnumCollectorOutput, Error> {
trace_collect_result!(
"{} result() needs_continue_at {}",
@@ -188,7 +195,7 @@ impl CollectorTy for EventsDim0EnumCollector {
None
};
let tss_sl = vals.tss.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let valixs = mem::replace(&mut vals.values, VecDeque::new());
let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new());
let vals = valixs;
@@ -291,7 +298,7 @@ impl TimeBinnerTy for EventsDim0EnumTimeBinner {
type Input = EventsDim0Enum;
type Output = ();
fn ingest(&mut self, item: &mut Self::Input) {
fn ingest(&mut self, _item: &mut Self::Input) {
todo!()
}
@@ -307,7 +314,7 @@ impl TimeBinnerTy for EventsDim0EnumTimeBinner {
todo!()
}
fn push_in_progress(&mut self, push_empty: bool) {
fn push_in_progress(&mut self, _push_empty: bool) {
todo!()
}
@@ -330,9 +337,9 @@ impl TimeBinnableTy for EventsDim0Enum {
fn time_binner_new(
&self,
binrange: BinnedRangeEnum,
do_time_weight: bool,
emit_empty_bins: bool,
_binrange: BinnedRangeEnum,
_do_time_weight: bool,
_emit_empty_bins: bool,
) -> Self::TimeBinner {
todo!()
}
@@ -377,7 +384,7 @@ impl Events for EventsDim0Enum {
todo!()
}
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events> {
fn take_new_events_until_ts(&mut self, _ts_end: u64) -> Box<dyn Events> {
todo!()
}
@@ -385,19 +392,23 @@ impl Events for EventsDim0Enum {
todo!()
}
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), items_0::MergeError> {
fn drain_into_evs(
&mut self,
_dst: &mut dyn Events,
_range: (usize, usize),
) -> Result<(), items_0::MergeError> {
todo!()
}
fn find_lowest_index_gt_evs(&self, ts: u64) -> Option<usize> {
fn find_lowest_index_gt_evs(&self, _ts: u64) -> Option<usize> {
todo!()
}
fn find_lowest_index_ge_evs(&self, ts: u64) -> Option<usize> {
fn find_lowest_index_ge_evs(&self, _ts: u64) -> Option<usize> {
todo!()
}
fn find_highest_index_lt_evs(&self, ts: u64) -> Option<usize> {
fn find_highest_index_lt_evs(&self, _ts: u64) -> Option<usize> {
todo!()
}
@@ -405,7 +416,7 @@ impl Events for EventsDim0Enum {
todo!()
}
fn partial_eq_dyn(&self, other: &dyn Events) -> bool {
fn partial_eq_dyn(&self, _other: &dyn Events) -> bool {
todo!()
}

View File

@@ -400,8 +400,8 @@ impl<STY: ScalarOps> CollectorTy for EventsDim1Collector<STY> {
};
let tss_sl = vals.tss.make_contiguous();
let pulses_sl = vals.pulses.make_contiguous();
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl);
let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::offsets::ts_offs_from_abs(tss_sl);
let (pulse_anchor, pulse_off) = crate::offsets::pulse_offs_from_abs(pulses_sl);
let values = mem::replace(&mut vals.values, VecDeque::new());
if ts_off_ms.len() != ts_off_ns.len() {
return Err(Error::with_msg_no_trace("collected len mismatch"));

View File

@@ -202,7 +202,7 @@ fn test_frame_log() {
let item: Sitemty<ChannelEvents> = Ok(StreamItem::Log(item));
let buf = Framable::make_frame_dyn(&item).unwrap();
let len = u32::from_le_bytes(buf[12..16].try_into().unwrap());
let item2: LogItem = decode_from_slice(&buf[20..20 + len as usize]).unwrap();
let _item2: LogItem = decode_from_slice(&buf[20..20 + len as usize]).unwrap();
}
#[test]
@@ -217,5 +217,6 @@ fn test_frame_error() {
panic!("bad tyid");
}
eprintln!("buf len {} len {}", buf.len(), len);
let item2: items_0::streamitem::SitemErrTy = json_from_slice(&buf[20..20 + len as usize]).unwrap();
let _item2: items_0::streamitem::SitemErrTy =
json_from_slice(&buf[20..20 + len as usize]).unwrap();
}

View File

@@ -1,182 +0,0 @@
pub mod accounting;
pub mod binning;
pub mod channelevents;
pub mod empty;
pub mod eventfull;
pub mod eventsdim0;
pub mod eventsdim0enum;
pub mod eventsdim1;
pub mod framable;
pub mod frame;
pub mod inmem;
pub mod merger;
pub mod streams;
#[cfg(feature = "heavy")]
#[cfg(test)]
pub mod test;
pub mod testgen;
pub mod transform;
use channelevents::ChannelEvents;
use daqbuf_err as err;
use futures_util::Stream;
use items_0::isodate::IsoDateTime;
use items_0::streamitem::Sitemty;
use items_0::transform::EventTransform;
use items_0::Events;
use items_0::MergeError;
use merger::Mergeable;
use netpod::timeunits::*;
use std::collections::VecDeque;
use std::fmt;
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
let ts_anchor_ns = ts_anchor_sec * SEC;
let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
let ts_off_ns = tss
.iter()
.zip(ts_off_ms.iter().map(|&k| k * MS))
.map(|(&j, k)| (j - ts_anchor_ns - k))
.collect();
(ts_anchor_sec, ts_off_ms, ts_off_ns)
}
pub fn ts_offs_from_abs_with_anchor(
ts_anchor_sec: u64,
tss: &[u64],
) -> (VecDeque<u64>, VecDeque<u64>) {
let ts_anchor_ns = ts_anchor_sec * SEC;
let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
let ts_off_ns = tss
.iter()
.zip(ts_off_ms.iter().map(|&k| k * MS))
.map(|(&j, k)| (j - ts_anchor_ns - k))
.collect();
(ts_off_ms, ts_off_ns)
}
pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque<u64>) {
let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000;
let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect();
(pulse_anchor, pulse_off)
}
#[derive(Debug, PartialEq)]
pub enum ErrorKind {
General,
#[allow(unused)]
MismatchedType,
}
// TODO stack error better
#[derive(Debug, PartialEq)]
pub struct Error {
#[allow(unused)]
kind: ErrorKind,
msg: Option<String>,
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{self:?}")
}
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Self {
Self { kind, msg: None }
}
}
impl From<String> for Error {
fn from(msg: String) -> Self {
Self {
msg: Some(msg),
kind: ErrorKind::General,
}
}
}
// TODO this discards structure
impl From<err::Error> for Error {
fn from(e: err::Error) -> Self {
Self {
msg: Some(format!("{e}")),
kind: ErrorKind::General,
}
}
}
// TODO this discards structure
impl From<Error> for err::Error {
fn from(e: Error) -> Self {
err::Error::with_msg_no_trace(format!("{e}"))
}
}
impl std::error::Error for Error {}
impl serde::de::Error for Error {
fn custom<T>(msg: T) -> Self
where
T: fmt::Display,
{
format!("{msg}").into()
}
}
pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
tss.iter().map(|&k| IsoDateTime::from_ns_u64(k)).collect()
}
impl Mergeable for Box<dyn Events> {
fn ts_min(&self) -> Option<u64> {
self.as_ref().ts_min()
}
fn ts_max(&self) -> Option<u64> {
self.as_ref().ts_max()
}
fn new_empty(&self) -> Self {
self.as_ref().new_empty_evs()
}
fn clear(&mut self) {
Events::clear(self.as_mut())
}
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
self.as_mut().drain_into_evs(dst, range)
}
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize> {
self.as_ref().find_lowest_index_gt_evs(ts)
}
fn find_lowest_index_ge(&self, ts: u64) -> Option<usize> {
self.as_ref().find_lowest_index_ge_evs(ts)
}
fn find_highest_index_lt(&self, ts: u64) -> Option<usize> {
self.as_ref().find_highest_index_lt_evs(ts)
}
fn tss(&self) -> Vec<netpod::TsMs> {
Events::tss(self)
.iter()
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
}
}
pub trait ChannelEventsInput:
Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send
{
}
impl<T> ChannelEventsInput for T where
T: Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send
{
}

91
src/lib.rs Normal file
View File

@@ -0,0 +1,91 @@
pub mod accounting;
pub mod binning;
pub mod channelevents;
pub mod empty;
pub mod eventfull;
pub mod eventsdim0;
pub mod eventsdim0enum;
pub mod eventsdim1;
pub mod framable;
pub mod frame;
pub mod inmem;
pub mod merger;
pub mod offsets;
pub mod streams;
#[cfg(feature = "heavy")]
#[cfg(test)]
pub mod test;
pub mod testgen;
use daqbuf_err as err;
use items_0::isodate::IsoDateTime;
use items_0::Events;
use std::fmt;
// Coarse classification of errors produced by this crate.
#[derive(Debug, PartialEq)]
pub enum ErrorKind {
    // Catch-all kind; most conversions below default to this.
    General,
    #[allow(unused)]
    // Reserved for type-mismatch situations (not constructed yet).
    MismatchedType,
}
// TODO stack error better
#[derive(Debug, PartialEq)]
pub struct Error {
    #[allow(unused)]
    // Classification of the error; currently only read via Debug output.
    kind: ErrorKind,
    // Optional human-readable message; None when built from a bare ErrorKind.
    msg: Option<String>,
}
impl fmt::Display for Error {
    /// Display simply reuses the derived Debug representation.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(self, fmt)
    }
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Self {
Self { kind, msg: None }
}
}
impl From<String> for Error {
fn from(msg: String) -> Self {
Self {
msg: Some(msg),
kind: ErrorKind::General,
}
}
}
// TODO this discards structure
impl From<err::Error> for Error {
    /// Convert by formatting: the source error's structure is flattened
    /// into a message string with kind `General`.
    fn from(e: err::Error) -> Self {
        // Delegate to the From<String> impl above.
        Self::from(e.to_string())
    }
}
// TODO this discards structure
impl From<Error> for err::Error {
    /// Convert by formatting; only the message text survives the crossing.
    fn from(e: Error) -> Self {
        Self::with_msg_no_trace(e.to_string())
    }
}
impl std::error::Error for Error {}
impl serde::de::Error for Error {
fn custom<T>(msg: T) -> Self
where
T: fmt::Display,
{
format!("{msg}").into()
}
}
/// Convert absolute nanosecond timestamps into ISO date-time values.
pub fn make_iso_ts(tss: &[u64]) -> Vec<IsoDateTime> {
    let mut out = Vec::with_capacity(tss.len());
    for &ns in tss {
        out.push(IsoDateTime::from_ns_u64(ns));
    }
    out
}

View File

@@ -8,9 +8,6 @@ use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::transform::EventTransform;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_0::Events;
use items_0::MergeError;
use items_0::WithLen;
@@ -46,6 +43,47 @@ pub trait Mergeable<Rhs = Self>: fmt::Debug + WithLen + ByteEstimate + Unpin {
fn tss(&self) -> Vec<TsMs>;
}
impl Mergeable for Box<dyn Events> {
fn ts_min(&self) -> Option<u64> {
self.as_ref().ts_min()
}
fn ts_max(&self) -> Option<u64> {
self.as_ref().ts_max()
}
fn new_empty(&self) -> Self {
self.as_ref().new_empty_evs()
}
fn clear(&mut self) {
Events::clear(self.as_mut())
}
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
self.as_mut().drain_into_evs(dst, range)
}
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize> {
self.as_ref().find_lowest_index_gt_evs(ts)
}
fn find_lowest_index_ge(&self, ts: u64) -> Option<usize> {
self.as_ref().find_lowest_index_ge_evs(ts)
}
fn find_highest_index_lt(&self, ts: u64) -> Option<usize> {
self.as_ref().find_highest_index_lt_evs(ts)
}
fn tss(&self) -> Vec<netpod::TsMs> {
Events::tss(self)
.iter()
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
}
}
type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
pub struct Merger<T> {
@@ -479,18 +517,3 @@ where
}
}
}
impl<T> WithTransformProperties for Merger<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl<T> EventTransform for Merger<T>
where
T: Send,
{
fn transform(&mut self, _src: Box<dyn Events>) -> Box<dyn Events> {
todo!()
}
}

35
src/offsets.rs Normal file
View File

@@ -0,0 +1,35 @@
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use std::collections::VecDeque;
/// Split absolute nanosecond timestamps into a second-resolution anchor
/// plus per-event millisecond offsets and sub-millisecond remainders.
///
/// Returns `(anchor_sec, offs_ms, offs_ns)`; the anchor is the first
/// timestamp truncated to full seconds (0 for empty input).
pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
    let ts_anchor_sec = match tss.first() {
        Some(&ts) => ts / SEC,
        None => 0,
    };
    let anchor_ns = ts_anchor_sec * SEC;
    let mut offs_ms = VecDeque::with_capacity(tss.len());
    let mut offs_ns = VecDeque::with_capacity(tss.len());
    for &ts in tss {
        // assumes tss is ascending so ts >= anchor_ns — TODO confirm with callers
        let rel = ts - anchor_ns;
        let ms = rel / MS;
        offs_ms.push_back(ms);
        // Remainder below one millisecond, in nanoseconds.
        offs_ns.push_back(rel - ms * MS);
    }
    (ts_anchor_sec, offs_ms, offs_ns)
}
/// Like `ts_offs_from_abs`, but relative to a caller-provided anchor
/// given in seconds. Returns `(offs_ms, offs_ns)`.
pub fn ts_offs_from_abs_with_anchor(
    ts_anchor_sec: u64,
    tss: &[u64],
) -> (VecDeque<u64>, VecDeque<u64>) {
    let anchor_ns = ts_anchor_sec * SEC;
    let mut offs_ms = VecDeque::with_capacity(tss.len());
    let mut offs_ns = VecDeque::with_capacity(tss.len());
    for &ts in tss {
        // assumes every ts >= anchor_ns — TODO confirm with callers
        let rel = ts - anchor_ns;
        let ms = rel / MS;
        offs_ms.push_back(ms);
        // Sub-millisecond remainder in nanoseconds.
        offs_ns.push_back(rel - ms * MS);
    }
    (offs_ms, offs_ns)
}
/// Derive a pulse-id anchor (first pulse rounded down to a multiple of
/// 10000, or 0 for empty input) and the per-event offsets from it.
pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque<u64>) {
    let pulse_anchor = match pulse.first() {
        Some(&p) => p / 10000 * 10000,
        None => 0,
    };
    let mut pulse_off = VecDeque::with_capacity(pulse.len());
    for &p in pulse {
        // assumes pulses are ascending so p >= pulse_anchor — TODO confirm
        pulse_off.push_back(p - pulse_anchor);
    }
    (pulse_anchor, pulse_off)
}

View File

@@ -6,7 +6,6 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::transform::EventStreamTrait;
use items_0::transform::EventTransform;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_0::Events;
@@ -21,10 +20,7 @@ pub struct Enumerate2<T> {
}
impl<T> Enumerate2<T> {
pub fn new(inp: T) -> Self
where
T: EventTransform,
{
pub fn new(inp: T) -> Self {
Self { inp, cnt: 0 }
}
}
@@ -58,15 +54,6 @@ where
}
}
impl<T> EventTransform for Enumerate2<T>
where
T: WithTransformProperties + Send,
{
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
todo!()
}
}
pub struct Then2<T, F, Fut> {
inp: Pin<Box<T>>,
f: Pin<Box<F>>,
@@ -78,10 +65,7 @@ where
T: Stream,
F: Fn(<T as Stream>::Item) -> Fut,
{
pub fn new(inp: T, f: F) -> Self
where
T: EventTransform,
{
pub fn new(inp: T, f: F) -> Self {
Self {
inp: Box::pin(inp),
f: Box::pin(f),
@@ -135,56 +119,6 @@ where
}
}
impl<T, F, Fut> WithTransformProperties for Then2<T, F, Fut>
where
T: EventTransform,
{
fn query_transform_properties(&self) -> TransformProperties {
self.inp.query_transform_properties()
}
}
impl<T, F, Fut> EventTransform for Then2<T, F, Fut>
where
T: EventTransform + Send,
F: Send,
Fut: Send,
{
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
todo!()
}
}
pub trait TransformerExt {
fn enumerate2(self) -> Enumerate2<Self>
where
Self: EventTransform + Sized;
fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
where
Self: EventTransform + Stream + Sized,
F: Fn(<Self as Stream>::Item) -> Fut,
Fut: Future;
}
impl<T> TransformerExt for T {
fn enumerate2(self) -> Enumerate2<Self>
where
Self: EventTransform + Sized,
{
Enumerate2::new(self)
}
fn then2<F, Fut>(self, f: F) -> Then2<Self, F, Fut>
where
Self: EventTransform + Stream + Sized,
F: Fn(<Self as Stream>::Item) -> Fut,
Fut: Future,
{
Then2::new(self, f)
}
}
pub struct VecStream<T> {
inp: VecDeque<T>,
}
@@ -211,21 +145,6 @@ where
}
}
impl<T> WithTransformProperties for VecStream<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl<T> EventTransform for VecStream<T>
where
T: Send,
{
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
todo!()
}
}
/// Wrap any event stream and provide transformation properties.
pub struct PlainEventStream<INP, T>
where
@@ -259,7 +178,9 @@ where
Ok(item) => Ok(match item {
StreamItem::DataItem(item) => StreamItem::DataItem(match item {
RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)),
RangeCompletableItem::Data(item) => {
RangeCompletableItem::Data(Box::new(item))
}
}),
StreamItem::Log(item) => StreamItem::Log(item),
StreamItem::Stats(item) => StreamItem::Stats(item),

View File

@@ -1,84 +0,0 @@
//! Helper functions to create transforms which act locally on a batch of events.
//! Tailored to the usage pattern given by `TransformQuery`.
use crate::channelevents::ChannelEvents;
use crate::eventsdim0::EventsDim0;
use items_0::transform::EventTransform;
use items_0::transform::TransformEvent;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_0::Appendable;
use items_0::AsAnyMut;
use items_0::Empty;
use items_0::Events;
use items_0::EventsNonObj;
use netpod::log::*;
use std::mem;
struct TransformEventIdentity {}
impl WithTransformProperties for TransformEventIdentity {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl EventTransform for TransformEventIdentity {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
src
}
}
pub fn make_transform_identity() -> TransformEvent {
TransformEvent(Box::new(TransformEventIdentity {}))
}
struct TransformEventMinMaxAvg {}
impl WithTransformProperties for TransformEventMinMaxAvg {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl EventTransform for TransformEventMinMaxAvg {
fn transform(&mut self, mut src: Box<dyn Events>) -> Box<dyn Events> {
src.to_min_max_avg()
}
}
pub fn make_transform_min_max_avg() -> TransformEvent {
TransformEvent(Box::new(TransformEventMinMaxAvg {}))
}
struct TransformEventPulseIdDiff {
pulse_last: Option<u64>,
}
impl WithTransformProperties for TransformEventPulseIdDiff {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}
impl EventTransform for TransformEventPulseIdDiff {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
let (tss, pulses) = EventsNonObj::into_tss_pulses(src);
let mut item = EventsDim0::empty();
let pulse_last = &mut self.pulse_last;
for (ts, pulse) in tss.into_iter().zip(pulses) {
let value = if let Some(last) = pulse_last {
pulse as i64 - *last as i64
} else {
0
};
item.push(ts, pulse, value);
*pulse_last = Some(pulse);
}
Box::new(ChannelEvents::Events(Box::new(item)))
}
}
pub fn make_transform_pulse_id_diff() -> TransformEvent {
TransformEvent(Box::new(TransformEventPulseIdDiff { pulse_last: None }))
}