WIP binned bins
This commit is contained in:
@@ -31,3 +31,6 @@ thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "
|
||||
|
||||
[features]
|
||||
heavy = []
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["rt"] }
|
||||
|
||||
@@ -613,6 +613,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn binned_bins_timeweight_traitobj(
|
||||
&self,
|
||||
range: netpod::BinnedRange<TsNano>,
|
||||
) -> Box<dyn items_0::timebin::BinnedBinsTimeweightTrait> {
|
||||
let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::<EVT>::new(range);
|
||||
Box::new(ret)
|
||||
}
|
||||
|
||||
fn fix_numerics(&mut self) {
|
||||
for ((_min, _max), _avg) in self
|
||||
.mins
|
||||
|
||||
@@ -81,7 +81,7 @@ impl Container<String> for VecDeque<String> {
|
||||
}
|
||||
|
||||
fn get_iter_ty_1(&self, pos: usize) -> Option<&str> {
|
||||
todo!()
|
||||
self.get(pos).map(|x| x.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,14 +174,7 @@ where
|
||||
pub val: EVT::IterTy1<'a>,
|
||||
}
|
||||
|
||||
impl<'a, EVT> EventSingleRef<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn to_owned(&self) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
impl<'a, EVT> EventSingleRef<'a, EVT> where EVT: EventValueType {}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EventSingle<EVT> {
|
||||
@@ -266,25 +259,6 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn ts_first(&self) -> Option<TsNano> {
|
||||
self.tss.front().map(|&x| x)
|
||||
}
|
||||
|
||||
pub fn ts_last(&self) -> Option<TsNano> {
|
||||
self.tss.back().map(|&x| x)
|
||||
}
|
||||
|
||||
fn _len_before(&self, end: TsNano) -> usize {
|
||||
let tss = &self.tss;
|
||||
let pp = tss.partition_point(|&x| x < end);
|
||||
assert!(pp <= tss.len(), "len_before pp {} len {}", pp, tss.len());
|
||||
pp
|
||||
}
|
||||
|
||||
fn _pop_front(&mut self) -> Option<EventSingleRef<EVT>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn push_back(&mut self, ts: TsNano, val: EVT) {
|
||||
self.tss.push_back(ts);
|
||||
self.vals.push_back(val);
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
mod bins00;
|
||||
mod bins_gen;
|
||||
mod compare;
|
||||
mod events00;
|
||||
|
||||
use super::container_events::ContainerEvents;
|
||||
|
||||
54
src/binning/test/bins00.rs
Normal file
54
src/binning/test/bins00.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use super::bins_gen::bins_gen_dim0_f32_v00;
|
||||
use super::compare::exp_avgs;
|
||||
use super::compare::exp_cnts;
|
||||
use super::compare::exp_maxs;
|
||||
use super::compare::exp_mins;
|
||||
use super::events00::pu;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::test::bins_gen::boxed_conts;
|
||||
use crate::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
|
||||
use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtMs;
|
||||
use netpod::TsNano;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "Error")]
|
||||
enum Error {
|
||||
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
|
||||
AssertMsg(String),
|
||||
Compare(#[from] super::compare::Error),
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_f32_simple_01() -> Result<(), Error> {
|
||||
let fut = async {
|
||||
let beg = TsNano::from_ms(100);
|
||||
let end = TsNano::from_ms(500);
|
||||
let bin_len = DtMs::from_ms_u64(10);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, bin_len);
|
||||
let inp = bins_gen_dim0_f32_v00();
|
||||
let inp = boxed_conts(inp);
|
||||
let mut stream = BinnedBinsTimeweightStream::new(range, inp);
|
||||
while let Some(bin) = stream.next().await {
|
||||
eprintln!("bin {:?}", bin);
|
||||
}
|
||||
// exp_cnts(&bins, "2 3")?;
|
||||
// exp_mins(&bins, "2. 1.")?;
|
||||
// exp_maxs(&bins, "2.4 2.4")?;
|
||||
// exp_avgs(&bins, "2.30 1.5333")?;
|
||||
Ok(())
|
||||
};
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.build()
|
||||
.unwrap();
|
||||
rt.block_on(fut)
|
||||
}
|
||||
25
src/binning/test/bins_gen.rs
Normal file
25
src/binning/test/bins_gen.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
use crate::binning::container_bins::ContainerBins;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
use std::pin::Pin;
|
||||
|
||||
pub(super) fn boxed_conts<S>(inp: S) -> Pin<Box<dyn Stream<Item = <S as Stream>::Item> + Send>>
|
||||
where
|
||||
S: Stream + Send + 'static,
|
||||
{
|
||||
Box::pin(inp)
|
||||
}
|
||||
|
||||
pub(super) fn bins_gen_dim0_f32_v00(
|
||||
) -> impl Stream<Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>> {
|
||||
futures_util::stream::iter((0usize..1000).into_iter())
|
||||
.map(|x| {
|
||||
let c = ContainerBins::<f32>::new();
|
||||
Box::new(c) as Box<dyn BinningggContainerBinsDyn>
|
||||
})
|
||||
.map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))
|
||||
}
|
||||
144
src/binning/test/compare.rs
Normal file
144
src/binning/test/compare.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use crate::binning::container_bins::ContainerBins;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "Compare")]
|
||||
pub enum Error {
|
||||
AssertMsg(String),
|
||||
}
|
||||
|
||||
trait IntoVecDequeU64 {
|
||||
fn into_vec_deque_u64(self) -> VecDeque<u64>;
|
||||
}
|
||||
|
||||
impl IntoVecDequeU64 for &str {
|
||||
fn into_vec_deque_u64(self) -> VecDeque<u64> {
|
||||
self.split_ascii_whitespace()
|
||||
.map(|x| x.parse().unwrap())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
trait IntoVecDequeF32 {
|
||||
fn into_vec_deque_f32(self) -> VecDeque<f32>;
|
||||
}
|
||||
|
||||
impl IntoVecDequeF32 for &str {
|
||||
fn into_vec_deque_f32(self) -> VecDeque<f32> {
|
||||
self.split_ascii_whitespace()
|
||||
.map(|x| x.parse().unwrap())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn exp_u64<'a>(
|
||||
vals: impl Iterator<Item = &'a u64>,
|
||||
exps: impl Iterator<Item = &'a u64>,
|
||||
tag: &str,
|
||||
) -> Result<(), Error> {
|
||||
let mut it_a = vals;
|
||||
let mut it_b = exps;
|
||||
let mut i = 0;
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(&val), Some(&exp)) = (a, b) {
|
||||
if val != exp {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"{tag} val {} exp {} i {}",
|
||||
val, exp, i
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!("{tag} len mismatch")));
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn exp_f32<'a>(
|
||||
vals: impl Iterator<Item = &'a f32>,
|
||||
exps: impl Iterator<Item = &'a f32>,
|
||||
tag: &str,
|
||||
) -> Result<(), Error> {
|
||||
let mut it_a = vals;
|
||||
let mut it_b = exps;
|
||||
let mut i = 0;
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(&val), Some(&exp)) = (a, b) {
|
||||
if netpod::f32_close(val, exp) == false {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"{tag} val {} exp {} i {}",
|
||||
val, exp, i
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!("{tag} len mismatch")));
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn exp_cnts(bins: &ContainerBins<f32>, exps: impl IntoVecDequeU64) -> Result<(), Error> {
|
||||
exp_u64(
|
||||
bins.cnts_iter(),
|
||||
exps.into_vec_deque_u64().iter(),
|
||||
"exp_cnts",
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn exp_mins(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
|
||||
exp_f32(
|
||||
bins.mins_iter(),
|
||||
exps.into_vec_deque_f32().iter(),
|
||||
"exp_mins",
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn exp_maxs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
|
||||
exp_f32(
|
||||
bins.maxs_iter(),
|
||||
exps.into_vec_deque_f32().iter(),
|
||||
"exp_maxs",
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn exp_avgs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
|
||||
let exps = exps.into_vec_deque_f32();
|
||||
let mut it_a = bins.iter_debug();
|
||||
let mut it_b = exps.iter();
|
||||
let mut i = 0;
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(a), Some(&exp)) = (a, b) {
|
||||
let val = *a.avg as f32;
|
||||
if netpod::f32_close(val, exp) == false {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"exp_avgs val {} exp {} i {}",
|
||||
val, exp, i
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"len mismatch {} vs {}",
|
||||
bins.len(),
|
||||
exps.len()
|
||||
)));
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,22 +1,23 @@
|
||||
use super::compare::exp_avgs;
|
||||
use super::compare::exp_cnts;
|
||||
use super::compare::exp_maxs;
|
||||
use super::compare::exp_mins;
|
||||
use crate::binning::container_bins::ContainerBins;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
|
||||
use daqbuf_err as err;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtMs;
|
||||
use netpod::EnumVariant;
|
||||
use netpod::TsNano;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "Error")]
|
||||
enum Error {
|
||||
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
|
||||
AssertMsg(String),
|
||||
Compare(#[from] super::compare::Error),
|
||||
}
|
||||
|
||||
// fn prepare_data_with_cuts(beg_ms: u64, cuts: VecDeque<u64>) -> VecDeque<ContainerEvents<f32>> {
|
||||
@@ -27,7 +28,7 @@ enum Error {
|
||||
// let ivl = DtMs::from_ms_u64(x)
|
||||
// }
|
||||
|
||||
fn pu(c: &mut ContainerEvents<f32>, ts_ms: u64, val: f32)
|
||||
pub(super) fn pu(c: &mut ContainerEvents<f32>, ts_ms: u64, val: f32)
|
||||
// where
|
||||
// C: AsMut<ContainerEvents<f32>>,
|
||||
// C: std::borrow::BorrowMut<ContainerEvents<f32>>,
|
||||
@@ -35,145 +36,6 @@ fn pu(c: &mut ContainerEvents<f32>, ts_ms: u64, val: f32)
|
||||
c.push_back(TsNano::from_ms(ts_ms), val);
|
||||
}
|
||||
|
||||
trait IntoVecDequeU64 {
|
||||
fn into_vec_deque_u64(self) -> VecDeque<u64>;
|
||||
}
|
||||
|
||||
impl IntoVecDequeU64 for &str {
|
||||
fn into_vec_deque_u64(self) -> VecDeque<u64> {
|
||||
self.split_ascii_whitespace()
|
||||
.map(|x| x.parse().unwrap())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
trait IntoVecDequeF32 {
|
||||
fn into_vec_deque_f32(self) -> VecDeque<f32>;
|
||||
}
|
||||
|
||||
impl IntoVecDequeF32 for &str {
|
||||
fn into_vec_deque_f32(self) -> VecDeque<f32> {
|
||||
self.split_ascii_whitespace()
|
||||
.map(|x| x.parse().unwrap())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn exp_u64<'a>(
|
||||
vals: impl Iterator<Item = &'a u64>,
|
||||
exps: impl Iterator<Item = &'a u64>,
|
||||
tag: &str,
|
||||
) -> Result<(), Error> {
|
||||
let mut it_a = vals;
|
||||
let mut it_b = exps;
|
||||
let mut i = 0;
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(&val), Some(&exp)) = (a, b) {
|
||||
if val != exp {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"{tag} val {} exp {} i {}",
|
||||
val, exp, i
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!("{tag} len mismatch")));
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn exp_f32<'a>(
|
||||
vals: impl Iterator<Item = &'a f32>,
|
||||
exps: impl Iterator<Item = &'a f32>,
|
||||
tag: &str,
|
||||
) -> Result<(), Error> {
|
||||
let mut it_a = vals;
|
||||
let mut it_b = exps;
|
||||
let mut i = 0;
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(&val), Some(&exp)) = (a, b) {
|
||||
if netpod::f32_close(val, exp) == false {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"{tag} val {} exp {} i {}",
|
||||
val, exp, i
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!("{tag} len mismatch")));
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn exp_cnts(bins: &ContainerBins<f32>, exps: impl IntoVecDequeU64) -> Result<(), Error> {
|
||||
exp_u64(
|
||||
bins.cnts_iter(),
|
||||
exps.into_vec_deque_u64().iter(),
|
||||
"exp_cnts",
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn exp_mins(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
|
||||
exp_f32(
|
||||
bins.mins_iter(),
|
||||
exps.into_vec_deque_f32().iter(),
|
||||
"exp_mins",
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn exp_maxs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
|
||||
exp_f32(
|
||||
bins.maxs_iter(),
|
||||
exps.into_vec_deque_f32().iter(),
|
||||
"exp_maxs",
|
||||
)
|
||||
}
|
||||
|
||||
fn exp_avgs(bins: &ContainerBins<f32>, exps: impl IntoVecDequeF32) -> Result<(), Error> {
|
||||
let exps = exps.into_vec_deque_f32();
|
||||
let mut it_a = bins.iter_debug();
|
||||
let mut it_b = exps.iter();
|
||||
let mut i = 0;
|
||||
loop {
|
||||
let a = it_a.next();
|
||||
let b = it_b.next();
|
||||
if a.is_none() && b.is_none() {
|
||||
break;
|
||||
}
|
||||
if let (Some(a), Some(&exp)) = (a, b) {
|
||||
let val = *a.avg as f32;
|
||||
if netpod::f32_close(val, exp) == false {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"exp_avgs val {} exp {} i {}",
|
||||
val, exp, i
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::AssertMsg(format!(
|
||||
"len mismatch {} vs {}",
|
||||
bins.len(),
|
||||
exps.len()
|
||||
)));
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_f32_simple_with_before_00() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(110);
|
||||
@@ -513,3 +375,22 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> {
|
||||
let bins = binner.output();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_string_simple_range_final() -> Result<(), Error> {
|
||||
let beg = TsNano::from_ms(100);
|
||||
let end = TsNano::from_ms(120);
|
||||
let nano_range = NanoRange {
|
||||
beg: beg.ns(),
|
||||
end: end.ns(),
|
||||
};
|
||||
let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10));
|
||||
let mut binner = BinnedEventsTimeweight::new(range);
|
||||
let mut evs = ContainerEvents::new();
|
||||
evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one"));
|
||||
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
|
||||
binner.ingest(&evs)?;
|
||||
binner.input_done_range_final()?;
|
||||
let bins = binner.output();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
pub mod timeweight_bins;
|
||||
pub mod timeweight_bins_dyn;
|
||||
pub mod timeweight_bins_lazy;
|
||||
pub mod timeweight_bins_stream;
|
||||
pub mod timeweight_events;
|
||||
pub mod timeweight_events_dyn;
|
||||
|
||||
use netpod::log::*;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
|
||||
@@ -1,4 +1,59 @@
|
||||
use netpod::log::*;
|
||||
use crate::binning::container_bins::ContainerBins;
|
||||
use crate::binning::container_events::EventValueType;
|
||||
use crate::log::*;
|
||||
use items_0::timebin::BinnedBinsTimeweightTrait;
|
||||
use items_0::timebin::BinningggError;
|
||||
use items_0::timebin::BinsBoxed;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BinnedBinsTimeweight<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
range: BinnedRange<TsNano>,
|
||||
out: ContainerBins<EVT>,
|
||||
produce_cnt_zero: bool,
|
||||
}
|
||||
|
||||
impl<EVT> BinnedBinsTimeweight<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
pub fn new(range: BinnedRange<TsNano>) -> Self {
|
||||
trace_init!("BinnedBinsTimeweight::new {}", range);
|
||||
let active_beg = range.nano_beg();
|
||||
let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano());
|
||||
let active_len = active_end.delta(active_beg);
|
||||
Self {
|
||||
range,
|
||||
out: ContainerBins::new(),
|
||||
produce_cnt_zero: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<EVT> BinnedBinsTimeweightTrait for BinnedBinsTimeweight<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,27 +1 @@
|
||||
use futures_util::Stream;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
pub struct BinnedBinsTimeweightStream {}
|
||||
|
||||
impl BinnedBinsTimeweightStream {
|
||||
pub fn new(
|
||||
range: BinnedRange<TsNano>,
|
||||
inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>> + Send>>,
|
||||
) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for BinnedBinsTimeweightStream {
|
||||
type Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
52
src/binning/timeweight/timeweight_bins_lazy.rs
Normal file
52
src/binning/timeweight/timeweight_bins_lazy.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use crate::log::*;
|
||||
use items_0::timebin::BinnedBinsTimeweightTrait;
|
||||
use items_0::timebin::BinningggError;
|
||||
use items_0::timebin::BinsBoxed;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "BinnedBinsLazy")]
|
||||
pub enum Error {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BinnedBinsTimeweightLazy {
|
||||
range: BinnedRange<TsNano>,
|
||||
binned: Option<Box<dyn BinnedBinsTimeweightTrait>>,
|
||||
}
|
||||
|
||||
impl BinnedBinsTimeweightLazy {
|
||||
pub fn new(range: BinnedRange<TsNano>) -> Self {
|
||||
Self {
|
||||
range,
|
||||
binned: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> {
|
||||
self.binned
|
||||
.get_or_insert_with(|| evs.binned_bins_timeweight_traitobj(self.range.clone()))
|
||||
.ingest(evs)
|
||||
}
|
||||
|
||||
pub fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
||||
self.binned
|
||||
.as_mut()
|
||||
.map(|x| x.input_done_range_final())
|
||||
.unwrap_or_else(|| {
|
||||
debug!("TODO something to do if we miss the binner here?");
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
||||
self.binned
|
||||
.as_mut()
|
||||
.map(|x| x.input_done_range_open())
|
||||
.unwrap_or(Ok(()))
|
||||
}
|
||||
|
||||
pub fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError> {
|
||||
self.binned.as_mut().map(|x| x.output()).unwrap_or(Ok(None))
|
||||
}
|
||||
}
|
||||
165
src/binning/timeweight/timeweight_bins_stream.rs
Normal file
165
src/binning/timeweight/timeweight_bins_stream.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
use super::timeweight_bins_lazy::BinnedBinsTimeweightLazy;
|
||||
use crate::log::*;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::sitem_err2_from_string;
|
||||
use items_0::streamitem::LogItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::timebin::BinnedBinsTimeweightTrait;
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
use items_0::timebin::BinsBoxed;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
use std::fmt;
|
||||
use std::ops::ControlFlow;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
|
||||
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "BinnedEventsTimeweightDyn")]
|
||||
pub enum Error {
|
||||
InnerDynMissing,
|
||||
}
|
||||
|
||||
type ItemA = Box<dyn BinningggContainerBinsDyn>;
|
||||
type ItemB = Sitemty<ItemA>;
|
||||
|
||||
enum StreamState {
|
||||
Reading,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct BinnedBinsTimeweightStream {
|
||||
state: StreamState,
|
||||
inp: Pin<Box<dyn Stream<Item = ItemB> + Send>>,
|
||||
range_complete: bool,
|
||||
binned: BinnedBinsTimeweightLazy,
|
||||
}
|
||||
|
||||
impl BinnedBinsTimeweightStream {
|
||||
pub fn new(range: BinnedRange<TsNano>, inp: Pin<Box<dyn Stream<Item = ItemB> + Send>>) -> Self {
|
||||
Self {
|
||||
state: StreamState::Reading,
|
||||
inp,
|
||||
range_complete: false,
|
||||
binned: BinnedBinsTimeweightLazy::new(range),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_sitemty(
|
||||
mut self: Pin<&mut Self>,
|
||||
item: ItemB,
|
||||
_cx: &mut Context,
|
||||
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
|
||||
use items_0::streamitem::RangeCompletableItem::*;
|
||||
use items_0::streamitem::StreamItem::*;
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
match item {
|
||||
Ok(x) => match x {
|
||||
DataItem(x) => match x {
|
||||
Data(x) => match self.binned.ingest(&x) {
|
||||
Ok(()) => match self.binned.output() {
|
||||
Ok(Some(x)) => {
|
||||
if x.len() == 0 {
|
||||
Continue(())
|
||||
} else {
|
||||
let ret = Ok(DataItem(Data(x)));
|
||||
Break(Ready(Some(ret)))
|
||||
}
|
||||
}
|
||||
Ok(None) => Continue(()),
|
||||
Err(e) => {
|
||||
let e = sitem_err2_from_string(e);
|
||||
Break(Ready(Some(Err(e))))
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
let e = sitem_err2_from_string(e);
|
||||
Break(Ready(Some(Err(e))))
|
||||
}
|
||||
},
|
||||
RangeComplete => {
|
||||
self.range_complete = true;
|
||||
Continue(())
|
||||
}
|
||||
},
|
||||
Log(x) => Break(Ready(Some(Ok(Log(x))))),
|
||||
Stats(x) => Break(Ready(Some(Ok(Stats(x))))),
|
||||
},
|
||||
Err(e) => {
|
||||
self.state = StreamState::Done;
|
||||
Break(Ready(Some(Err(e))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_eos(
|
||||
mut self: Pin<&mut Self>,
|
||||
_cx: &mut Context,
|
||||
) -> Poll<Option<<Self as Stream>::Item>> {
|
||||
trace_input_container!("handle_eos");
|
||||
use items_0::streamitem::RangeCompletableItem::*;
|
||||
use items_0::streamitem::StreamItem::*;
|
||||
use Poll::*;
|
||||
self.state = StreamState::Done;
|
||||
if self.range_complete {
|
||||
self.binned
|
||||
.input_done_range_final()
|
||||
.map_err(sitem_err2_from_string)?;
|
||||
} else {
|
||||
self.binned
|
||||
.input_done_range_open()
|
||||
.map_err(sitem_err2_from_string)?;
|
||||
}
|
||||
match self.binned.output().map_err(sitem_err2_from_string)? {
|
||||
Some(x) => {
|
||||
trace_emit!("seeing ready bins {:?}", x);
|
||||
Ready(Some(Ok(DataItem(Data(x)))))
|
||||
}
|
||||
None => {
|
||||
let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos"));
|
||||
Ready(Some(Ok(Log(item))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_main(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
let ret = match &self.state {
|
||||
StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx),
|
||||
Ready(None) => Break(self.as_mut().handle_eos(cx)),
|
||||
Pending => Break(Pending),
|
||||
},
|
||||
StreamState::Done => Break(Ready(None)),
|
||||
};
|
||||
if let Break(Ready(Some(Err(_)))) = ret {
|
||||
self.state = StreamState::Done;
|
||||
}
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for BinnedBinsTimeweightStream {
|
||||
type Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use ControlFlow::*;
|
||||
loop {
|
||||
break match self.as_mut().handle_main(cx) {
|
||||
Break(x) => x,
|
||||
Continue(()) => continue,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,17 +6,17 @@ use crate::binning::container_events::ContainerEventsTakeUpTo;
|
||||
use crate::binning::container_events::EventSingle;
|
||||
use crate::binning::container_events::EventSingleRef;
|
||||
use crate::binning::container_events::PartialOrdEvtA;
|
||||
use crate::log::*;
|
||||
use core::fmt;
|
||||
use daqbuf_err as err;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtNano;
|
||||
use netpod::TsNano;
|
||||
use std::mem;
|
||||
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) }
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) }
|
||||
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ use super::timeweight_events::BinnedEventsTimeweight;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::container_events::EventValueType;
|
||||
use crate::channelevents::ChannelEvents;
|
||||
use crate::log::*;
|
||||
use daqbuf_err as err;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
@@ -14,7 +15,6 @@ use items_0::timebin::BinningggContainerBinsDyn;
|
||||
use items_0::timebin::BinningggError;
|
||||
use items_0::timebin::BinsBoxed;
|
||||
use items_0::timebin::EventsBoxed;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
use std::ops::ControlFlow;
|
||||
|
||||
@@ -22,6 +22,10 @@ use items_0::isodate::IsoDateTime;
|
||||
use items_0::Events;
|
||||
use std::fmt;
|
||||
|
||||
mod log {
|
||||
pub use netpod::log::*;
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ErrorKind {
|
||||
General,
|
||||
|
||||
Reference in New Issue
Block a user