diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e6e8da0..0d2c9c9 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -23,6 +23,8 @@ use futures_util::Stream; use futures_util::StreamExt; use hashbrown::HashMap; use log::*; +use netpod::channelstatus::ChannelStatus; +use netpod::channelstatus::ChannelStatusClosedReason; use netpod::timeunits::*; use netpod::trigger; use netpod::ttl::RetentionTime; @@ -45,8 +47,6 @@ use scywr::iteminsertqueue as scywriiq; use scywr::senderpolling::SenderPolling; use scywriiq::Accounting; use scywriiq::AccountingRecv; -use scywriiq::ChannelStatus; -use scywriiq::ChannelStatusClosedReason; use scywriiq::ChannelStatusItem; use scywriiq::ConnectionStatus; use scywriiq::ConnectionStatusItem; @@ -90,6 +90,7 @@ const DO_RATE_CHECK: bool = false; const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(6000); const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(8000); const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(10000); +const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(120000); #[allow(unused)] macro_rules! trace2 { @@ -472,6 +473,7 @@ struct CreatedState { name: String, enum_str_table: Option>, status_emit_count: u64, + ts_recv_value_status_emit_next: Instant, } impl CreatedState { @@ -510,6 +512,7 @@ impl CreatedState { name: String::new(), enum_str_table: None, status_emit_count: 0, + ts_recv_value_status_emit_next: Instant::now(), } } @@ -1599,8 +1602,8 @@ impl CaConn { return Ok(()); }; let dbg_chn = dbg_chn_cid(cid, self); - let ch_s = if let Some(x) = self.channels.get_mut(&cid) { - &mut x.state + let (ch_s, ch_wrst) = if let Some(x) = self.channels.get_mut(&cid) { + (&mut x.state, &mut x.wrst) } else { // TODO return better as error and let caller decide (with more structured errors) warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); @@ -1681,6 +1684,7 @@ impl CaConn { Self::event_add_ingest( ev.payload_len, ev.value, + ch_wrst, crst, writer, binwriter, @@ -1709,6 +1713,7 @@ impl CaConn { Self::event_add_ingest( ev.payload_len, ev.value, + ch_wrst, crst, writer, binwriter, @@ -1869,7 +1874,7 @@ impl CaConn { st2.tick = PollTickState::Idle(tsnow); let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; + Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?; } }, ReadingState::EnableMonitoring(_) => { @@ -1905,7 +1910,7 @@ impl CaConn { } let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; + Self::read_notify_res_for_write(ev, ch_wrst, st, iqdqs, stnow, tsnow, stats)?; } }, ReadingState::StopMonitoringForPolling(..) => { @@ -1928,6 +1933,7 @@ impl CaConn { fn read_notify_res_for_write( ev: proto::ReadNotifyRes, + wrst: &mut WriterStatus, st: &mut WritableState, iqdqs: &mut InsertDeques, stnow: SystemTime, @@ -1940,6 +1946,7 @@ impl CaConn { Self::event_add_ingest( ev.payload_len, ev.value, + wrst, crst, writer, binwriter, @@ -1954,6 +1961,7 @@ impl CaConn { fn event_add_ingest( payload_len: u32, value: CaEventValue, + wrst: &mut WriterStatus, crst: &mut CreatedState, writer: &mut CaRtWriter, binwriter: &mut BinWriter, @@ -1983,6 +1991,18 @@ impl CaConn { crst.recv_bytes += payload_len as u64; crst.acc_recv.push_written(payload_len); // TODO should attach these counters already to Writable state. + if crst.ts_recv_value_status_emit_next <= tsnow { + crst.ts_recv_value_status_emit_next = tsnow + READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN; + let item = ChannelStatusItem { + ts: stnow, + cssid: crst.cssid, + status: ChannelStatus::MonitoringSilenceReadUnchanged, + }; + let deque = &mut iqdqs.st_rf3_qu; + if wrst.emit_channel_status_item(item, deque).is_err() { + stats.logic_error().inc(); + } + } let ts_local = TsNano::from_system_time(stnow); { let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?; @@ -2568,6 +2588,7 @@ impl CaConn { name: conf.conf.name().into(), enum_str_table: None, status_emit_count: 0, + ts_recv_value_status_emit_next: Instant::now(), }; if dbg_chn_name(created_state.name()) { info!( diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index e86a0c0..0e941e5 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -63,6 +63,7 @@ use std::pin::Pin; use netpod::trigger; use netpod::OnDrop; +use netpod::TsNano; use scywr::insertqueues::InsertQueuesTx; use series::SeriesId; use std::sync::Arc; @@ -625,12 +626,25 @@ impl CaConnSet { self.cssid_latency_max = dt + Duration::from_millis(2000); debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd); } - let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) + let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( + let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( SeriesId::new(cmd.cssid.id()), serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, ); + { + let status = netpod::channelstatus::ChannelStatus::HaveStatusId; + let stnow = SystemTime::now(); + let ts = TsNano::from_system_time(stnow); + let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64()); + let state = &mut writer_status_state; + let ts_net = Instant::now(); + let mut deque = VecDeque::new(); + writer_status + .write(item, state, ts_net, ts, &mut deque) + .map_err(Error::from_string)?; + self.storage_insert_queue.push_back(deque); + } *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState { cssid: cmd.cssid, addr_find_backoff: 0, @@ -679,12 +693,25 @@ impl CaConnSet { trace!("handle_add_channel_with_addr INNER {cmd:?}"); self.stats.handle_add_channel_with_addr().inc(); let tsnow = SystemTime::now(); - let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) + let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( + let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( SeriesId::new(cmd.cssid.id()), serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, ); + { + let status = netpod::channelstatus::ChannelStatus::HaveAddress; + let stnow = SystemTime::now(); + let ts = TsNano::from_system_time(stnow); + let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64()); + let state = &mut writer_status_state; + let ts_net = Instant::now(); + let mut deque = VecDeque::new(); + writer_status + .write(item, state, ts_net, ts, &mut deque) + .map_err(Error::from_string)?; + self.storage_insert_queue.push_back(deque); + } *st3 = WithStatusSeriesIdState { cssid: cmd.cssid.clone(), addr_find_backoff: 0, @@ -1657,13 +1684,13 @@ impl Stream for CaConnSet { if self.storage_insert_sender.is_idle() { if let Some(item) = self.storage_insert_queue.pop_front() { - self.stats.logic_error().inc(); self.storage_insert_sender.as_mut().send_pin(item); } } if self.storage_insert_sender.is_sending() { match self.storage_insert_sender.poll_unpin(cx) { Ready(Ok(())) => { + self.stats.storage_insert_queue_send().inc(); have_progress = true; } Ready(Err(_)) => { diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 62be960..c6ac720 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -638,7 +638,12 @@ impl CaMsgTy { AccessRightsRes(_) => {} EventAdd(_) => { // Using flags DBE_ARCHIVE, DBE_ALARM, DBE_PROPERTY. - let flags = 0b1110; + let dbe_value = 0x01; + let dbe_log = 0x02; + let dbe_alarm = 0x04; + let dbe_property = 0x08; + let _ = dbe_value | dbe_property; + let flags = dbe_log | dbe_alarm; buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, flags, 0, 0]); } EventAddRes(_) => {} diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 01a6d9f..66044e5 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -6,6 +6,7 @@ pub mod linuxhelper; pub mod metrics; pub mod netbuf; pub mod polltimer; +pub mod ratelimit; pub mod rt; #[cfg(test)] pub mod test; diff --git a/netfetch/src/ratelimit.rs b/netfetch/src/ratelimit.rs new file mode 100644 index 0000000..24fe614 --- /dev/null +++ b/netfetch/src/ratelimit.rs @@ -0,0 +1,23 @@ +use std::time::Duration; +use std::time::Instant; + +pub struct RateLimit { + last: Instant, + dtmin: Duration, +} + +impl RateLimit { + pub fn new(dtmin: Duration) -> Self { + let last = Instant::now().checked_sub(2 * dtmin).unwrap(); + Self { last, dtmin } + } + + pub fn trigger(&mut self, tsnow: Instant) -> bool { + if self.last + self.dtmin <= tsnow { + self.last = tsnow; + true + } else { + false + } + } +} diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 72f43a9..a43ae0a 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -7,6 +7,8 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::FutureExt; +use netpod::channelstatus::ChannelStatus; +use netpod::channelstatus::ChannelStatusClosedReason; use netpod::DtNano; use netpod::Shape; use netpod::TsMs; @@ -408,94 +410,6 @@ pub struct ConnectionStatusItem { pub status: ConnectionStatus, } -#[derive(Debug, Clone)] -pub enum ChannelStatusClosedReason { - ShutdownCommand, - ChannelRemove, - ProtocolError, - FrequencyQuota, - BandwidthQuota, - InternalError, - IocTimeout, - NoProtocol, - ProtocolDone, - ConnectFail, - IoError, -} - -#[derive(Debug, Clone)] -pub enum ChannelStatus { - AssignedToAddress, - Opened, - Closed(ChannelStatusClosedReason), - Pong, - MonitoringSilenceReadStart, - MonitoringSilenceReadTimeout, - MonitoringSilenceReadUnchanged, -} - -impl ChannelStatus { - pub fn to_kind(&self) -> u32 { - use ChannelStatus::*; - use ChannelStatusClosedReason::*; - match self { - AssignedToAddress => 24, - Opened => 1, - Closed(x) => match x { - ShutdownCommand => 2, - ChannelRemove => 3, - ProtocolError => 4, - FrequencyQuota => 5, - BandwidthQuota => 6, - InternalError => 7, - IocTimeout => 8, - NoProtocol => 9, - ProtocolDone => 10, - ConnectFail => 11, - IoError => 12, - }, - Pong => 25, - MonitoringSilenceReadStart => 26, - MonitoringSilenceReadTimeout => 27, - MonitoringSilenceReadUnchanged => 28, - } - } - - pub fn from_kind(kind: u32) -> Result { - use ChannelStatus::*; - use ChannelStatusClosedReason::*; - let ret = match kind { - 1 => Opened, - 2 => Closed(ShutdownCommand), - 3 => Closed(ChannelRemove), - 4 => Closed(ProtocolError), - 5 => Closed(FrequencyQuota), - 6 => Closed(BandwidthQuota), - 7 => Closed(InternalError), - 8 => Closed(IocTimeout), - 9 => Closed(NoProtocol), - 10 => Closed(ProtocolDone), - 11 => Closed(ConnectFail), - 12 => Closed(IoError), - 24 => AssignedToAddress, - 25 => Pong, - 26 => MonitoringSilenceReadStart, - 27 => MonitoringSilenceReadTimeout, - 28 => MonitoringSilenceReadUnchanged, - _ => { - return Err(err::Error::with_msg_no_trace(format!( - "unknown ChannelStatus kind {kind}" - ))); - } - }; - Ok(ret) - } - - pub fn to_u64(&self) -> u64 { - self.to_kind() as u64 - } -} - #[derive(Debug, Clone)] pub enum ShutdownReason { ConnectRefused, diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 25e37c3..d5dd060 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -424,6 +424,7 @@ stats_proc::stats_struct!(( handle_add_channel_with_addr, create_ca_conn, command_reply_fail, + storage_insert_queue_send, ), values( storage_insert_queue_len,