From 6c3ef33f07717d32a494c71b5cfc77e3304f0c95 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 1 Oct 2024 14:22:06 +0200 Subject: [PATCH] Fix timebin no last before --- apidoc/src/bins.md | 2 +- crates/daqbuffer/Cargo.toml | 2 +- crates/httpret/src/bodystream.rs | 2 +- crates/items_2/src/binsdim0.rs | 8 +++++ crates/items_2/src/eventsdim0.rs | 50 ++++++++++++++++++++++---------- crates/items_2/src/timebin.rs | 19 ++++++++++++ 6 files changed, 64 insertions(+), 19 deletions(-) diff --git a/apidoc/src/bins.md b/apidoc/src/bins.md index 5899410..0c51b9e 100644 --- a/apidoc/src/bins.md +++ b/apidoc/src/bins.md @@ -1,6 +1,6 @@ # Binned Data -Binned data can be fetched like this: +Binned data can be fetched this way: ```bash curl "https://data-api.psi.ch/api/4/binned?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T00:00:00Z&endDate=2024-02-15T12:00:00Z&binWidth=5m" diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 9eabc57..46148a7 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.3-aa.2" +version = "0.5.3-aa.3" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index 86ad394..7609ad0 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -13,7 +13,7 @@ where http::StatusCode: std::convert::TryFrom, >::Error: Into, { - Response::builder().status(status) + Response::builder().status(status).header("daqbuf-api-version", "1.0") } pub trait ToPublicResponse { diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 883be89..fecf27d 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -1114,6 +1114,14 @@ impl TimeBinnerCommonV0Trait for BinsDim0TimeBinner { fn common_agg_ingest(&mut self, item: &mut Self::Input) { self.agg.ingest(item) } + + fn common_has_lst(&self) -> bool { + todo!() + } + + fn common_feed_lst(&mut self, item: &mut Self::Input) { + todo!() + } } impl TimeBinner for BinsDim0TimeBinner { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index ed0e65a..cd76288 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -57,19 +57,19 @@ use std::fmt; use std::mem; #[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest_item { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_binning { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +macro_rules! trace_binning { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] macro_rules! debug_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } @@ -575,24 +575,30 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { } fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize) { - trace_ingest!("{} common_ingest_one_before {:?} {:?}", Self::type_name(), j, item,); + trace_ingest_item!("{} common_ingest_one_before {:?} {:?}", Self::type_name(), j, item); self.apply_min_max_lst(item.values[j].clone()); self.last_ts = item.tss[j]; } fn common_ingest_range(&mut self, item: &Self::Input, r: core::ops::Range) { + trace_ingest_item!( + "{} common_ingest_range {:?} {:?} lst {:?}", + Self::type_name(), + r, + item, + self.minmaxlst + ); + // panic!("common_ingest_range"); let beg = self.range.beg_u64(); for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) { if ts > beg { self.apply_event_time_weight(ts); + } else { + trace_ingest_item!("{} common_ingest_range init minmaxlst {:?}", Self::type_name(), val); + self.apply_min_max_lst(val.clone()); } self.count += 1; self.last_ts = ts; - if let Some(minmaxlst) = self.minmaxlst.as_mut() { - minmaxlst.2 = val.clone(); - } else { - self.minmaxlst = Some((val.clone(), val.clone(), val.clone())); - } } } } @@ -621,7 +627,7 @@ impl EventsDim0Aggregator { // TODO reduce clone.. optimize via more traits to factor the trade-offs? fn apply_min_max_lst(&mut self, val: STY) { - trace_ingest!( + trace_ingest_event!( "apply_min_max_lst val {:?} count {} sumc {:?} minmaxlst {:?}", val, self.count, @@ -655,7 +661,7 @@ impl EventsDim0Aggregator { fn apply_event_time_weight(&mut self, px: u64) { if let Some((_, _, v)) = self.minmaxlst.as_ref() { - trace_ingest!("apply_event_time_weight with v {v:?}"); + trace_ingest_event!("apply_event_time_weight with v {v:?}"); let vf = v.as_prim_f32_b(); let v2 = v.clone(); self.apply_min_max_lst(v2); @@ -677,7 +683,7 @@ impl EventsDim0Aggregator { } self.int_ts = px; } else { - debug_ingest!("apply_event_time_weight NO VALUE"); + debug_ingest!("apply_event_time_weight minmaxlst None"); } } @@ -787,10 +793,10 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { } fn ingest(&mut self, item: &Self::Input) { - trace_ingest!("{} ingest {} events", Self::type_name(), item.len()); + trace_ingest_item!("{} ingest {} events", Self::type_name(), item.len()); if false { for (i, &ts) in item.tss.iter().enumerate() { - trace_ingest!("{} ingest {:6} {:20}", Self::type_name(), i, ts); + trace_ingest_event!("{} ingest {:6} {:20}", Self::type_name(), i, ts); } } if self.do_time_weight { @@ -1176,11 +1182,23 @@ impl TimeBinnerCommonV0Trait for EventsDim0TimeBinner { fn common_agg_ingest(&mut self, item: &mut Self::Input) { self.agg.ingest(item) } + + fn common_has_lst(&self) -> bool { + self.agg.minmaxlst.is_some() + } + + fn common_feed_lst(&mut self, item: &mut Self::Input) { + if self.agg.minmaxlst.is_none() { + if let Some(val) = item.values.front() { + self.agg.apply_min_max_lst(val.clone()); + } + } + } } impl TimeBinner for EventsDim0TimeBinner { fn ingest(&mut self, item: &mut dyn TimeBinnable) { - trace_ingest!("{}::ingest {:?}", Self::type_name(), item); + trace_ingest_item!("<{} as TimeBinner>::ingest {:?}", Self::type_name(), item); TimeBinnerCommonV0Func::ingest(self, item) } diff --git a/crates/items_2/src/timebin.rs b/crates/items_2/src/timebin.rs index e2b157d..5a6b961 100644 --- a/crates/items_2/src/timebin.rs +++ b/crates/items_2/src/timebin.rs @@ -34,6 +34,8 @@ pub trait TimeBinnerCommonV0Trait { fn common_take_or_append_all_from(&mut self, item: Self::Output); fn common_result_reset(&mut self, range: Option) -> Self::Output; fn common_agg_ingest(&mut self, item: &mut Self::Input); + fn common_has_lst(&self) -> bool; + fn common_feed_lst(&mut self, item: &mut Self::Input); } pub struct TimeBinnerCommonV0Func {} @@ -57,6 +59,22 @@ impl TimeBinnerCommonV0Func { // TODO optimize by remembering at which event array index we have arrived. // That needs modified interfaces which can take and yield the start and latest index. // Or consume the input data. + if B::common_has_lst(binner) == false { + if let Some(item) = item + .as_any_mut() + // TODO make statically sure that we attempt to cast to the correct type here: + .downcast_mut::() + { + B::common_feed_lst(binner, item); + } else { + error!( + "{self_name}::ingest unexpected item type {} expected {}", + item.type_name(), + any::type_name::() + ); + return; + } + } loop { while item.starts_after(B::common_range_current(binner)) { trace_ingest_item!("{self_name} ignore item and cycle starts_after"); @@ -105,6 +123,7 @@ impl TimeBinnerCommonV0Func { item.type_name(), any::type_name::() ); + return; }; } }