From a715f7f9c71ffd1164b0485c66bd0f647f328dbe Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 22 Aug 2024 16:33:05 +0200 Subject: [PATCH] Status event on silent monitored channel --- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 62 ++++++++++++++++++++--------- netfetch/src/ca/connset.rs | 5 ++- netfetch/src/metrics/ingest.rs | 11 ++++- scywr/src/iteminsertqueue.rs | 9 +++++ serieswriter/src/fixgridwriter.rs | 3 +- serieswriter/src/ratelimitwriter.rs | 54 +++++++++++++++++++------ serieswriter/src/rtwriter.rs | 29 +++++++++----- serieswriter/src/writer.rs | 30 ++++++++++---- 9 files changed, 152 insertions(+), 53 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 58d33a3..740444d 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.2" +version = "0.2.3-aa.1" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 3e852c5..e6e8da0 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -42,17 +42,17 @@ use scywr::insertqueues::InsertDeques; use scywr::insertqueues::InsertQueuesTx; use scywr::insertqueues::InsertSenderPolling; use scywr::iteminsertqueue as scywriiq; -use scywr::iteminsertqueue::Accounting; -use scywr::iteminsertqueue::AccountingRecv; -use scywr::iteminsertqueue::MspItem; -use scywr::iteminsertqueue::QueryItem; -use scywr::iteminsertqueue::ShutdownReason; use scywr::senderpolling::SenderPolling; +use scywriiq::Accounting; +use scywriiq::AccountingRecv; use scywriiq::ChannelStatus; use scywriiq::ChannelStatusClosedReason; use scywriiq::ChannelStatusItem; use scywriiq::ConnectionStatus; use scywriiq::ConnectionStatusItem; +use scywriiq::MspItem; +use scywriiq::QueryItem; +use scywriiq::ShutdownReason; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; @@ -695,11 +695,13 @@ impl WriterStatus { item: ChannelStatusItem, deque: &mut VecDeque, ) -> Result<(), Error> { + let tsev = TsNano::from_system_time(SystemTime::now()); let (ts, val) = item.to_ts_val(); self.writer_status.write( serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val), &mut self.writer_status_state, Instant::now(), + tsev, deque, )?; Ok(()) @@ -1831,8 +1833,8 @@ impl CaConn { } } else { if let Some(cid) = self.read_ioids.get(&ioid) { - let ch_s = if let Some(x) = self.channels.get_mut(cid) { - &mut x.state + let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(cid) { + (&mut x.state, &mut x.wrst) } else { warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}"); return Ok(()); @@ -1893,6 +1895,14 @@ impl CaConn { } self.read_ioids.remove(&ioid); st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st.channel.cssid.clone(), + status: ChannelStatus::MonitoringSilenceReadUnchanged, + }; + ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?; + } let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; @@ -1973,14 +1983,14 @@ impl CaConn { crst.recv_bytes += payload_len as u64; crst.acc_recv.push_written(payload_len); // TODO should attach these counters already to Writable state. - let ts_local = { - let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); - epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 - }; - let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?; - let ts_diff = ts.abs_diff(ts_local); - stats.ca_ts_off().ingest((ts_diff / MS) as u32); + let ts_local = TsNano::from_system_time(stnow); { + let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?; + let ts_diff = ts.abs_diff(ts_local.ns()); + stats.ca_ts_off().ingest((ts_diff / MS) as u32); + } + { + let evts = ts_local; Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); @@ -1988,7 +1998,7 @@ impl CaConn { // let ts_local = TsNano::from_ns(ts_local); // binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; { - let wres = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?; + let wres = writer.write(CaWriterValue::new(value, crst), tsnow, evts, iqdqs)?; crst.status_emit_count += wres.nstatus() as u64; if wres.st.accept { crst.dw_st_last = stnow; @@ -2186,6 +2196,14 @@ impl CaConn { self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg); st3.mon2state = Monitoring2State::ReadPending(ioid, tsnow); self.stats.caget_issued().inc(); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st2.channel.cssid.clone(), + status: ChannelStatus::MonitoringSilenceReadStart, + }; + conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?; + } } } Monitoring2State::ReadPending(ioid, since) => { @@ -2200,6 +2218,14 @@ impl CaConn { name, ioid ); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st2.channel.cssid.clone(), + status: ChannelStatus::MonitoringSilenceReadTimeout, + }; + conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?; + } if false { // Here we try to close the channel at hand. @@ -3392,6 +3418,7 @@ impl EmittableType for CaWriterValue { fn into_query_item( mut self, ts_net: Instant, + tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { let mut items = serieswriter::writer::SmallVec::new(); @@ -3409,10 +3436,7 @@ impl EmittableType for CaWriterValue { }, None => true, }; - let ts = TsNano::from_ns(self.0.ts().unwrap()); - if let Some(ts) = self.0.ts() { - state.last_accepted_ts = TsNano::from_ns(ts); - } + let ts = tsev; state.last_accepted_val = Some(self.clone()); let byte_size = self.byte_size(); if diff_data { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 7b466a3..e86a0c0 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1384,15 +1384,16 @@ impl CaConnSet { MaybeWrongAddressState::new(stnow, st3.addr_find_backoff), ); let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone()); - let (ts, val) = item.to_ts_val(); + let (tsev, val) = item.to_ts_val(); let deque = &mut item_deque; st3.writer_status .as_mut() .unwrap() .write( - serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val), + serieswriter::fixgridwriter::ChannelStatusWriteValue::new(tsev, val), st3.writer_status_state.as_mut().unwrap(), tsnow, + tsev, deque, ) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index 4c4c988..a90cbe2 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -90,6 +90,7 @@ impl EmittableType for WritableType { fn into_query_item( self, ts_net: Instant, + tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { todo!() @@ -317,13 +318,16 @@ where .map_err(|_| Error::Decode)?; let evs: EventsDim0 = evs.into(); trace_input!("see events {:?}", evs); + warn!("TODO require timestamp in input format"); + let stnow = SystemTime::now(); + let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); let mut emit_state = (); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(WritableType(val), &mut emit_state, tsnow, deque)?; + writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?; } Ok(()) } @@ -345,13 +349,16 @@ where .map_err(|_| Error::Decode)?; let evs: EventsDim1 = evs.into(); trace_input!("see events {:?}", evs); + warn!("TODO require timestamp in input format"); + let stnow = SystemTime::now(); + let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); let mut emit_state = (); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(WritableType(val), &mut emit_state, tsnow, deque)?; + writer.write(WritableType(val), &mut emit_state, tsnow, tsev, deque)?; } Ok(()) } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 99e3be4..72f43a9 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -429,6 +429,9 @@ pub enum ChannelStatus { Opened, Closed(ChannelStatusClosedReason), Pong, + MonitoringSilenceReadStart, + MonitoringSilenceReadTimeout, + MonitoringSilenceReadUnchanged, } impl ChannelStatus { @@ -452,6 +455,9 @@ impl ChannelStatus { IoError => 12, }, Pong => 25, + MonitoringSilenceReadStart => 26, + MonitoringSilenceReadTimeout => 27, + MonitoringSilenceReadUnchanged => 28, } } @@ -473,6 +479,9 @@ impl ChannelStatus { 12 => Closed(IoError), 24 => AssignedToAddress, 25 => Pong, + 26 => MonitoringSilenceReadStart, + 27 => MonitoringSilenceReadTimeout, + 28 => MonitoringSilenceReadUnchanged, _ => { return Err(err::Error::with_msg_no_trace(format!( "unknown ChannelStatus kind {kind}" diff --git a/serieswriter/src/fixgridwriter.rs b/serieswriter/src/fixgridwriter.rs index b5e39b9..5c516d3 100644 --- a/serieswriter/src/fixgridwriter.rs +++ b/serieswriter/src/fixgridwriter.rs @@ -43,10 +43,11 @@ impl EmittableType for ChannelStatusWriteValue { fn into_query_item( self, ts_net: Instant, + tsev: TsNano, state: &mut ::State, ) -> serieswriter::writer::EmitRes { let mut items = serieswriter::writer::SmallVec::new(); - let ts = self.ts(); + let ts = tsev; state.last_accepted_ts = ts; state.last_accepted_val = Some(self.1); let byte_size = self.byte_size(); diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index 7bb124c..510b888 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -15,8 +15,8 @@ use std::time::Instant; #[allow(unused)] macro_rules! trace_rt_decision { - ($($arg:tt)*) => { - if true { + ($det:expr, $($arg:tt)*) => { + if $det { trace!($($arg)*); } }; @@ -46,6 +46,7 @@ where last_insert_val: Option, dbgname: String, writer: SeriesWriter, + do_trace_detail: bool, _t1: PhantomData, } @@ -68,30 +69,55 @@ where last_insert_val: None, dbgname, writer, + do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), _t1: PhantomData, }; + if ret.do_trace_detail { + debug!("debug test for detail series"); + trace!("trace test for detail series"); + } Ok(ret) } - pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque) -> Result { + pub fn write( + &mut self, + item: ET, + ts_net: Instant, + tsev: TsNano, + deque: &mut VecDeque, + ) -> Result { // Decide whether we want to write. // TODO catch already in CaConn the cases when the IOC-timestamp did not change. + let det = self.do_trace_detail; let tsl = self.last_insert_ts.clone(); let dbgname = &self.dbgname; let sid = &self.series; + let min_quiet = 1000 * self.min_quiet.as_secs() + self.min_quiet.subsec_millis() as u64; + let ts = tsev; + if false { + trace_rt_decision!( + det, + "{dbgname} {sid} min_quiet {min_quiet:?} ts1 {ts1:?} ts2 {ts2:?} item {item:?}", + ts1 = ts.ms(), + ts2 = tsl.ms(), + item = item, + ); + } let do_write = { - let ts = item.ts(); if ts == tsl { - trace_rt_decision!("{dbgname} {sid} ignore, because same time {ts:?} {tsl:?}"); + trace_rt_decision!(det, "{dbgname} {sid} ignore, because same time {ts:?} {tsl:?}"); false } else if ts < tsl { - trace_rt_decision!("{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}"); + trace_rt_decision!( + det, + "{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}", + ); false - } else if ts.ms() < tsl.ms() + 1000 * self.min_quiet.as_secs() { - trace_rt_decision!("{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}"); + } else if ts.ms() < tsl.ms() + min_quiet { + trace_rt_decision!(det, "{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}"); false } else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) { - trace_rt_decision!("{dbgname} {sid} ignore, because store rate cap"); + trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap"); false } else if self .last_insert_val @@ -99,15 +125,19 @@ where .map(|k| !item.has_change(k)) .unwrap_or(false) { - trace_rt_decision!("{dbgname} {sid} ignore, because value did not change"); + trace_rt_decision!(det, "{dbgname} {sid} ignore, because value did not change"); false } else { - trace_rt_decision!("{dbgname} {sid} accept"); + trace_rt_decision!(det, "{dbgname} {sid} accept"); + if true { + self.last_insert_val = Some(item.clone()); + } + self.last_insert_ts = ts.clone(); true } }; if do_write { - let res = self.writer.write(item, &mut self.emit_state, ts_net, deque)?; + let res = self.writer.write(item, &mut self.emit_state, ts_net, ts, deque)?; let ret = WriteRes { accept: true, bytes: res.bytes, diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 7a713d8..e10f85a 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -5,6 +5,7 @@ use err::ThisError; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; +use netpod::TsNano; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::QueryItem; use series::SeriesId; @@ -13,9 +14,9 @@ use std::time::Duration; use std::time::Instant; #[allow(unused)] -macro_rules! trace_ { - ($($arg:tt)*) => { - if false { +macro_rules! trace_emit { + ($det:expr, $($arg:tt)*) => { + if $det { trace!($($arg)*); } }; @@ -76,6 +77,7 @@ where state_mt: State, state_lt: State, min_quiets: MinQuiets, + do_trace_detail: bool, } impl RtWriter @@ -110,6 +112,7 @@ where state_mt, state_lt, min_quiets, + do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), }; Ok(ret) } @@ -130,14 +133,21 @@ where self.min_quiets.clone() } - pub fn write(&mut self, item: ET, ts_net: Instant, iqdqs: &mut InsertDeques) -> Result { - trace!("write {:?}", item.ts()); + pub fn write( + &mut self, + item: ET, + ts_net: Instant, + tsev: TsNano, + iqdqs: &mut InsertDeques, + ) -> Result { + let det = self.do_trace_detail; + trace_emit!(det, "write {:?}", item.ts()); // TODO // Optimize for the common case that we only write into one of the stores. // Make the decision first, based on ref, then clone only as required. - let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_qu)?; - let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_qu)?; - let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_qu)?; + let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?; + let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; + let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; let ret = WriteRes { st: WriteRtRes { accept: res_st.accept, @@ -162,9 +172,10 @@ where state: &mut State, item: ET, ts_net: Instant, + tsev: TsNano, deque: &mut VecDeque, ) -> Result { - Ok(state.writer.write(item, ts_net, deque)?) + Ok(state.writer.write(item, ts_net, tsev, deque)?) } pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index f3ff336..ab426cf 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -11,6 +11,15 @@ use std::time::Instant; use core::fmt; pub use smallvec::SmallVec; +#[allow(unused)] +macro_rules! trace_emit { + ($det:expr, $($arg:tt)*) => { + if $det { + trace!($($arg)*); + } + }; +} + #[derive(Debug)] pub struct EmitRes { pub items: SmallVec<[QueryItem; 4]>, @@ -23,7 +32,7 @@ pub trait EmittableType: fmt::Debug + Clone { fn ts(&self) -> TsNano; fn has_change(&self, k: &Self) -> bool; fn byte_size(&self) -> u32; - fn into_query_item(self, ts_net: Instant, state: &mut ::State) -> EmitRes; + fn into_query_item(self, ts_net: Instant, tsev: TsNano, state: &mut ::State) -> EmitRes; } #[derive(Debug, ThisError)] @@ -60,7 +69,8 @@ pub struct WriteRes { #[derive(Debug)] pub struct SeriesWriter { - sid: SeriesId, + series: SeriesId, + do_trace_detail: bool, _t1: PhantomData, } @@ -68,13 +78,17 @@ impl SeriesWriter where ET: EmittableType, { - pub fn new(sid: SeriesId) -> Result { - let res = Self { sid, _t1: PhantomData }; + pub fn new(series: SeriesId) -> Result { + let res = Self { + series, + do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), + _t1: PhantomData, + }; Ok(res) } pub fn sid(&self) -> SeriesId { - self.sid.clone() + self.series.clone() } pub fn write( @@ -82,11 +96,13 @@ where item: ET, state: &mut ::State, ts_net: Instant, + tsev: TsNano, deque: &mut VecDeque, ) -> Result { + let det = self.do_trace_detail; let ts_main = item.ts(); - let res = item.into_query_item(ts_net, state); - trace!("emit value for ts {:?} items len {}", ts_main, res.items.len()); + let res = item.into_query_item(ts_net, tsev, state); + trace_emit!(det, "emit value for ts {:?} items len {}", ts_main, res.items.len()); for item in res.items { deque.push_back(item); }