Decide best-matching series to return, enable continue-at

This commit is contained in:
Dominik Werder
2024-02-22 12:29:54 +01:00
parent 76d7e3b4f3
commit 6171f901ad
33 changed files with 558 additions and 375 deletions
+9
View File
@@ -399,6 +399,10 @@ pub struct BinsDim0Collector<NTY> {
}
impl<NTY> BinsDim0Collector<NTY> {
pub fn self_name() -> &'static str {
any::type_name::<Self>()
}
pub fn new() -> Self {
Self {
timed_out: false,
@@ -439,6 +443,11 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
debug!("{}::set_continue_at_here", Self::self_name());
// TODO for bins, do nothing: either we have all bins or not.
}
fn result(
&mut self,
_range: Option<SeriesRange>,
+21 -16
View File
@@ -386,6 +386,10 @@ pub struct BinsXbinDim0Collector<NTY> {
}
impl<NTY> BinsXbinDim0Collector<NTY> {
pub fn self_name() -> &'static str {
any::type_name::<Self>()
}
pub fn new() -> Self {
Self {
vals: BinsXbinDim0::empty(),
@@ -424,6 +428,11 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
debug!("{}::set_continue_at_here", Self::self_name());
// TODO for bins, do nothing: either we have all bins or not.
}
fn result(
&mut self,
_range: std::option::Option<SeriesRange>,
@@ -435,23 +444,19 @@ impl<NTY: ScalarOps> CollectorType for BinsXbinDim0Collector<NTY> {
0
};
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at, finished_at) = if self.range_final {
if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
}
None => {
warn!("can not determine continue-at parameters");
(0, None, None)
}
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let missing_bins = bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
}
None => {
warn!("can not determine continue-at parameters");
(0, None, None)
}
} else {
(0, None, None)
}
} else {
(0, None, None)
+29 -8
View File
@@ -1131,14 +1131,24 @@ pub struct ChannelEventsCollector {
coll: Option<Box<dyn Collector>>,
range_complete: bool,
timed_out: bool,
needs_continue_at: bool,
tmp_warned_status: bool,
tmp_error_unknown_type: bool,
}
impl ChannelEventsCollector {
pub fn self_name() -> &'static str {
any::type_name::<Self>()
}
pub fn new() -> Self {
Self {
coll: None,
range_complete: false,
timed_out: false,
needs_continue_at: false,
tmp_warned_status: false,
tmp_error_unknown_type: false,
}
}
}
@@ -1154,19 +1164,24 @@ impl Collector for ChannelEventsCollector {
if let Some(item) = item.as_any_mut().downcast_mut::<ChannelEvents>() {
match item {
ChannelEvents::Events(item) => {
if self.coll.is_none() {
let coll = item.as_ref().as_collectable_with_default_ref().new_collector();
self.coll = Some(coll);
}
let coll = self.coll.as_mut().unwrap();
let coll = self
.coll
.get_or_insert_with(|| item.as_ref().as_collectable_with_default_ref().new_collector());
coll.ingest(item.as_collectable_with_default_mut());
}
ChannelEvents::Status(_) => {
// TODO decide on output format to collect also the connection status events
if !self.tmp_warned_status {
self.tmp_warned_status = true;
warn!("TODO ChannelEventsCollector ChannelEvents::Status");
}
}
}
} else {
error!("ChannelEventsCollector::ingest unexpected item {:?}", item);
if !self.tmp_error_unknown_type {
self.tmp_error_unknown_type = true;
error!("ChannelEventsCollector::ingest unexpected item {:?}", item);
}
}
}
@@ -1178,6 +1193,10 @@ impl Collector for ChannelEventsCollector {
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
&mut self,
range: Option<SeriesRange>,
@@ -1185,6 +1204,7 @@ impl Collector for ChannelEventsCollector {
) -> Result<Box<dyn Collected>, err::Error> {
match self.coll.as_mut() {
Some(coll) => {
coll.set_continue_at_here();
if self.range_complete {
coll.set_range_complete();
}
@@ -1195,8 +1215,9 @@ impl Collector for ChannelEventsCollector {
Ok(res)
}
None => {
error!("nothing collected [caa8d2565]");
Err(err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]"))
let e = err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]");
error!("{e}");
Err(e)
}
}
}
+18 -5
View File
@@ -241,6 +241,7 @@ pub struct EventsDim0Collector<STY> {
vals: EventsDim0<STY>,
range_final: bool,
timed_out: bool,
needs_continue_at: bool,
}
impl<STY> EventsDim0Collector<STY> {
@@ -249,10 +250,12 @@ impl<STY> EventsDim0Collector<STY> {
}
pub fn new() -> Self {
debug!("EventsDim0Collector NEW");
Self {
vals: EventsDim0::empty(),
range_final: false,
timed_out: false,
needs_continue_at: false,
}
}
}
@@ -392,6 +395,11 @@ impl<STY: ScalarOps> CollectorType for EventsDim0Collector<STY> {
fn set_timed_out(&mut self) {
self.timed_out = true;
self.needs_continue_at = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
@@ -399,27 +407,32 @@ impl<STY: ScalarOps> CollectorType for EventsDim0Collector<STY> {
range: Option<SeriesRange>,
_binrange: Option<BinnedRangeEnum>,
) -> Result<Self::Output, Error> {
debug!(
"{} result() needs_continue_at {}",
Self::self_name(),
self.needs_continue_at
);
// If we timed out, we want to hint the client from where to continue.
// This is tricky: currently, client can not request a left-exclusive range.
// We currently give the timestamp of the last event plus a small delta.
// The amount of the delta must take into account what kind of timestamp precision the client
// can parse and handle.
let vals = &mut self.vals;
let continue_at = if self.timed_out {
let continue_at = if self.needs_continue_at {
if let Some(ts) = vals.tss.back() {
Some(IsoDateTime::from_u64(*ts + MS))
let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS));
x
} else {
if let Some(range) = &range {
match range {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)),
SeriesRange::PulseRange(x) => {
error!("TODO emit create continueAt for pulse range");
None
Some(IsoDateTime::from_u64(0))
}
}
} else {
warn!("can not determine continue-at parameters");
None
Some(IsoDateTime::from_u64(0))
}
}
} else {
+9 -2
View File
@@ -200,6 +200,7 @@ pub struct EventsDim1Collector<STY> {
vals: EventsDim1<STY>,
range_final: bool,
timed_out: bool,
needs_continue_at: bool,
}
impl<STY> EventsDim1Collector<STY> {
@@ -212,6 +213,7 @@ impl<STY> EventsDim1Collector<STY> {
vals: EventsDim1::empty(),
range_final: false,
timed_out: false,
needs_continue_at: false,
}
}
}
@@ -356,6 +358,11 @@ impl<STY: ScalarOps> CollectorType for EventsDim1Collector<STY> {
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
debug!("{}::set_continue_at_here", Self::self_name());
self.needs_continue_at = true;
}
// TODO unify with dim0 case
fn result(
&mut self,
@@ -377,12 +384,12 @@ impl<STY: ScalarOps> CollectorType for EventsDim1Collector<STY> {
SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)),
SeriesRange::PulseRange(x) => {
error!("TODO emit create continueAt for pulse range");
None
Some(IsoDateTime::from_u64(0))
}
}
} else {
warn!("can not determine continue-at parameters");
None
Some(IsoDateTime::from_u64(0))
}
}
} else {
+10
View File
@@ -942,14 +942,20 @@ pub struct EventsXbinDim0Collector<NTY> {
vals: EventsXbinDim0<NTY>,
range_final: bool,
timed_out: bool,
needs_continue_at: bool,
}
impl<NTY> EventsXbinDim0Collector<NTY> {
pub fn self_name() -> &'static str {
any::type_name::<Self>()
}
pub fn new() -> Self {
Self {
range_final: false,
timed_out: false,
vals: EventsXbinDim0::empty(),
needs_continue_at: false,
}
}
}
@@ -983,6 +989,10 @@ where
self.timed_out = true;
}
fn set_continue_at_here(&mut self) {
self.needs_continue_at = true;
}
fn result(
&mut self,
range: Option<SeriesRange>,