From abca73836db5f9db5e8e54c91b52d7d8614adcc8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 21 Mar 2025 14:49:12 +0100 Subject: [PATCH] Check monotonic timestamp common for all rt --- serieswriter/src/ratelimitwriter.rs | 17 ++--------------- serieswriter/src/rtwriter.rs | 22 ++++++++++++++++++---- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index 9b30e66..5e431b2 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -86,10 +86,10 @@ where // Decide whether we want to write. // TODO catch already in CaConn the cases when the IOC-timestamp did not change. let det = self.do_trace_detail; - let tsl = self.last_insert_ts.clone(); let dbgname = &self.dbgname; let sid = &self.series; let min_quiet = 1000 * self.min_quiet.as_secs() + self.min_quiet.subsec_millis() as u64; + let tsl = self.last_insert_ts.clone(); let ts = tsev; if false { trace_rt_decision!( @@ -104,20 +104,7 @@ where ); } let do_write = { - if ts == tsl { - trace_rt_decision!(det, "{dbgname} {sid} ignore, because same time {ts:?} {tsl:?}"); - false - } else if ts < tsl { - trace_rt_decision!( - det, - "{} {} ignore, because ts_local rewind {:?} {:?}", - dbgname, - sid, - ts, - tsl - ); - false - } else if !self.is_polled && ts.ms() < tsl.ms() + min_quiet { + if !self.is_polled && ts.ms() < tsl.ms() + min_quiet { trace_rt_decision!(det, "{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}"); false } else if self.is_polled && ts.ms() + 800 < tsl.ms() + min_quiet { diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 5a8ab2d..24a1a08 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -157,13 +157,27 @@ where ) -> Result { let det = self.do_trace_detail; trace_emit!(det, "write {:?}", item.ts()); - // TODO - // Optimize for the common case that we only write into one of the stores. - // Make the decision first, based on ref, then clone only as required. let res_lt; let res_mt; let res_st; - if self + let tsl = self.last_insert_ts.clone(); + if tsev < tsl { + trace_rt_decision!( + det, + "{} ignore, because rewind time {:?} {:?}", + self.series, + tsev, + tsl + ); + res_lt = WriteRtRes::default(); + res_mt = WriteRtRes::default(); + res_st = WriteRtRes::default(); + } else if tsev == tsl { + trace_rt_decision!(det, "{} ignore, because same time {:?} {:?}", self.series, tsev, tsl); + res_lt = WriteRtRes::default(); + res_mt = WriteRtRes::default(); + res_st = WriteRtRes::default(); + } else if self .last_insert_val .as_ref() .map(|k| item.has_change(k))