Use different log output, test GapFill
This commit is contained in:
@@ -18,6 +18,7 @@ chrono = { version = "0.4.19", features = ["serde"] }
|
||||
crc32fast = "1.3.2"
|
||||
futures-util = "0.3.24"
|
||||
humantime-serde = "1.1.1"
|
||||
itertools = "0.13.0"
|
||||
autoerr = "0.0.3"
|
||||
thiserror = "=0.0.1"
|
||||
daqbuf-err = { path = "../daqbuf-err" }
|
||||
|
||||
@@ -3,19 +3,17 @@ pub mod agg_bins;
|
||||
use super::container::bins::BinAggedType;
|
||||
use super::container_events::EventValueType;
|
||||
use super::container_events::PulsedVal;
|
||||
use crate::log::*;
|
||||
use core::fmt;
|
||||
use items_0::subfr::SubFrId;
|
||||
use netpod::log::*;
|
||||
use netpod::DtNano;
|
||||
use netpod::EnumVariant;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_result { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
macro_rules! trace_result { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
pub trait AggTimeWeightOutputAvg: BinAggedType + Serialize + for<'a> Deserialize<'a> {}
|
||||
|
||||
@@ -101,7 +99,7 @@ impl AggregatorTimeWeight<f32> for AggregatorNumeric {
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) {
|
||||
let f = dt.ns() as f64 / bl.ns() as f64;
|
||||
trace_event!("INGEST {} {}", f, val);
|
||||
trace_event!("INGEST {:5} {:7.3} {:7.3}", dt.ms_u64(), f, val);
|
||||
self.sum += f * val as f64;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,10 +3,9 @@ use super::container_events::Container;
|
||||
use super::container_events::EventValueType;
|
||||
use crate::apitypes::ContainerBinsApi;
|
||||
use crate::binning::container::bins::BinAggedContainer;
|
||||
use crate::log::*;
|
||||
use core::fmt;
|
||||
use daqbuf_err as err;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use items_0::apitypes::ToUserFacingApiType;
|
||||
use items_0::collect_s::CollectableDyn;
|
||||
use items_0::collect_s::CollectedDyn;
|
||||
@@ -21,19 +20,20 @@ use items_0::AsAnyMut;
|
||||
use items_0::AsAnyRef;
|
||||
use items_0::TypeName;
|
||||
use items_0::WithLen;
|
||||
use netpod::f32_close;
|
||||
use netpod::TsNano;
|
||||
use std::any;
|
||||
use std::collections::VecDeque;
|
||||
use std::mem;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "ContainerBins")]
|
||||
pub enum ContainerBinsError {
|
||||
Unordered,
|
||||
}
|
||||
autoerr::create_error_v1!(
|
||||
name(ContainerBinsError, "ContainerBins"),
|
||||
enum variants {
|
||||
Unordered,
|
||||
},
|
||||
);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BinRef<'a, EVT, BVT>
|
||||
@@ -263,6 +263,33 @@ where
|
||||
.zip(self.fnls_iter())
|
||||
}
|
||||
|
||||
pub fn zip_iter_2(
|
||||
&self,
|
||||
) -> impl Iterator<
|
||||
Item = (
|
||||
TsNano,
|
||||
TsNano,
|
||||
u64,
|
||||
EVT::IterTy1<'_>,
|
||||
EVT::IterTy1<'_>,
|
||||
BVT::IterTy1<'_>,
|
||||
EVT::IterTy1<'_>,
|
||||
bool,
|
||||
),
|
||||
> {
|
||||
let bins = self;
|
||||
itertools::izip!(
|
||||
bins.ts1s_iter().map(Clone::clone),
|
||||
bins.ts2s_iter().map(Clone::clone),
|
||||
bins.cnts_iter().map(Clone::clone),
|
||||
bins.mins_iter(),
|
||||
bins.maxs_iter(),
|
||||
bins.aggs_iter(),
|
||||
bins.lsts_iter(),
|
||||
bins.fnls_iter().map(Clone::clone),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn edges_iter(
|
||||
&self,
|
||||
) -> std::iter::Zip<
|
||||
@@ -313,6 +340,53 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compare_boxed_f32(lhs: &ContainerBins<f32, f32>, rhs: &ContainerBins<f32, f32>) -> bool {
|
||||
if let Some(lhs) = lhs.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
|
||||
if let Some(rhs) = rhs.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
|
||||
if lhs.len() != rhs.len() {
|
||||
error!("length differ");
|
||||
false
|
||||
} else {
|
||||
for (a, b) in lhs.zip_iter_2().zip(rhs.zip_iter_2()) {
|
||||
if a.0 != b.0 {
|
||||
error!("ts1 differ");
|
||||
return false;
|
||||
}
|
||||
if a.1 != b.1 {
|
||||
error!("ts2 differ");
|
||||
return false;
|
||||
}
|
||||
if a.2 != b.2 {
|
||||
error!("cnt differ {:?} {:?}", a, b);
|
||||
return false;
|
||||
}
|
||||
if !f32_close(a.3, b.3) {
|
||||
error!("min differ {:?} {:?}", a, b);
|
||||
return false;
|
||||
}
|
||||
if !f32_close(a.4, b.4) {
|
||||
error!("max differ {:?} {:?}", a, b);
|
||||
return false;
|
||||
}
|
||||
if !f32_close(a.5, b.5) {
|
||||
error!("agg differ {:?} {:?}", a, b);
|
||||
return false;
|
||||
}
|
||||
if !f32_close(a.6, b.6) {
|
||||
error!("lst differ {:?} {:?}", a, b);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
} else {
|
||||
panic!("lhs is not bins f32")
|
||||
}
|
||||
} else {
|
||||
panic!("lhs is not bins f32")
|
||||
}
|
||||
}
|
||||
|
||||
impl<EVT, BVT> fmt::Debug for ContainerBins<EVT, BVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
|
||||
@@ -15,13 +15,14 @@ use netpod::EnumVariant;
|
||||
use netpod::TsNano;
|
||||
use std::task::Context;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[cstm(name = "Error")]
|
||||
enum Error {
|
||||
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
|
||||
AssertMsg(String),
|
||||
Compare(#[from] super::compare::Error),
|
||||
}
|
||||
autoerr::create_error_v1!(
|
||||
name(Error, "Error"),
|
||||
enum variants {
|
||||
Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error),
|
||||
AssertMsg(String),
|
||||
Compare(#[from] super::compare::Error),
|
||||
},
|
||||
);
|
||||
|
||||
#[test]
|
||||
fn test_bin_events_dim1_f32_00() -> Result<(), Error> {
|
||||
|
||||
@@ -8,13 +8,12 @@ use crate::binning::container_events::EventSingleRef;
|
||||
use crate::binning::container_events::PartialOrdEvtA;
|
||||
use crate::log::*;
|
||||
use core::fmt;
|
||||
use daqbuf_err as err;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtNano;
|
||||
use netpod::TsNano;
|
||||
use std::mem;
|
||||
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if false { eprintln!($($arg)*); }) }
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
@@ -89,7 +88,7 @@ where
|
||||
lst: LstRef<EVT>,
|
||||
) {
|
||||
let selfname = "ingest_event_with_lst_gt_range_beg_agg";
|
||||
trace_ingest_event!("{selfname} {:?}", ev);
|
||||
trace_ingest_event!("{} {:?}", selfname, ev);
|
||||
if DEBUG_CHECKS {
|
||||
if ev.ts <= self.active_beg {
|
||||
panic!("should never get here");
|
||||
@@ -99,7 +98,7 @@ where
|
||||
}
|
||||
}
|
||||
let dt = ev.ts.delta(self.filled_until);
|
||||
trace_ingest_event!("{selfname} dt {:?} ev {:?}", dt, ev);
|
||||
trace_ingest_event!("{} dt {:?} ev {:?}", selfname, dt, ev);
|
||||
// TODO can the caller already take the value and replace it afterwards with the current value?
|
||||
// This fn could swap the value in lst and directly use it.
|
||||
// This would require that any call path does not mess with lst.
|
||||
@@ -115,7 +114,7 @@ where
|
||||
lst: LstMut<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
let selfname = "ingest_event_with_lst_gt_range_beg_2";
|
||||
trace_ingest_event!("{selfname}");
|
||||
trace_ingest_event!("{}", selfname);
|
||||
self.ingest_event_with_lst_gt_range_beg_agg(ev.clone(), LstRef(lst.0));
|
||||
InnerA::apply_lst_after_event_handled(ev, lst);
|
||||
// self.cnt += 1;
|
||||
@@ -129,7 +128,7 @@ where
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
let selfname = "ingest_event_with_lst_gt_range_beg";
|
||||
trace_ingest_event!("{selfname}");
|
||||
trace_ingest_event!("{}", selfname);
|
||||
// TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet
|
||||
// and I must initialize the min/max with the current event.
|
||||
InnerA::apply_min_max(&ev, minmax);
|
||||
@@ -144,7 +143,7 @@ where
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
let selfname = "ingest_event_with_lst_eq_range_beg";
|
||||
trace_ingest_event!("{selfname}");
|
||||
trace_ingest_event!("{}", selfname);
|
||||
// TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet
|
||||
// and I must initialize the min/max with the current event.
|
||||
InnerA::apply_min_max(&ev, minmax);
|
||||
@@ -159,7 +158,7 @@ where
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
let selfname = "ingest_with_lst_gt_range_beg";
|
||||
trace_ingest_event!("{selfname} len {}", evs.len());
|
||||
trace_ingest_event!("{} len {}", selfname, evs.len());
|
||||
while let Some(ev) = evs.next() {
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
|
||||
if ev.ts <= self.active_beg {
|
||||
@@ -181,24 +180,25 @@ where
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
let selfname = "ingest_with_lst_ge_range_beg";
|
||||
trace_ingest_event!("{selfname} len {}", evs.len());
|
||||
trace_ingest_event!("{} len {}", selfname, evs.len());
|
||||
while let Some(ev) = evs.next() {
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
|
||||
assert!(ev.ts >= self.active_beg);
|
||||
assert!(ev.ts < self.active_end);
|
||||
if ev.ts == self.active_beg {
|
||||
trace_ingest_event!("{selfname} ts == active_beg");
|
||||
trace_ingest_event!("{} ts == active_beg", selfname);
|
||||
self.ingest_event_with_lst_eq_range_beg(ev, LstMut(lst.0), minmax)?;
|
||||
self.cnt += 1;
|
||||
} else {
|
||||
trace_ingest_event!("{selfname} ts != active_beg");
|
||||
trace_ingest_event!("{} ts != active_beg", selfname);
|
||||
self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?;
|
||||
self.cnt += 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
trace_ingest_event!(
|
||||
"{selfname} defer remainder to ingest_with_lst_gt_range_beg len {}",
|
||||
"{} defer remainder to ingest_with_lst_gt_range_beg len {}",
|
||||
selfname,
|
||||
evs.len()
|
||||
);
|
||||
self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax)
|
||||
@@ -211,11 +211,10 @@ where
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
let selfname = "ingest_with_lst_minmax";
|
||||
trace_ingest_event!("{selfname} len {}", evs.len());
|
||||
trace_ingest_event!("{} len {}", selfname, evs.len());
|
||||
// TODO how to handle the min max? I don't take event data yet out of the container.
|
||||
if let Some(ts0) = evs.ts_first() {
|
||||
trace_ingest_event!("EVENT POP FRONT {selfname}");
|
||||
trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} {selfname}", ts0);
|
||||
trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} {}", ts0, selfname);
|
||||
if ts0 < self.active_beg {
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
@@ -284,7 +283,7 @@ where
|
||||
lst: LstMut<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
let selfname = "ingest_with_lst";
|
||||
trace_ingest_container!("{selfname} len {}", evs.len());
|
||||
trace_ingest_container!("{} len {}", selfname, evs.len());
|
||||
let b = &mut self.inner_b;
|
||||
if let Some(minmax) = self.minmax.as_mut() {
|
||||
b.ingest_with_lst_minmax(evs, lst, minmax)
|
||||
@@ -292,7 +291,7 @@ where
|
||||
let mut run_ingest_with_lst_minmax = false;
|
||||
let _ = run_ingest_with_lst_minmax;
|
||||
if let Some(ev) = evs.next() {
|
||||
trace_event_next!("EVENT POP FRONT {:?} {selfname:30}", ev);
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
|
||||
let beg = b.active_beg;
|
||||
let end = b.active_end;
|
||||
if ev.ts < beg {
|
||||
@@ -340,7 +339,8 @@ where
|
||||
let selfname = "reset_01";
|
||||
let b = &mut self.inner_b;
|
||||
trace_cycle!(
|
||||
"{selfname} active_end {:?} filled_until {:?}",
|
||||
"{} active_end {:?} filled_until {:?}",
|
||||
selfname,
|
||||
b.active_end,
|
||||
b.filled_until
|
||||
);
|
||||
@@ -368,7 +368,7 @@ where
|
||||
// TODO what logic can I save here? To output a bin I need to have min, max, lst.
|
||||
let b = &mut self.inner_b;
|
||||
let minmax = self.minmax.get_or_insert_with(|| {
|
||||
trace_cycle!("{selfname} minmax not yet set");
|
||||
trace_cycle!("{} minmax not yet set", selfname);
|
||||
(lst.0.clone(), lst.0.clone())
|
||||
});
|
||||
{
|
||||
@@ -472,7 +472,7 @@ where
|
||||
|
||||
fn ingest_without_lst(&mut self, evs: &mut ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
|
||||
let selfname = "ingest_without_lst";
|
||||
trace_ingest_container!("{selfname} len {}", evs.len());
|
||||
trace_ingest_container!("{} len {}", selfname, evs.len());
|
||||
let mut run_ingest_with_lst = false;
|
||||
let _ = run_ingest_with_lst;
|
||||
if let Some(ev) = evs.next() {
|
||||
@@ -498,10 +498,8 @@ where
|
||||
// and with respect to the last container, if any.
|
||||
fn ingest_ordered(&mut self, evs: &mut ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
|
||||
let selfname = "ingest_ordered";
|
||||
trace_ingest_container!(
|
||||
"------------------------------------\n{selfname} len {}",
|
||||
evs.len()
|
||||
);
|
||||
trace_ingest_container!("--------------------------------------------------");
|
||||
trace_ingest_container!("{} len {}", selfname, evs.len());
|
||||
if let Some(lst) = self.lst.as_mut() {
|
||||
self.inner_a.ingest_with_lst(evs, LstMut(lst))
|
||||
} else {
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::container_events::PulsedVal;
|
||||
use crate::log::*;
|
||||
use crate::Error;
|
||||
use daqbuf_err as err;
|
||||
use items_0::timebin::BinningggContainerEventsDyn;
|
||||
use netpod::log::*;
|
||||
use netpod::EnumVariant;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
@@ -52,7 +52,7 @@ pub fn empty_events_dyn_ev(
|
||||
}
|
||||
}
|
||||
Shape::Image(..) => {
|
||||
error!("TODO empty_events_dyn_ev {scalar_type:?} {shape:?}");
|
||||
error!("TODO empty_events_dyn_ev {:?} {:?}", scalar_type, shape);
|
||||
err::todoval()
|
||||
}
|
||||
};
|
||||
@@ -85,7 +85,7 @@ pub fn empty_events_pulsed_dyn_ev(
|
||||
}
|
||||
Shape::Wave(..) => {
|
||||
use ScalarType::*;
|
||||
type K<T> = ContainerEvents<Vec<T>>;
|
||||
type K<T> = ContainerEvents<PulsedVal<Vec<T>>>;
|
||||
match scalar_type {
|
||||
U8 => Box::new(K::<u8>::new()),
|
||||
U16 => Box::new(K::<u16>::new()),
|
||||
@@ -103,7 +103,7 @@ pub fn empty_events_pulsed_dyn_ev(
|
||||
}
|
||||
}
|
||||
Shape::Image(..) => {
|
||||
error!("TODO empty_events_dyn_ev {scalar_type:?} {shape:?}");
|
||||
error!("TODO empty_events_dyn_ev {:?} {:?}", scalar_type, shape);
|
||||
err::todoval()
|
||||
}
|
||||
};
|
||||
|
||||
@@ -21,10 +21,7 @@ use items_0::isodate::IsoDateTime;
|
||||
use std::fmt;
|
||||
|
||||
mod log {
|
||||
#[cfg(not(test))]
|
||||
pub use netpod::log::*;
|
||||
#[cfg(test)]
|
||||
pub use netpod::log_direct::*;
|
||||
pub use netpod::log_macros_branch::*;
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
|
||||
@@ -10,6 +10,32 @@ where
|
||||
Box::pin(inp)
|
||||
}
|
||||
|
||||
pub fn new_events_gen_dim0_f32_v00(range: NanoRange) -> impl Iterator<Item = ContainerEvents<f32>> {
|
||||
let dt = 1000 * 1000 * 10;
|
||||
let beg = range.beg();
|
||||
let end = range.end();
|
||||
let mut ts = beg - dt;
|
||||
std::iter::repeat(0)
|
||||
.map(move |_| {
|
||||
type T = f32;
|
||||
let mut c = ContainerEvents::new();
|
||||
loop {
|
||||
let ts1 = TsNano::from_ns(ts);
|
||||
if ts1.ns() >= end {
|
||||
break;
|
||||
}
|
||||
let val = (ts / 1_000_000) as T + 0.1;
|
||||
c.push_back(ts1, val);
|
||||
ts += dt;
|
||||
if c.len() >= 8 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
c
|
||||
})
|
||||
.take_while(|c| c.len() != 0)
|
||||
}
|
||||
|
||||
pub fn new_events_gen_dim1_f32_v00(
|
||||
range: NanoRange,
|
||||
) -> impl Iterator<Item = ContainerEvents<Vec<f32>>> {
|
||||
|
||||
Reference in New Issue
Block a user