Fix timebin no last before
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
# Binned Data
|
||||
|
||||
Binned data can be fetched like this:
|
||||
Binned data can be fetched this way:
|
||||
|
||||
```bash
|
||||
curl "https://data-api.psi.ch/api/4/binned?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T00:00:00Z&endDate=2024-02-15T12:00:00Z&binWidth=5m"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "daqbuffer"
|
||||
version = "0.5.3-aa.2"
|
||||
version = "0.5.3-aa.3"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ where
|
||||
http::StatusCode: std::convert::TryFrom<T>,
|
||||
<http::StatusCode as std::convert::TryFrom<T>>::Error: Into<http::Error>,
|
||||
{
|
||||
Response::builder().status(status)
|
||||
Response::builder().status(status).header("daqbuf-api-version", "1.0")
|
||||
}
|
||||
|
||||
pub trait ToPublicResponse {
|
||||
|
||||
@@ -1114,6 +1114,14 @@ impl<STY: ScalarOps> TimeBinnerCommonV0Trait for BinsDim0TimeBinner<STY> {
|
||||
fn common_agg_ingest(&mut self, item: &mut Self::Input) {
|
||||
self.agg.ingest(item)
|
||||
}
|
||||
|
||||
fn common_has_lst(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn common_feed_lst(&mut self, item: &mut Self::Input) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl<NTY: ScalarOps> TimeBinner for BinsDim0TimeBinner<NTY> {
|
||||
|
||||
@@ -57,19 +57,19 @@ use std::fmt;
|
||||
use std::mem;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_binning { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
macro_rules! trace_binning { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! debug_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
@@ -575,24 +575,30 @@ impl<STY: ScalarOps> TimeAggregatorCommonV0Trait for EventsDim0Aggregator<STY> {
|
||||
}
|
||||
|
||||
fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize) {
|
||||
trace_ingest!("{} common_ingest_one_before {:?} {:?}", Self::type_name(), j, item,);
|
||||
trace_ingest_item!("{} common_ingest_one_before {:?} {:?}", Self::type_name(), j, item);
|
||||
self.apply_min_max_lst(item.values[j].clone());
|
||||
self.last_ts = item.tss[j];
|
||||
}
|
||||
|
||||
fn common_ingest_range(&mut self, item: &Self::Input, r: core::ops::Range<usize>) {
|
||||
trace_ingest_item!(
|
||||
"{} common_ingest_range {:?} {:?} lst {:?}",
|
||||
Self::type_name(),
|
||||
r,
|
||||
item,
|
||||
self.minmaxlst
|
||||
);
|
||||
// panic!("common_ingest_range");
|
||||
let beg = self.range.beg_u64();
|
||||
for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) {
|
||||
if ts > beg {
|
||||
self.apply_event_time_weight(ts);
|
||||
} else {
|
||||
trace_ingest_item!("{} common_ingest_range init minmaxlst {:?}", Self::type_name(), val);
|
||||
self.apply_min_max_lst(val.clone());
|
||||
}
|
||||
self.count += 1;
|
||||
self.last_ts = ts;
|
||||
if let Some(minmaxlst) = self.minmaxlst.as_mut() {
|
||||
minmaxlst.2 = val.clone();
|
||||
} else {
|
||||
self.minmaxlst = Some((val.clone(), val.clone(), val.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -621,7 +627,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
|
||||
|
||||
// TODO reduce clone.. optimize via more traits to factor the trade-offs?
|
||||
fn apply_min_max_lst(&mut self, val: STY) {
|
||||
trace_ingest!(
|
||||
trace_ingest_event!(
|
||||
"apply_min_max_lst val {:?} count {} sumc {:?} minmaxlst {:?}",
|
||||
val,
|
||||
self.count,
|
||||
@@ -655,7 +661,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
|
||||
|
||||
fn apply_event_time_weight(&mut self, px: u64) {
|
||||
if let Some((_, _, v)) = self.minmaxlst.as_ref() {
|
||||
trace_ingest!("apply_event_time_weight with v {v:?}");
|
||||
trace_ingest_event!("apply_event_time_weight with v {v:?}");
|
||||
let vf = v.as_prim_f32_b();
|
||||
let v2 = v.clone();
|
||||
self.apply_min_max_lst(v2);
|
||||
@@ -677,7 +683,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
|
||||
}
|
||||
self.int_ts = px;
|
||||
} else {
|
||||
debug_ingest!("apply_event_time_weight NO VALUE");
|
||||
debug_ingest!("apply_event_time_weight minmaxlst None");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -787,10 +793,10 @@ impl<STY: ScalarOps> TimeBinnableTypeAggregator for EventsDim0Aggregator<STY> {
|
||||
}
|
||||
|
||||
fn ingest(&mut self, item: &Self::Input) {
|
||||
trace_ingest!("{} ingest {} events", Self::type_name(), item.len());
|
||||
trace_ingest_item!("{} ingest {} events", Self::type_name(), item.len());
|
||||
if false {
|
||||
for (i, &ts) in item.tss.iter().enumerate() {
|
||||
trace_ingest!("{} ingest {:6} {:20}", Self::type_name(), i, ts);
|
||||
trace_ingest_event!("{} ingest {:6} {:20}", Self::type_name(), i, ts);
|
||||
}
|
||||
}
|
||||
if self.do_time_weight {
|
||||
@@ -1176,11 +1182,23 @@ impl<STY: ScalarOps> TimeBinnerCommonV0Trait for EventsDim0TimeBinner<STY> {
|
||||
fn common_agg_ingest(&mut self, item: &mut Self::Input) {
|
||||
self.agg.ingest(item)
|
||||
}
|
||||
|
||||
fn common_has_lst(&self) -> bool {
|
||||
self.agg.minmaxlst.is_some()
|
||||
}
|
||||
|
||||
fn common_feed_lst(&mut self, item: &mut Self::Input) {
|
||||
if self.agg.minmaxlst.is_none() {
|
||||
if let Some(val) = item.values.front() {
|
||||
self.agg.apply_min_max_lst(val.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> TimeBinner for EventsDim0TimeBinner<STY> {
|
||||
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
|
||||
trace_ingest!("{}::ingest {:?}", Self::type_name(), item);
|
||||
trace_ingest_item!("<{} as TimeBinner>::ingest {:?}", Self::type_name(), item);
|
||||
TimeBinnerCommonV0Func::ingest(self, item)
|
||||
}
|
||||
|
||||
|
||||
@@ -34,6 +34,8 @@ pub trait TimeBinnerCommonV0Trait {
|
||||
fn common_take_or_append_all_from(&mut self, item: Self::Output);
|
||||
fn common_result_reset(&mut self, range: Option<SeriesRange>) -> Self::Output;
|
||||
fn common_agg_ingest(&mut self, item: &mut Self::Input);
|
||||
fn common_has_lst(&self) -> bool;
|
||||
fn common_feed_lst(&mut self, item: &mut Self::Input);
|
||||
}
|
||||
|
||||
pub struct TimeBinnerCommonV0Func {}
|
||||
@@ -57,6 +59,22 @@ impl TimeBinnerCommonV0Func {
|
||||
// TODO optimize by remembering at which event array index we have arrived.
|
||||
// That needs modified interfaces which can take and yield the start and latest index.
|
||||
// Or consume the input data.
|
||||
if B::common_has_lst(binner) == false {
|
||||
if let Some(item) = item
|
||||
.as_any_mut()
|
||||
// TODO make statically sure that we attempt to cast to the correct type here:
|
||||
.downcast_mut::<B::Input>()
|
||||
{
|
||||
B::common_feed_lst(binner, item);
|
||||
} else {
|
||||
error!(
|
||||
"{self_name}::ingest unexpected item type {} expected {}",
|
||||
item.type_name(),
|
||||
any::type_name::<B::Input>()
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
loop {
|
||||
while item.starts_after(B::common_range_current(binner)) {
|
||||
trace_ingest_item!("{self_name} ignore item and cycle starts_after");
|
||||
@@ -105,6 +123,7 @@ impl TimeBinnerCommonV0Func {
|
||||
item.type_name(),
|
||||
any::type_name::<B::Input>()
|
||||
);
|
||||
return;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user