From 7aba0683247270830912326d763d3a4ba1b77c4a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 9 Jul 2025 12:44:55 +0200 Subject: [PATCH] Clean up some metrics counters --- .cargo/cargo-lock | 30 +-- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 405 +++++++++++++++------------- netfetch/src/conf.rs | 58 ++-- serieswriter/src/ratelimitwriter.rs | 31 ++- serieswriter/src/rtwriter.rs | 106 +++++++- stats/mettdecl.rs | 35 ++- 7 files changed, 404 insertions(+), 263 deletions(-) diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index bf6c636..200afda 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -736,7 +736,7 @@ dependencies = [ [[package]] name = "daqingest" -version = "0.3.0" +version = "0.3.0-ab.1" dependencies = [ "async-channel", "autoerr", @@ -1257,9 +1257,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc2fdfdbff08affe55bb779f33b053aa1fe5dd5b54c257343c17edfa55711bdb" +checksum = "7f66d5bd4c6f02bf0542fad85d626775bab9258cf795a4256dcaf3161114d1df" dependencies = [ "bytes", "futures-core", @@ -2155,21 +2155,21 @@ dependencies = [ [[package]] name = "scylla" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f4d5b7fe3cb3e140d374238f5f916eb810053562287a01d66ac685e305c166f" +checksum = "221bcc7d06d8eddb9f1152e7955c4965950a6b93666b40797a9ce78624f5a4d2" dependencies = [ "arc-swap", "async-trait", "bytes", "chrono", - "dashmap 5.5.3", + "dashmap 6.1.0", "futures", - "hashbrown 0.14.5", + "hashbrown 0.15.4", "itertools 0.14.0", "rand 0.9.1", "rand_pcg 0.9.0", - "scylla-cql 1.2.0", + "scylla-cql 1.3.0", "smallvec", "socket2", "thiserror", @@ -2199,22 +2199,22 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "507db4914c625c86d32c5c00ed1add75eaf966a2d4ba9772b601b2563701df58" +checksum = "58b3e593a1cb468a39f7d51d6971b462a22672f22bc5b6b0dab5426acd48189a" dependencies = [ "byteorder", "bytes", "chrono", "itertools 0.14.0", "lz4_flex", - "scylla-macros 1.2.0", + "scylla-macros 1.3.0", "snap", "stable_deref_trait", "thiserror", "tokio", "uuid", - "yoke 0.7.5", + "yoke 0.8.0", ] [[package]] @@ -2231,9 +2231,9 @@ dependencies = [ [[package]] name = "scylla-macros" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8efb4b519ab70556c8d3adfd4f192c635d0c7c72d4f125d2b79713742c98f39d" +checksum = "fcd4e8ce08ba975bdbff47f6bc16f4a87f0c852866baaba5947e29f58e7ce4df" dependencies = [ "darling", "proc-macro2", @@ -2254,7 +2254,7 @@ dependencies = [ "daqbuf-series", "futures-util", "pin-project", - "scylla 1.2.0", + "scylla 1.3.0", "serde", "smallvec", "stats", diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 6e39cb6..b3f148d 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.0-ab.1" +version = "0.3.0" authors = ["Dominik Werder "] edition = "2024" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 255b3c6..5545752 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,6 +1,5 @@ mod enumfetch; -use crate::ca::connset::CaConnSet; use crate::conf::ChannelConfig; use crate::metrics::status::StorageUsage; use crate::throttletrace::ThrottleTrace; @@ -48,7 +47,6 @@ use scywriiq::AccountingRecv; use scywriiq::ChannelStatusItem; use scywriiq::ConnectionStatus; use scywriiq::ConnectionStatusItem; -use scywriiq::MspItem; use scywriiq::QueryItem; use scywriiq::ShutdownReason; use serde::Serialize; @@ -318,6 +316,27 @@ struct Monitoring2PassiveState { tsbeg: Instant, #[serde(with = "serde_Instant_elapsed_ms")] ts_silence_read_next: Instant, + manual_poll_on_quiet_after_sec: u16, +} + +impl Monitoring2PassiveState { + fn new(tsnow: Instant, rng: &mut Xoshiro128PlusPlus) -> Self { + let mut ret = Self { + tsbeg: tsnow, + ts_silence_read_next: tsnow + CaConn::silence_read_next_ivl_rng(rng), + manual_poll_on_quiet_after_sec: 300, + }; + ret.manual_poll_on_quiet_after_reset_next(rng); + ret + } + + fn manual_poll_on_quiet_after(&self) -> Duration { + Duration::from_secs(self.manual_poll_on_quiet_after_sec as u64) + } + + fn manual_poll_on_quiet_after_reset_next(&mut self, rng: &mut Xoshiro128PlusPlus) { + self.manual_poll_on_quiet_after_sec = 250 + 0x3f & rng.next_u32() as u16; + } } #[derive(Debug, Clone, Serialize)] @@ -336,7 +355,6 @@ enum Monitoring2State { pub enum MonitorReadCmp { Equal, DiffTime, - DiffTimeValue, DiffValue, } @@ -408,6 +426,7 @@ struct WritableState { writer: CaRtWriter, binwriter: BinWriter, reading: ReadingState, + manual_poll_on_quiet_after_sec: u16, } #[derive(Debug, Clone, Serialize)] @@ -1089,7 +1108,19 @@ impl<'a> EventAddIngestRefobj<'a> { if let Some(binwriter) = self.binwriter.as_mut() { binwriter.ingest(tsev, val_for_agg, self.iqdqs)?; } - self.mett.ts_msp_reput_onevent().add(wres.msp_rewrite() as u32); + self.mett.ts_msp_reput_onevent().add(wres.msp_rewrite() as _); + self.mett + .writer_ignore_rewind_time() + .add(wres.ignore_rewind_time() as _); + self.mett.writer_ignore_same_time().add(wres.ignore_same_time() as _); + self.mett.writer_ignore_same_value().add(wres.ignore_same_value() as _); + self.mett + .writer_ignore_monitor_not_min_quiet() + .add(wres.ignore_monitor_not_min_quiet() as _); + self.mett + .writer_ignore_poll_not_min_quiet() + .add(wres.ignore_poll_not_min_quiet() as _); + self.mett.writer_ignore_rate_cap().add(wres.ignore_rate_cap() as _); } if false { // TODO record stats on drop with the new filter @@ -1823,6 +1854,7 @@ impl CaConn { poll_ivl: ivl, tick: PollTickState::Idle(PollTickStateIdle { next }), }), + manual_poll_on_quiet_after_sec: 300, }; conf.state = ChannelState::Writable(created_state); Ok(()) @@ -1861,6 +1893,7 @@ impl CaConn { tsbeg: self.poll_tsnow, subid, }), + manual_poll_on_quiet_after_sec: 300, }; conf.state = ChannelState::Writable(created_state); Ok(()) @@ -2222,10 +2255,7 @@ impl CaConn { st.reading = ReadingState::Monitoring(MonitoringState { tsbeg: tsnow, subid: st2.subid, - mon2state: Monitoring2State::Passive(Monitoring2PassiveState { - tsbeg: tsnow, - ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng), - }), + mon2state: Monitoring2State::Passive(Monitoring2PassiveState::new(tsnow, &mut self.rng)), monitoring_event_last: Some(ev.clone()), last_comparisons: VecDeque::new(), }); @@ -2383,165 +2413,38 @@ impl CaConn { Err(Error::FutLogic) } } else { - if let Some(cid) = self.read_ioids.get(&ioid) { - let (ch_s, ch_wrst, ch_conf) = if let Some(x) = self.channels.get_mut(cid) { - (&mut x.state, &mut x.wrst, &x.conf) - } else { - warn!("handle_read_notify_res can not find channel for {:?} {:?}", cid, ioid); - return Ok(()); - }; - match ch_s { - ChannelState::Writable(st) => { - if st.channel.sid != sid_ev { - // TODO count for metrics - // warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev); - } - let stnow = self.tmp_ts_poll; - let crst = &mut st.channel; - let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1; - if crst.stwin_ts != stwin_ts { - crst.stwin_ts = stwin_ts; - crst.stwin_count = 0; - } - { - crst.stwin_count += 1; - crst.stwin_bytes += ev.payload_len; - } - match &mut st.reading { - ReadingState::Polling(st2) => match &mut st2.tick { - PollTickState::Idle(_) => { - self.mett.recv_read_notify_while_polling_idle().inc(); - } - PollTickState::Wait(st3) => { - if self.read_ioids.remove(&st3.ioid).is_some() { - self.mett.ioid_read_done().inc(); - } else { - self.mett.ioid_read_error_not_found().inc(); - } - let dt = tsnow.saturating_duration_since(st3.since); - self.mett.caget_lat().push_dur_100us(dt); - let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow); - if self.trace_channel_poll { - trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow); - } - st2.tick = PollTickState::Idle(PollTickStateIdle { next }); - let mut robj = EventAddIngestRefobj::from_writable_state( - &self.opts, - &mut self.iqdqs, - st, - &mut self.mett, - &mut self.rng, - ) - .and_channel_status_writer(ch_wrst) - .and_with_use_ioc_time(ch_conf.use_ioc_time()); - robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?; - } - }, - ReadingState::EnableMonitoring(_) => { - self.mett.recv_read_notify_while_enabling_monitoring().inc(); + if let Some(cid) = self.read_ioids.remove(&ioid) { + if let Some(x) = self.channels.get_mut(&cid) { + let ch_s = &mut x.state; + let ch_wrst = &mut x.wrst; + let ch_conf = &x.conf; + match ch_s { + ChannelState::Writable(st) => { + if st.channel.sid != sid_ev { + self.mett.recv_read_notify_channel_sid_mismatch().inc(); } - ReadingState::Monitoring(st2) => match &mut st2.mon2state { - Monitoring2State::Passive(st3) => { - if self.read_ioids.remove(&ioid).is_some() { - self.mett.ioid_read_done().inc(); - self.mett.recv_read_notify_state_passive_found_ioid().inc(); - } else { - self.mett.ioid_read_error_not_found().inc(); - } - st3.tsbeg = tsnow; - } - Monitoring2State::ReadPending(st3) => { - // We don't check again for `since` here. That's done in timeout checking. - // So we could be here a little beyond timeout but we don't care about that. - if ioid != st3.ioid { - // warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); - self.mett.recv_read_notify_state_read_pending_bad_ioid().inc(); - } else { - self.mett.recv_read_notify_state_read_pending().inc(); - } - let read_expected = if let Some(_cid) = self.read_ioids.remove(&ioid) { - self.mett.ioid_read_done().inc(); - true - } else { - self.mett.ioid_read_error_not_found().inc(); - false - }; - st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { - tsbeg: tsnow, - ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng), - }); - if read_expected { - self.mett.monitoring_read_expected().inc(); - let item = ChannelStatusItem { - ts: self.tmp_ts_poll, - cssid: st.channel.cssid.clone(), - status: ChannelStatus::MonitoringReadResultExpected, - }; - ch_wrst - .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; - } else { - self.mett.monitoring_read_unexpected().inc(); - let item = ChannelStatusItem { - ts: self.tmp_ts_poll, - cssid: st.channel.cssid.clone(), - status: ChannelStatus::MonitoringReadResultUnexpected, - }; - ch_wrst - .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; - } - // NOTE we do not update the last value in this ev handler. - { - if let Some(lst) = st2.monitoring_event_last.as_ref() { - // TODO compare with last monitoring value - if ev.value.meta == lst.value.meta { - st2.last_comparisons - .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); - } else { - self.mett.monitoring_read_diff_time().inc(); - st2.last_comparisons - .push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime)); - { - let item = ChannelStatusItem { - ts: self.tmp_ts_poll, - cssid: st.channel.cssid.clone(), - status: ChannelStatus::MonitoringReadDiffTime, - }; - ch_wrst.emit_channel_status_item( - item, - Self::channel_status_qu(&mut self.iqdqs), - )?; - } - } - if ev.value.data == lst.value.data { - st2.last_comparisons - .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); - } else { - self.mett.monitoring_read_diff_value().inc(); - st2.last_comparisons - .push_back((UtcDateTime::now(), MonitorReadCmp::DiffValue)); - { - let item = ChannelStatusItem { - ts: self.tmp_ts_poll, - cssid: st.channel.cssid.clone(), - status: ChannelStatus::MonitoringReadDiffValue, - }; - ch_wrst.emit_channel_status_item( - item, - Self::channel_status_qu(&mut self.iqdqs), - )?; - } - } + let stnow = self.tmp_ts_poll; + let crst = &mut st.channel; + let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1; + if crst.stwin_ts != stwin_ts { + crst.stwin_ts = stwin_ts; + crst.stwin_count = 0; + } + { + crst.stwin_count += 1; + crst.stwin_bytes += ev.payload_len; + } + match &mut st.reading { + ReadingState::Polling(st2) => match &mut st2.tick { + PollTickState::Wait(st3) => { + self.mett.recv_read_notify_poll_wait().inc(); + let dt = tsnow.saturating_duration_since(st3.since); + self.mett.caget_lat().push_dur_100us(dt); + let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow); + if self.trace_channel_poll { + trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow); } - while st2.last_comparisons.len() > 12 { - st2.last_comparisons.pop_front(); - } - } - // TODO check ADEL to see if monitor should have fired. - // But there is still a small chance that the monitor will just received slightly later. - // More involved check would be to raise a flag, wait for the expected monitor for some - // timeout, and if we get nothing error out. - // TODO read-result-after-monitor-silence - if false { + st2.tick = PollTickState::Idle(PollTickStateIdle { next }); let mut robj = EventAddIngestRefobj::from_writable_state( &self.opts, &mut self.iqdqs, @@ -2552,24 +2455,149 @@ impl CaConn { .and_channel_status_writer(ch_wrst) .and_with_use_ioc_time(ch_conf.use_ioc_time()); robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?; + Ok(()) } + PollTickState::Idle(_) => { + self.mett.recv_read_notify_poll_idle().inc(); + Ok(()) + } + }, + ReadingState::EnableMonitoring(_) => { + self.mett.recv_read_notify_channel_transition().inc(); + Ok(()) + } + ReadingState::Monitoring(st2) => match &mut st2.mon2state { + Monitoring2State::Passive(st3) => { + self.mett.recv_read_notify_monitor_passive().inc(); + st3.tsbeg = tsnow; + Ok(()) + } + Monitoring2State::ReadPending(st3) => { + // We don't check again for `since` here. That's done in timeout checking. + // So we could be here a little beyond timeout but we don't care about that. + if ioid != st3.ioid { + // warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); + self.mett.recv_read_notify_state_read_pending_bad_ioid().inc(); + Ok(()) + } else { + st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState::new( + tsnow, + &mut self.rng, + )); + { + self.mett.monitoring_read_expected().inc(); + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st.channel.cssid.clone(), + status: ChannelStatus::MonitoringReadResultExpected, + }; + ch_wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + )?; + } + // NOTE we do not update the last value in this ev handler. + { + if let Some(lst) = st2.monitoring_event_last.as_ref() { + // TODO compare with last monitoring value + if ev.value.meta == lst.value.meta { + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); + } else { + self.mett.monitoring_read_diff_time().inc(); + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime)); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st.channel.cssid.clone(), + status: ChannelStatus::MonitoringReadDiffTime, + }; + ch_wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + )?; + } + } + if ev.value.data == lst.value.data { + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); + } else { + self.mett.monitoring_read_diff_value().inc(); + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::DiffValue)); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st.channel.cssid.clone(), + status: ChannelStatus::MonitoringReadDiffValue, + }; + ch_wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + )?; + } + } + } + while st2.last_comparisons.len() > 12 { + st2.last_comparisons.pop_front(); + } + } + // TODO check ADEL to see if monitor should have fired. + // But there is still a small chance that the monitor will just received slightly later. + // More involved check would be to raise a flag, wait for the expected monitor for some + // timeout, and if we get nothing error out. + // TODO read-result-after-monitor-silence + if false { + let mut robj = EventAddIngestRefobj::from_writable_state( + &self.opts, + &mut self.iqdqs, + st, + &mut self.mett, + &mut self.rng, + ) + .and_channel_status_writer(ch_wrst) + .and_with_use_ioc_time(ch_conf.use_ioc_time()); + robj.event_add_ingest( + ev.payload_len, + ev.value, + tsnow, + stnow, + tscaproto, + )?; + } + Ok(()) + } + } + }, + ReadingState::StopMonitoringForPolling(..) => { + self.mett.recv_read_notify_channel_transition().inc(); + Ok(()) } - }, - ReadingState::StopMonitoringForPolling(..) => { - error!("TODO handle_read_notify_res handle StopMonitoringForPolling"); } } + _ => { + self.mett.recv_read_notify_channel_unexpected_state().inc(); + Ok(()) + } } - _ => { - // TODO count instead of print - error!("unexpected state: ReadNotifyRes while having {:?}", ch_s); - } + } else { + self.mett.recv_read_notify_channel_not_found().inc(); + Ok(()) } } else { - // warn!("unknown {ioid:?}"); - self.mett.unknown_ioid().inc(); + self.mett.recv_read_notify_ioid_not_found().inc(); + // { + // self.mett.monitoring_read_unexpected().inc(); + // let item = ChannelStatusItem { + // ts: self.tmp_ts_poll, + // cssid: st.channel.cssid.clone(), + // status: ChannelStatus::MonitoringReadResultUnexpected, + // }; + // ch_wrst.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; + // } + Ok(()) } - Ok(()) } } @@ -2688,9 +2716,10 @@ impl CaConn { ChannelState::MakingSeriesWriter(_) => {} ChannelState::Writable(st2) => match &mut st2.reading { ReadingState::EnableMonitoring(_) => {} - ReadingState::Monitoring(st3) => match &st3.mon2state { + ReadingState::Monitoring(st3) => match &mut st3.mon2state { Monitoring2State::Passive(st4) => { - if st4.tsbeg + conf.conf.manual_poll_on_quiet_after() < tsnow { + if st4.tsbeg + st4.manual_poll_on_quiet_after() < tsnow { + st4.manual_poll_on_quiet_after_reset_next(&mut self.rng); trace_monitor_stale!("check_channels_state_poll Monitoring2State::Passive timeout"); self.mett.monitor_stale_read_begin().inc(); // TODO encapsulate and unify with Polling handler @@ -2812,7 +2841,7 @@ impl CaConn { PollTickState::Wait(st4) => { if st4.since + POLL_READ_TIMEOUT <= tsnow { if self.read_ioids.remove(&st4.ioid).is_some() { - self.mett.ioid_read_timeout().inc(); + self.mett.polling_read_timeout().inc(); } self.mett.caget_timeout().inc(); let next = PollTickStateIdle::decide_next(st4.next_backup, st3.poll_ivl, tsnow); @@ -2820,6 +2849,15 @@ impl CaConn { trace!("make poll idle after poll timeout {:?}", next); } st3.tick = PollTickState::Idle(PollTickStateIdle { next }); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st2.channel.cssid.clone(), + status: ChannelStatus::PollingReadTimeout, + }; + conf.wrst + .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; + } } } }, @@ -2898,7 +2936,9 @@ impl CaConn { // TODO should unify. } } - if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow { + // TODO sync with Monitoring2PassiveState::manual_poll_on_quiet_after plus margin. + let timeout = Duration::from_millis(1000 * 800); + if st2.channel.ts_activity_last + conf.conf.expect_activity_within(timeout) < tsnow { not_alive_count += 1; } else { alive_count += 1; @@ -2907,10 +2947,9 @@ impl CaConn { _ => {} } } - // TODO STATS - // self.stats.channel_all_count.__set(self.channels.len() as _); - // self.stats.channel_alive_count.__set(alive_count as _); - // self.stats.channel_not_alive_count.__set(not_alive_count as _); + self.mett.channel_all_count().set(self.channels.len() as _); + self.mett.channel_alive_count().set(alive_count as _); + self.mett.channel_not_alive_count().set(not_alive_count as _); Ok(()) } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index f51a563..6c8d805 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -816,37 +816,6 @@ impl ChannelConfig { } } - /// Only used when in monitoring mode. If we do not see activity for this Duration then - /// we issue a manual read to see if the channel is alive. - pub fn manual_poll_on_quiet_after(&self) -> Duration { - Duration::from_secs(300) - } - - pub fn expect_activity_within(&self) -> Duration { - let dur = if self.is_polled() { - // It would be anyway invalid to be polled and specify a monitor record policy. - match self.arch.short_term { - Some(ChannelReadConfig::Poll(x)) => x, - Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet_after(), - None => match self.arch.medium_term { - Some(ChannelReadConfig::Poll(x)) => x, - Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet_after(), - None => match self.arch.long_term { - Some(ChannelReadConfig::Poll(x)) => x, - Some(ChannelReadConfig::Monitor) => self.manual_poll_on_quiet_after(), - None => { - // This is an invalid configuration, so just a fallback - self.manual_poll_on_quiet_after() - } - }, - }, - } - } else { - self.manual_poll_on_quiet_after() - }; - dur + Duration::from_millis(1000 * 10) - } - pub fn min_quiets(&self) -> MinQuiets { MinQuiets { st: match self.arch.short_term { @@ -882,6 +851,33 @@ impl ChannelConfig { config_file_basename: String::new(), } } + + pub fn expect_activity_within(&self, manual_poll_on_quiet_after: Duration) -> Duration { + use crate::conf::ChannelReadConfig; + let conf = self; + let dur = if conf.is_polled() { + // It would be anyway invalid to be polled and specify a monitor record policy. + match conf.arch.short_term { + Some(ChannelReadConfig::Poll(x)) => x, + Some(ChannelReadConfig::Monitor) => manual_poll_on_quiet_after, + None => match conf.arch.medium_term { + Some(ChannelReadConfig::Poll(x)) => x, + Some(ChannelReadConfig::Monitor) => manual_poll_on_quiet_after, + None => match conf.arch.long_term { + Some(ChannelReadConfig::Poll(x)) => x, + Some(ChannelReadConfig::Monitor) => manual_poll_on_quiet_after, + None => { + // This is an invalid configuration, so just a fallback + manual_poll_on_quiet_after + } + }, + }, + } + } else { + manual_poll_on_quiet_after + }; + dur + Duration::from_millis(1000 * 10) + } } #[derive(Debug, Clone, PartialEq, Serialize)] diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index 997808f..04f9ab1 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -28,6 +28,9 @@ pub struct WriteRes { pub accept: bool, pub bytes: u32, pub msp_rewrite: u8, + pub ignore_monitor_not_min_quiet: u8, + pub ignore_poll_not_min_quiet: u8, + pub ignore_rate_cap: u8, } #[derive(Debug)] @@ -96,38 +99,31 @@ where let min_quiet = 1000 * self.min_quiet.as_secs() + self.min_quiet.subsec_millis() as u64; let tsl = self.last_insert_ts.clone(); let ts = tsev; - if false { - trace_rt_decision!( - dtd, - "{} {} min_quiet {:?} ts1 {:?} ts2 {:?} item {:?}", - dbgname, - sid, - min_quiet, - ts.ms(), - tsl.ms(), - item - ); - } + let mut ignore_monitor_not_min_quiet: u8 = 0; + let mut ignore_poll_not_min_quiet: u8 = 0; + let mut ignore_rate_cap: u8 = 0; let do_write = { if !self.is_polled && ts.ms() < tsl.ms() + min_quiet { trace_rt_decision!( dtd, - "{} {} ignore, because not min quiet {} {}", + "{} {} ignore, because monitor not min quiet {} {}", dbgname, sid, ts, tsl ); + ignore_monitor_not_min_quiet += 1; false } else if self.is_polled && ts.ms() + 800 < tsl.ms() + min_quiet { trace_rt_decision!( dtd, - "{} {} ignore, because not is-polled min quiet {} {}", + "{} {} ignore, because poll not min quiet {} {}", dbgname, sid, ts, tsl ); + ignore_poll_not_min_quiet += 1; false } else if ts < tsl.add_dt_nano(DtNano::from_ms(1)) { trace_rt_decision!( @@ -138,6 +134,7 @@ where ts, tsl ); + ignore_rate_cap += 1; false } else { trace_rt_decision!(dtd, "{} {} accept {} {}", dbgname, sid, ts, tsl); @@ -151,6 +148,9 @@ where accept: true, bytes: res.bytes, msp_rewrite: res.msp_rewrite, + ignore_monitor_not_min_quiet, + ignore_poll_not_min_quiet, + ignore_rate_cap, }; Ok(ret) } else { @@ -158,6 +158,9 @@ where accept: false, bytes: 0, msp_rewrite: 0, + ignore_monitor_not_min_quiet, + ignore_poll_not_min_quiet, + ignore_rate_cap, }; Ok(ret) } diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 51ae470..4b8f24b 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -83,6 +83,32 @@ impl WriteRes { pub fn msp_rewrite(&self) -> u8 { self.st.msp_rewrite + self.mt.msp_rewrite + self.lt.msp_rewrite } + + pub fn ignore_rewind_time(&self) -> u8 { + self.st.ignore_rewind_time + self.mt.ignore_rewind_time + self.lt.ignore_rewind_time + } + + pub fn ignore_same_time(&self) -> u8 { + self.st.ignore_same_time + self.mt.ignore_same_time + self.lt.ignore_same_time + } + + pub fn ignore_same_value(&self) -> u8 { + self.st.ignore_same_value + self.mt.ignore_same_value + self.lt.ignore_same_value + } + + pub fn ignore_monitor_not_min_quiet(&self) -> u8 { + self.st.ignore_monitor_not_min_quiet + + self.mt.ignore_monitor_not_min_quiet + + self.lt.ignore_monitor_not_min_quiet + } + + pub fn ignore_poll_not_min_quiet(&self) -> u8 { + self.st.ignore_poll_not_min_quiet + self.mt.ignore_poll_not_min_quiet + self.lt.ignore_poll_not_min_quiet + } + + pub fn ignore_rate_cap(&self) -> u8 { + self.st.ignore_rate_cap + self.mt.ignore_rate_cap + self.lt.ignore_rate_cap + } } #[derive(Debug)] @@ -90,6 +116,56 @@ pub struct WriteRtRes { pub accept: bool, pub bytes: u32, pub msp_rewrite: u8, + pub ignore_rewind_time: u8, + pub ignore_same_time: u8, + pub ignore_same_value: u8, + pub ignore_monitor_not_min_quiet: u8, + pub ignore_poll_not_min_quiet: u8, + pub ignore_rate_cap: u8, +} + +impl WriteRtRes { + fn ignore_rewind_time() -> Self { + Self { + accept: false, + bytes: 0, + msp_rewrite: 0, + ignore_rewind_time: 1, + ignore_same_time: 0, + ignore_same_value: 0, + ignore_monitor_not_min_quiet: 0, + ignore_poll_not_min_quiet: 0, + ignore_rate_cap: 0, + } + } + + fn ignore_same_time() -> Self { + Self { + accept: false, + bytes: 0, + msp_rewrite: 0, + ignore_rewind_time: 0, + ignore_same_time: 1, + ignore_same_value: 0, + ignore_monitor_not_min_quiet: 0, + ignore_poll_not_min_quiet: 0, + ignore_rate_cap: 0, + } + } + + fn ignore_same_value() -> Self { + Self { + accept: false, + bytes: 0, + msp_rewrite: 0, + ignore_rewind_time: 0, + ignore_same_time: 0, + ignore_same_value: 1, + ignore_monitor_not_min_quiet: 0, + ignore_poll_not_min_quiet: 0, + ignore_rate_cap: 0, + } + } } impl Default for WriteRtRes { @@ -98,6 +174,12 @@ impl Default for WriteRtRes { accept: false, bytes: 0, msp_rewrite: 0, + ignore_rewind_time: 0, + ignore_same_time: 0, + ignore_same_value: 0, + ignore_monitor_not_min_quiet: 0, + ignore_poll_not_min_quiet: 0, + ignore_rate_cap: 0, } } } @@ -229,14 +311,14 @@ where tsev, tsl ); - res_lt = WriteRtRes::default(); - res_mt = WriteRtRes::default(); - res_st = WriteRtRes::default(); + res_lt = WriteRtRes::ignore_rewind_time(); + res_mt = WriteRtRes::ignore_rewind_time(); + res_st = WriteRtRes::ignore_rewind_time(); } else if tsev == tsl { trace_rt_decision!(det, "{} ignore, because same time {:?} {:?}", self.series, tsev, tsl); - res_lt = WriteRtRes::default(); - res_mt = WriteRtRes::default(); - res_st = WriteRtRes::default(); + res_lt = WriteRtRes::ignore_same_time(); + res_mt = WriteRtRes::ignore_same_time(); + res_st = WriteRtRes::ignore_same_time(); } else if self .last_insert_val .as_ref() @@ -245,9 +327,9 @@ where == false { trace_rt_decision!(det, "{} ignore, because value did not change", self.series); - res_lt = WriteRtRes::default(); - res_mt = WriteRtRes::default(); - res_st = WriteRtRes::default(); + res_lt = WriteRtRes::ignore_same_value(); + res_mt = WriteRtRes::ignore_same_value(); + res_st = WriteRtRes::ignore_same_value(); } else { res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; @@ -281,6 +363,12 @@ where accept: x.accept, bytes: x.bytes, msp_rewrite: x.msp_rewrite, + ignore_rewind_time: 0, + ignore_same_time: 0, + ignore_same_value: 0, + ignore_monitor_not_min_quiet: x.ignore_monitor_not_min_quiet, + ignore_poll_not_min_quiet: x.ignore_poll_not_min_quiet, + ignore_rate_cap: x.ignore_rate_cap, }; Ok(ret) } diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index 0779153..7bc3d18 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -17,12 +17,6 @@ mod Metrics { enum counters { metrics_emit, metrics_emit_final, - ioid_read_begin, - ioid_read_done, - ioid_read_timeout, - ioid_read_error_exists, - ioid_read_error_not_found, - recv_read_notify_state_passive_found_ioid, proto_out_push, logic_error, poll_fn_begin, @@ -47,17 +41,26 @@ mod Metrics { pong_timeout, caget_timeout, caget_issued, - monitor_stale_read_timeout, - monitor_stale_read_begin, unknown_ioid, + monitor_stale_read_begin, + monitor_stale_read_timeout, + ioid_read_error_exists, + ioid_read_begin, + recv_read_notify_ioid_not_found, + recv_read_notify_channel_not_found, + recv_read_notify_channel_unexpected_state, + recv_read_notify_channel_transition, + recv_read_notify_channel_sid_mismatch, + recv_read_notify_poll_wait, + recv_read_notify_poll_idle, + recv_read_notify_monitor_passive, recv_read_notify_state_read_pending, recv_read_notify_state_read_pending_bad_ioid, - recv_read_notify_while_polling_idle, recv_read_notify_while_enabling_monitoring, - no_cid_for_subid, recv_read_notify_but_no_longer_ready, recv_read_notify_but_not_init_yet, recv_event_add_while_wait_on_read_notify, + no_cid_for_subid, monitoring_read_expected, monitoring_read_unexpected, monitoring_read_diff_time, @@ -65,12 +68,24 @@ mod Metrics { transition_to_polling, transition_to_polling_bad_state, transition_to_polling_already_in, + polling_read_timeout, unknown_subid, get_series_id_ok, channel_add_exists, ts_msp_reput_onevent, ts_msp_reput_periodic, series_writer_on_close, + writer_ignore_rewind_time, + writer_ignore_same_time, + writer_ignore_same_value, + writer_ignore_monitor_not_min_quiet, + writer_ignore_poll_not_min_quiet, + writer_ignore_rate_cap, + } + enum values { + channel_all_count, + channel_alive_count, + channel_not_alive_count, } enum histolog2s { clock_ioc_diff_abs,