Clean up some metrics counters

This commit is contained in:
Dominik Werder
2025-07-09 12:44:55 +02:00
parent 3270d48436
commit 7aba068324
7 changed files with 404 additions and 263 deletions

View File

@@ -736,7 +736,7 @@ dependencies = [
[[package]]
name = "daqingest"
version = "0.3.0"
version = "0.3.0-ab.1"
dependencies = [
"async-channel",
"autoerr",
@@ -1257,9 +1257,9 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.14"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc2fdfdbff08affe55bb779f33b053aa1fe5dd5b54c257343c17edfa55711bdb"
checksum = "7f66d5bd4c6f02bf0542fad85d626775bab9258cf795a4256dcaf3161114d1df"
dependencies = [
"bytes",
"futures-core",
@@ -2155,21 +2155,21 @@ dependencies = [
[[package]]
name = "scylla"
version = "1.2.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4d5b7fe3cb3e140d374238f5f916eb810053562287a01d66ac685e305c166f"
checksum = "221bcc7d06d8eddb9f1152e7955c4965950a6b93666b40797a9ce78624f5a4d2"
dependencies = [
"arc-swap",
"async-trait",
"bytes",
"chrono",
"dashmap 5.5.3",
"dashmap 6.1.0",
"futures",
"hashbrown 0.14.5",
"hashbrown 0.15.4",
"itertools 0.14.0",
"rand 0.9.1",
"rand_pcg 0.9.0",
"scylla-cql 1.2.0",
"scylla-cql 1.3.0",
"smallvec",
"socket2",
"thiserror",
@@ -2199,22 +2199,22 @@ dependencies = [
[[package]]
name = "scylla-cql"
version = "1.2.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "507db4914c625c86d32c5c00ed1add75eaf966a2d4ba9772b601b2563701df58"
checksum = "58b3e593a1cb468a39f7d51d6971b462a22672f22bc5b6b0dab5426acd48189a"
dependencies = [
"byteorder",
"bytes",
"chrono",
"itertools 0.14.0",
"lz4_flex",
"scylla-macros 1.2.0",
"scylla-macros 1.3.0",
"snap",
"stable_deref_trait",
"thiserror",
"tokio",
"uuid",
"yoke 0.7.5",
"yoke 0.8.0",
]
[[package]]
@@ -2231,9 +2231,9 @@ dependencies = [
[[package]]
name = "scylla-macros"
version = "1.2.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8efb4b519ab70556c8d3adfd4f192c635d0c7c72d4f125d2b79713742c98f39d"
checksum = "fcd4e8ce08ba975bdbff47f6bc16f4a87f0c852866baaba5947e29f58e7ce4df"
dependencies = [
"darling",
"proc-macro2",
@@ -2254,7 +2254,7 @@ dependencies = [
"daqbuf-series",
"futures-util",
"pin-project",
"scylla 1.2.0",
"scylla 1.3.0",
"serde",
"smallvec",
"stats",

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.3.0-ab.1"
version = "0.3.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -1,6 +1,5 @@
mod enumfetch;
use crate::ca::connset::CaConnSet;
use crate::conf::ChannelConfig;
use crate::metrics::status::StorageUsage;
use crate::throttletrace::ThrottleTrace;
@@ -48,7 +47,6 @@ use scywriiq::AccountingRecv;
use scywriiq::ChannelStatusItem;
use scywriiq::ConnectionStatus;
use scywriiq::ConnectionStatusItem;
use scywriiq::MspItem;
use scywriiq::QueryItem;
use scywriiq::ShutdownReason;
use serde::Serialize;
@@ -318,6 +316,27 @@ struct Monitoring2PassiveState {
tsbeg: Instant,
#[serde(with = "serde_Instant_elapsed_ms")]
ts_silence_read_next: Instant,
manual_poll_on_quiet_after_sec: u16,
}
impl Monitoring2PassiveState {
    /// Lower bound, in seconds, of the quiet period after which a manual read is issued.
    const QUIET_AFTER_BASE_SEC: u16 = 250;

    /// Mask applied to the random draw; yields a jitter of 0..=63 seconds on top of the base.
    const QUIET_AFTER_JITTER_MASK: u32 = 0x3f;

    /// Creates the passive monitoring state starting at `tsnow`.
    ///
    /// Both the next silence-read deadline and the quiet timeout are randomized via `rng`.
    /// The order of the two draws is kept as-is so the random sequence stays unchanged.
    fn new(tsnow: Instant, rng: &mut Xoshiro128PlusPlus) -> Self {
        let mut ret = Self {
            tsbeg: tsnow,
            ts_silence_read_next: tsnow + CaConn::silence_read_next_ivl_rng(rng),
            // Placeholder only, overwritten by the randomized value right below.
            manual_poll_on_quiet_after_sec: 300,
        };
        ret.manual_poll_on_quiet_after_reset_next(rng);
        ret
    }

    /// Returns the currently chosen quiet timeout as a `Duration`.
    fn manual_poll_on_quiet_after(&self) -> Duration {
        Duration::from_secs(u64::from(self.manual_poll_on_quiet_after_sec))
    }

    /// Draws a fresh quiet timeout in the range 250..=313 seconds.
    fn manual_poll_on_quiet_after_reset_next(&mut self, rng: &mut Xoshiro128PlusPlus) {
        // The mask must be applied to the random draw *before* adding the base.
        // Without the explicit grouping `+` binds tighter than `&`, which turns the
        // expression into `(250 + 0x3f) & random` and can produce a timeout of 0 seconds.
        let jitter = (rng.next_u32() & Self::QUIET_AFTER_JITTER_MASK) as u16;
        self.manual_poll_on_quiet_after_sec = Self::QUIET_AFTER_BASE_SEC + jitter;
    }
}
#[derive(Debug, Clone, Serialize)]
@@ -336,7 +355,6 @@ enum Monitoring2State {
/// Outcome of comparing a read-notify result against the last event seen via the monitor.
/// Entries are recorded together with a timestamp in `last_comparisons`.
pub enum MonitorReadCmp {
/// The compared part (`meta` or `data`) of the read result matches the last monitoring event.
Equal,
/// The `meta` part of the read result differs from the last monitoring event.
DiffTime,
/// NOTE(review): no code constructing this variant is visible in this section — confirm whether it is still needed.
DiffTimeValue,
/// The `data` part of the read result differs from the last monitoring event.
DiffValue,
}
@@ -408,6 +426,7 @@ struct WritableState {
writer: CaRtWriter,
binwriter: BinWriter,
reading: ReadingState,
manual_poll_on_quiet_after_sec: u16,
}
#[derive(Debug, Clone, Serialize)]
@@ -1089,7 +1108,19 @@ impl<'a> EventAddIngestRefobj<'a> {
if let Some(binwriter) = self.binwriter.as_mut() {
binwriter.ingest(tsev, val_for_agg, self.iqdqs)?;
}
self.mett.ts_msp_reput_onevent().add(wres.msp_rewrite() as u32);
self.mett.ts_msp_reput_onevent().add(wres.msp_rewrite() as _);
self.mett
.writer_ignore_rewind_time()
.add(wres.ignore_rewind_time() as _);
self.mett.writer_ignore_same_time().add(wres.ignore_same_time() as _);
self.mett.writer_ignore_same_value().add(wres.ignore_same_value() as _);
self.mett
.writer_ignore_monitor_not_min_quiet()
.add(wres.ignore_monitor_not_min_quiet() as _);
self.mett
.writer_ignore_poll_not_min_quiet()
.add(wres.ignore_poll_not_min_quiet() as _);
self.mett.writer_ignore_rate_cap().add(wres.ignore_rate_cap() as _);
}
if false {
// TODO record stats on drop with the new filter
@@ -1823,6 +1854,7 @@ impl CaConn {
poll_ivl: ivl,
tick: PollTickState::Idle(PollTickStateIdle { next }),
}),
manual_poll_on_quiet_after_sec: 300,
};
conf.state = ChannelState::Writable(created_state);
Ok(())
@@ -1861,6 +1893,7 @@ impl CaConn {
tsbeg: self.poll_tsnow,
subid,
}),
manual_poll_on_quiet_after_sec: 300,
};
conf.state = ChannelState::Writable(created_state);
Ok(())
@@ -2222,10 +2255,7 @@ impl CaConn {
st.reading = ReadingState::Monitoring(MonitoringState {
tsbeg: tsnow,
subid: st2.subid,
mon2state: Monitoring2State::Passive(Monitoring2PassiveState {
tsbeg: tsnow,
ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng),
}),
mon2state: Monitoring2State::Passive(Monitoring2PassiveState::new(tsnow, &mut self.rng)),
monitoring_event_last: Some(ev.clone()),
last_comparisons: VecDeque::new(),
});
@@ -2383,165 +2413,38 @@ impl CaConn {
Err(Error::FutLogic)
}
} else {
if let Some(cid) = self.read_ioids.get(&ioid) {
let (ch_s, ch_wrst, ch_conf) = if let Some(x) = self.channels.get_mut(cid) {
(&mut x.state, &mut x.wrst, &x.conf)
} else {
warn!("handle_read_notify_res can not find channel for {:?} {:?}", cid, ioid);
return Ok(());
};
match ch_s {
ChannelState::Writable(st) => {
if st.channel.sid != sid_ev {
// TODO count for metrics
// warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev);
}
let stnow = self.tmp_ts_poll;
let crst = &mut st.channel;
let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1;
if crst.stwin_ts != stwin_ts {
crst.stwin_ts = stwin_ts;
crst.stwin_count = 0;
}
{
crst.stwin_count += 1;
crst.stwin_bytes += ev.payload_len;
}
match &mut st.reading {
ReadingState::Polling(st2) => match &mut st2.tick {
PollTickState::Idle(_) => {
self.mett.recv_read_notify_while_polling_idle().inc();
}
PollTickState::Wait(st3) => {
if self.read_ioids.remove(&st3.ioid).is_some() {
self.mett.ioid_read_done().inc();
} else {
self.mett.ioid_read_error_not_found().inc();
}
let dt = tsnow.saturating_duration_since(st3.since);
self.mett.caget_lat().push_dur_100us(dt);
let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow);
if self.trace_channel_poll {
trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow);
}
st2.tick = PollTickState::Idle(PollTickStateIdle { next });
let mut robj = EventAddIngestRefobj::from_writable_state(
&self.opts,
&mut self.iqdqs,
st,
&mut self.mett,
&mut self.rng,
)
.and_channel_status_writer(ch_wrst)
.and_with_use_ioc_time(ch_conf.use_ioc_time());
robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?;
}
},
ReadingState::EnableMonitoring(_) => {
self.mett.recv_read_notify_while_enabling_monitoring().inc();
if let Some(cid) = self.read_ioids.remove(&ioid) {
if let Some(x) = self.channels.get_mut(&cid) {
let ch_s = &mut x.state;
let ch_wrst = &mut x.wrst;
let ch_conf = &x.conf;
match ch_s {
ChannelState::Writable(st) => {
if st.channel.sid != sid_ev {
self.mett.recv_read_notify_channel_sid_mismatch().inc();
}
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
if self.read_ioids.remove(&ioid).is_some() {
self.mett.ioid_read_done().inc();
self.mett.recv_read_notify_state_passive_found_ioid().inc();
} else {
self.mett.ioid_read_error_not_found().inc();
}
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(st3) => {
// We don't check again for `since` here. That's done in timeout checking.
// So we could be here a little beyond timeout but we don't care about that.
if ioid != st3.ioid {
// warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
self.mett.recv_read_notify_state_read_pending_bad_ioid().inc();
} else {
self.mett.recv_read_notify_state_read_pending().inc();
}
let read_expected = if let Some(_cid) = self.read_ioids.remove(&ioid) {
self.mett.ioid_read_done().inc();
true
} else {
self.mett.ioid_read_error_not_found().inc();
false
};
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState {
tsbeg: tsnow,
ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng),
});
if read_expected {
self.mett.monitoring_read_expected().inc();
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadResultExpected,
};
ch_wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
} else {
self.mett.monitoring_read_unexpected().inc();
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadResultUnexpected,
};
ch_wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
}
// NOTE we do not update the last value in this ev handler.
{
if let Some(lst) = st2.monitoring_event_last.as_ref() {
// TODO compare with last monitoring value
if ev.value.meta == lst.value.meta {
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::Equal));
} else {
self.mett.monitoring_read_diff_time().inc();
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime));
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadDiffTime,
};
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
)?;
}
}
if ev.value.data == lst.value.data {
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::Equal));
} else {
self.mett.monitoring_read_diff_value().inc();
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::DiffValue));
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadDiffValue,
};
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
)?;
}
}
let stnow = self.tmp_ts_poll;
let crst = &mut st.channel;
let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1;
if crst.stwin_ts != stwin_ts {
crst.stwin_ts = stwin_ts;
crst.stwin_count = 0;
}
{
crst.stwin_count += 1;
crst.stwin_bytes += ev.payload_len;
}
match &mut st.reading {
ReadingState::Polling(st2) => match &mut st2.tick {
PollTickState::Wait(st3) => {
self.mett.recv_read_notify_poll_wait().inc();
let dt = tsnow.saturating_duration_since(st3.since);
self.mett.caget_lat().push_dur_100us(dt);
let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow);
if self.trace_channel_poll {
trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow);
}
while st2.last_comparisons.len() > 12 {
st2.last_comparisons.pop_front();
}
}
// TODO check ADEL to see if monitor should have fired.
// But there is still a small chance that the monitor will just be received slightly later.
// More involved check would be to raise a flag, wait for the expected monitor for some
// timeout, and if we get nothing error out.
// TODO read-result-after-monitor-silence
if false {
st2.tick = PollTickState::Idle(PollTickStateIdle { next });
let mut robj = EventAddIngestRefobj::from_writable_state(
&self.opts,
&mut self.iqdqs,
@@ -2552,24 +2455,149 @@ impl CaConn {
.and_channel_status_writer(ch_wrst)
.and_with_use_ioc_time(ch_conf.use_ioc_time());
robj.event_add_ingest(ev.payload_len, ev.value, tsnow, stnow, tscaproto)?;
Ok(())
}
PollTickState::Idle(_) => {
self.mett.recv_read_notify_poll_idle().inc();
Ok(())
}
},
ReadingState::EnableMonitoring(_) => {
self.mett.recv_read_notify_channel_transition().inc();
Ok(())
}
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
self.mett.recv_read_notify_monitor_passive().inc();
st3.tsbeg = tsnow;
Ok(())
}
Monitoring2State::ReadPending(st3) => {
// We don't check again for `since` here. That's done in timeout checking.
// So we could be here a little beyond timeout but we don't care about that.
if ioid != st3.ioid {
// warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
self.mett.recv_read_notify_state_read_pending_bad_ioid().inc();
Ok(())
} else {
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState::new(
tsnow,
&mut self.rng,
));
{
self.mett.monitoring_read_expected().inc();
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadResultExpected,
};
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
)?;
}
// NOTE we do not update the last value in this ev handler.
{
if let Some(lst) = st2.monitoring_event_last.as_ref() {
// TODO compare with last monitoring value
if ev.value.meta == lst.value.meta {
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::Equal));
} else {
self.mett.monitoring_read_diff_time().inc();
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime));
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadDiffTime,
};
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
)?;
}
}
if ev.value.data == lst.value.data {
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::Equal));
} else {
self.mett.monitoring_read_diff_value().inc();
st2.last_comparisons
.push_back((UtcDateTime::now(), MonitorReadCmp::DiffValue));
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st.channel.cssid.clone(),
status: ChannelStatus::MonitoringReadDiffValue,
};
ch_wrst.emit_channel_status_item(
item,
Self::channel_status_qu(&mut self.iqdqs),
)?;
}
}
}
while st2.last_comparisons.len() > 12 {
st2.last_comparisons.pop_front();
}
}
// TODO check ADEL to see if monitor should have fired.
// But there is still a small chance that the monitor will just be received slightly later.
// More involved check would be to raise a flag, wait for the expected monitor for some
// timeout, and if we get nothing error out.
// TODO read-result-after-monitor-silence
if false {
let mut robj = EventAddIngestRefobj::from_writable_state(
&self.opts,
&mut self.iqdqs,
st,
&mut self.mett,
&mut self.rng,
)
.and_channel_status_writer(ch_wrst)
.and_with_use_ioc_time(ch_conf.use_ioc_time());
robj.event_add_ingest(
ev.payload_len,
ev.value,
tsnow,
stnow,
tscaproto,
)?;
}
Ok(())
}
}
},
ReadingState::StopMonitoringForPolling(..) => {
self.mett.recv_read_notify_channel_transition().inc();
Ok(())
}
},
ReadingState::StopMonitoringForPolling(..) => {
error!("TODO handle_read_notify_res handle StopMonitoringForPolling");
}
}
_ => {
self.mett.recv_read_notify_channel_unexpected_state().inc();
Ok(())
}
}
_ => {
// TODO count instead of print
error!("unexpected state: ReadNotifyRes while having {:?}", ch_s);
}
} else {
self.mett.recv_read_notify_channel_not_found().inc();
Ok(())
}
} else {
// warn!("unknown {ioid:?}");
self.mett.unknown_ioid().inc();
self.mett.recv_read_notify_ioid_not_found().inc();
// {
// self.mett.monitoring_read_unexpected().inc();
// let item = ChannelStatusItem {
// ts: self.tmp_ts_poll,
// cssid: st.channel.cssid.clone(),
// status: ChannelStatus::MonitoringReadResultUnexpected,
// };
// ch_wrst.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
// }
Ok(())
}
Ok(())
}
}
@@ -2688,9 +2716,10 @@ impl CaConn {
ChannelState::MakingSeriesWriter(_) => {}
ChannelState::Writable(st2) => match &mut st2.reading {
ReadingState::EnableMonitoring(_) => {}
ReadingState::Monitoring(st3) => match &st3.mon2state {
ReadingState::Monitoring(st3) => match &mut st3.mon2state {
Monitoring2State::Passive(st4) => {
if st4.tsbeg + conf.conf.manual_poll_on_quiet_after() < tsnow {
if st4.tsbeg + st4.manual_poll_on_quiet_after() < tsnow {
st4.manual_poll_on_quiet_after_reset_next(&mut self.rng);
trace_monitor_stale!("check_channels_state_poll Monitoring2State::Passive timeout");
self.mett.monitor_stale_read_begin().inc();
// TODO encapsulate and unify with Polling handler
@@ -2812,7 +2841,7 @@ impl CaConn {
PollTickState::Wait(st4) => {
if st4.since + POLL_READ_TIMEOUT <= tsnow {
if self.read_ioids.remove(&st4.ioid).is_some() {
self.mett.ioid_read_timeout().inc();
self.mett.polling_read_timeout().inc();
}
self.mett.caget_timeout().inc();
let next = PollTickStateIdle::decide_next(st4.next_backup, st3.poll_ivl, tsnow);
@@ -2820,6 +2849,15 @@ impl CaConn {
trace!("make poll idle after poll timeout {:?}", next);
}
st3.tick = PollTickState::Idle(PollTickStateIdle { next });
{
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::PollingReadTimeout,
};
conf.wrst
.emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
}
}
}
},
@@ -2898,7 +2936,9 @@ impl CaConn {
// TODO should unify.
}
}
if st2.channel.ts_activity_last + conf.conf.expect_activity_within() < tsnow {
// TODO sync with Monitoring2PassiveState::manual_poll_on_quiet_after plus margin.
let timeout = Duration::from_millis(1000 * 800);
if st2.channel.ts_activity_last + conf.conf.expect_activity_within(timeout) < tsnow {
not_alive_count += 1;
} else {
alive_count += 1;
@@ -2907,10 +2947,9 @@ impl CaConn {
_ => {}
}
}
// TODO STATS
// self.stats.channel_all_count.__set(self.channels.len() as _);
// self.stats.channel_alive_count.__set(alive_count as _);
// self.stats.channel_not_alive_count.__set(not_alive_count as _);
self.mett.channel_all_count().set(self.channels.len() as _);
self.mett.channel_alive_count().set(alive_count as _);
self.mett.channel_not_alive_count().set(not_alive_count as _);
Ok(())
}

View File

@@ -816,37 +816,6 @@ impl ChannelConfig {
}
}
/// Quiet period for channels in monitoring mode: when no activity is seen for this long,
/// a manual read is issued to find out whether the channel is still alive.
pub fn manual_poll_on_quiet_after(&self) -> Duration {
    const QUIET_SECS: u64 = 300;
    Duration::from_secs(QUIET_SECS)
}
/// Returns the time span within which some activity is expected on the channel.
///
/// For polled channels this is the poll interval of the first configured retention
/// tier (short, then medium, then long term). In every other case the manual-poll
/// quiet period is used. A fixed margin of 10 seconds is added on top.
pub fn expect_activity_within(&self) -> Duration {
    let margin = Duration::from_millis(1000 * 10);
    let base = if self.is_polled() {
        // First tier that carries any read configuration wins.
        let first_configured = self
            .arch
            .short_term
            .as_ref()
            .or(self.arch.medium_term.as_ref())
            .or(self.arch.long_term.as_ref());
        match first_configured {
            Some(ChannelReadConfig::Poll(ivl)) => *ivl,
            // A monitor policy on a polled channel is an invalid combination, and having
            // no tier configured at all is invalid too: both use the fallback.
            Some(ChannelReadConfig::Monitor) | None => self.manual_poll_on_quiet_after(),
        }
    } else {
        self.manual_poll_on_quiet_after()
    };
    base + margin
}
pub fn min_quiets(&self) -> MinQuiets {
MinQuiets {
st: match self.arch.short_term {
@@ -882,6 +851,33 @@ impl ChannelConfig {
config_file_basename: String::new(),
}
}
/// Returns the time span within which some activity is expected on the channel.
///
/// For polled channels this is the poll interval of the first configured retention
/// tier (short, then medium, then long term). In every other case the caller-supplied
/// `manual_poll_on_quiet_after` is used. A fixed margin of 10 seconds is added on top.
pub fn expect_activity_within(&self, manual_poll_on_quiet_after: Duration) -> Duration {
    use crate::conf::ChannelReadConfig;
    let margin = Duration::from_millis(1000 * 10);
    let base = if self.is_polled() {
        // First tier that carries any read configuration wins.
        let first_configured = self
            .arch
            .short_term
            .as_ref()
            .or(self.arch.medium_term.as_ref())
            .or(self.arch.long_term.as_ref());
        match first_configured {
            Some(ChannelReadConfig::Poll(ivl)) => *ivl,
            // A monitor policy on a polled channel is an invalid combination, and having
            // no tier configured at all is invalid too: both use the fallback.
            Some(ChannelReadConfig::Monitor) | None => manual_poll_on_quiet_after,
        }
    } else {
        manual_poll_on_quiet_after
    };
    base + margin
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]

View File

@@ -28,6 +28,9 @@ pub struct WriteRes {
pub accept: bool,
pub bytes: u32,
pub msp_rewrite: u8,
pub ignore_monitor_not_min_quiet: u8,
pub ignore_poll_not_min_quiet: u8,
pub ignore_rate_cap: u8,
}
#[derive(Debug)]
@@ -96,38 +99,31 @@ where
let min_quiet = 1000 * self.min_quiet.as_secs() + self.min_quiet.subsec_millis() as u64;
let tsl = self.last_insert_ts.clone();
let ts = tsev;
if false {
trace_rt_decision!(
dtd,
"{} {} min_quiet {:?} ts1 {:?} ts2 {:?} item {:?}",
dbgname,
sid,
min_quiet,
ts.ms(),
tsl.ms(),
item
);
}
let mut ignore_monitor_not_min_quiet: u8 = 0;
let mut ignore_poll_not_min_quiet: u8 = 0;
let mut ignore_rate_cap: u8 = 0;
let do_write = {
if !self.is_polled && ts.ms() < tsl.ms() + min_quiet {
trace_rt_decision!(
dtd,
"{} {} ignore, because not min quiet {} {}",
"{} {} ignore, because monitor not min quiet {} {}",
dbgname,
sid,
ts,
tsl
);
ignore_monitor_not_min_quiet += 1;
false
} else if self.is_polled && ts.ms() + 800 < tsl.ms() + min_quiet {
trace_rt_decision!(
dtd,
"{} {} ignore, because not is-polled min quiet {} {}",
"{} {} ignore, because poll not min quiet {} {}",
dbgname,
sid,
ts,
tsl
);
ignore_poll_not_min_quiet += 1;
false
} else if ts < tsl.add_dt_nano(DtNano::from_ms(1)) {
trace_rt_decision!(
@@ -138,6 +134,7 @@ where
ts,
tsl
);
ignore_rate_cap += 1;
false
} else {
trace_rt_decision!(dtd, "{} {} accept {} {}", dbgname, sid, ts, tsl);
@@ -151,6 +148,9 @@ where
accept: true,
bytes: res.bytes,
msp_rewrite: res.msp_rewrite,
ignore_monitor_not_min_quiet,
ignore_poll_not_min_quiet,
ignore_rate_cap,
};
Ok(ret)
} else {
@@ -158,6 +158,9 @@ where
accept: false,
bytes: 0,
msp_rewrite: 0,
ignore_monitor_not_min_quiet,
ignore_poll_not_min_quiet,
ignore_rate_cap,
};
Ok(ret)
}

View File

@@ -83,6 +83,32 @@ impl WriteRes {
pub fn msp_rewrite(&self) -> u8 {
self.st.msp_rewrite + self.mt.msp_rewrite + self.lt.msp_rewrite
}
pub fn ignore_rewind_time(&self) -> u8 {
self.st.ignore_rewind_time + self.mt.ignore_rewind_time + self.lt.ignore_rewind_time
}
pub fn ignore_same_time(&self) -> u8 {
self.st.ignore_same_time + self.mt.ignore_same_time + self.lt.ignore_same_time
}
pub fn ignore_same_value(&self) -> u8 {
self.st.ignore_same_value + self.mt.ignore_same_value + self.lt.ignore_same_value
}
pub fn ignore_monitor_not_min_quiet(&self) -> u8 {
self.st.ignore_monitor_not_min_quiet
+ self.mt.ignore_monitor_not_min_quiet
+ self.lt.ignore_monitor_not_min_quiet
}
pub fn ignore_poll_not_min_quiet(&self) -> u8 {
self.st.ignore_poll_not_min_quiet + self.mt.ignore_poll_not_min_quiet + self.lt.ignore_poll_not_min_quiet
}
pub fn ignore_rate_cap(&self) -> u8 {
self.st.ignore_rate_cap + self.mt.ignore_rate_cap + self.lt.ignore_rate_cap
}
}
#[derive(Debug)]
@@ -90,6 +116,56 @@ pub struct WriteRtRes {
pub accept: bool,
pub bytes: u32,
pub msp_rewrite: u8,
pub ignore_rewind_time: u8,
pub ignore_same_time: u8,
pub ignore_same_value: u8,
pub ignore_monitor_not_min_quiet: u8,
pub ignore_poll_not_min_quiet: u8,
pub ignore_rate_cap: u8,
}
impl WriteRtRes {
    /// Result for an event that was not accepted because its timestamp went backwards.
    /// Only the `ignore_rewind_time` count is set; everything else stays at its default.
    fn ignore_rewind_time() -> Self {
        Self {
            ignore_rewind_time: 1,
            ..Self::default()
        }
    }

    /// Result for an event that was not accepted because it has the same timestamp as the last one.
    /// Only the `ignore_same_time` count is set; everything else stays at its default.
    fn ignore_same_time() -> Self {
        Self {
            ignore_same_time: 1,
            ..Self::default()
        }
    }

    /// Result for an event that was not accepted because the value did not change.
    /// Only the `ignore_same_value` count is set; everything else stays at its default.
    fn ignore_same_value() -> Self {
        Self {
            ignore_same_value: 1,
            ..Self::default()
        }
    }
}
impl Default for WriteRtRes {
@@ -98,6 +174,12 @@ impl Default for WriteRtRes {
accept: false,
bytes: 0,
msp_rewrite: 0,
ignore_rewind_time: 0,
ignore_same_time: 0,
ignore_same_value: 0,
ignore_monitor_not_min_quiet: 0,
ignore_poll_not_min_quiet: 0,
ignore_rate_cap: 0,
}
}
}
@@ -229,14 +311,14 @@ where
tsev,
tsl
);
res_lt = WriteRtRes::default();
res_mt = WriteRtRes::default();
res_st = WriteRtRes::default();
res_lt = WriteRtRes::ignore_rewind_time();
res_mt = WriteRtRes::ignore_rewind_time();
res_st = WriteRtRes::ignore_rewind_time();
} else if tsev == tsl {
trace_rt_decision!(det, "{} ignore, because same time {:?} {:?}", self.series, tsev, tsl);
res_lt = WriteRtRes::default();
res_mt = WriteRtRes::default();
res_st = WriteRtRes::default();
res_lt = WriteRtRes::ignore_same_time();
res_mt = WriteRtRes::ignore_same_time();
res_st = WriteRtRes::ignore_same_time();
} else if self
.last_insert_val
.as_ref()
@@ -245,9 +327,9 @@ where
== false
{
trace_rt_decision!(det, "{} ignore, because value did not change", self.series);
res_lt = WriteRtRes::default();
res_mt = WriteRtRes::default();
res_st = WriteRtRes::default();
res_lt = WriteRtRes::ignore_same_value();
res_mt = WriteRtRes::ignore_same_value();
res_st = WriteRtRes::ignore_same_value();
} else {
res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
@@ -281,6 +363,12 @@ where
accept: x.accept,
bytes: x.bytes,
msp_rewrite: x.msp_rewrite,
ignore_rewind_time: 0,
ignore_same_time: 0,
ignore_same_value: 0,
ignore_monitor_not_min_quiet: x.ignore_monitor_not_min_quiet,
ignore_poll_not_min_quiet: x.ignore_poll_not_min_quiet,
ignore_rate_cap: x.ignore_rate_cap,
};
Ok(ret)
}

View File

@@ -17,12 +17,6 @@ mod Metrics {
enum counters {
metrics_emit,
metrics_emit_final,
ioid_read_begin,
ioid_read_done,
ioid_read_timeout,
ioid_read_error_exists,
ioid_read_error_not_found,
recv_read_notify_state_passive_found_ioid,
proto_out_push,
logic_error,
poll_fn_begin,
@@ -47,17 +41,26 @@ mod Metrics {
pong_timeout,
caget_timeout,
caget_issued,
monitor_stale_read_timeout,
monitor_stale_read_begin,
unknown_ioid,
monitor_stale_read_begin,
monitor_stale_read_timeout,
ioid_read_error_exists,
ioid_read_begin,
recv_read_notify_ioid_not_found,
recv_read_notify_channel_not_found,
recv_read_notify_channel_unexpected_state,
recv_read_notify_channel_transition,
recv_read_notify_channel_sid_mismatch,
recv_read_notify_poll_wait,
recv_read_notify_poll_idle,
recv_read_notify_monitor_passive,
recv_read_notify_state_read_pending,
recv_read_notify_state_read_pending_bad_ioid,
recv_read_notify_while_polling_idle,
recv_read_notify_while_enabling_monitoring,
no_cid_for_subid,
recv_read_notify_but_no_longer_ready,
recv_read_notify_but_not_init_yet,
recv_event_add_while_wait_on_read_notify,
no_cid_for_subid,
monitoring_read_expected,
monitoring_read_unexpected,
monitoring_read_diff_time,
@@ -65,12 +68,24 @@ mod Metrics {
transition_to_polling,
transition_to_polling_bad_state,
transition_to_polling_already_in,
polling_read_timeout,
unknown_subid,
get_series_id_ok,
channel_add_exists,
ts_msp_reput_onevent,
ts_msp_reput_periodic,
series_writer_on_close,
writer_ignore_rewind_time,
writer_ignore_same_time,
writer_ignore_same_value,
writer_ignore_monitor_not_min_quiet,
writer_ignore_poll_not_min_quiet,
writer_ignore_rate_cap,
}
// Metrics that are assigned via `set(..)` rather than incremented.
// All three are written at the end of the channel state check in `CaConn`.
enum values {
// Set to the number of entries in the connection's channel map.
channel_all_count,
// Set to the number of writable channels that showed activity within the expected time span.
channel_alive_count,
// Set to the number of writable channels without activity within the expected time span.
channel_not_alive_count,
}
enum histolog2s {
clock_ioc_diff_abs,