From 62b362800397149e7cfe0e00c453c615f8d0b480 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 24 Jul 2024 21:50:28 +0200 Subject: [PATCH] Emit pong channel status event, and in status metrics --- netfetch/src/ca/conn.rs | 43 ++++- netfetch/src/metrics.rs | 7 + netfetch/src/metrics/status.rs | 284 ++++++++++++--------------------- scywr/src/iteminsertqueue.rs | 3 + 4 files changed, 152 insertions(+), 185 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 34f3d97..6c1ceaf 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -200,6 +200,8 @@ pub struct ChannelStateInfo { pub addr: SocketAddrV4, pub series: Option, pub channel_connected_info: ChannelConnectedInfo, + pub ping_last: Option, + pub pong_last: Option, pub scalar_type: Option, pub shape: Option, // NOTE: this solution can yield to the same Instant serialize to different string representations. @@ -551,6 +553,7 @@ impl ChannelState { addr: SocketAddrV4, conf: ChannelConfig, stnow: SystemTime, + conn: &CaConn, ) -> ChannelStateInfo { let channel_connected_info = match self { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, @@ -626,6 +629,8 @@ impl ChannelState { addr, series, channel_connected_info, + ping_last: conn.ioc_ping_last, + pong_last: conn.ioc_pong_last, scalar_type, shape, ts_created, @@ -962,9 +967,10 @@ pub struct CaConn { conn_command_rx: Pin>>, conn_backoff: f32, conn_backoff_beg: f32, - ioc_ping_last: Instant, ioc_ping_next: Instant, ioc_ping_start: Option, + ioc_ping_last: Option, + ioc_pong_last: Option, iqsp: Pin>, ca_conn_event_out_queue: VecDeque, ca_conn_event_out_queue_max: usize, @@ -1029,9 +1035,10 @@ impl CaConn { conn_command_rx: Box::pin(cq_rx), conn_backoff: 0.02, conn_backoff_beg: 0.02, - ioc_ping_last: tsnow, ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng), ioc_ping_start: None, + ioc_ping_last: None, + ioc_pong_last: None, iqsp: Box::pin(InsertSenderPolling::new(iqtx)), ca_conn_event_out_queue: VecDeque::new(), ca_conn_event_out_queue_max: 2000, @@ -1055,7 +1062,7 @@ impl CaConn { } fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration { - Duration::from_millis(6000 + (rng.next_u32() & 0x7ff) as u64) + Duration::from_millis(8000 + (rng.next_u32() & 0xfff) as u64) } fn new_self_ticker() -> Pin> { @@ -2267,6 +2274,7 @@ impl CaConn { if let Some(proto) = &mut self.proto { self.stats.ping_start().inc(); self.ioc_ping_start = Some(tsnow); + self.ioc_ping_last = Some(self.tmp_ts_poll); let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow); proto.push_out(msg); } else { @@ -2371,9 +2379,10 @@ impl CaConn { let addr = &self.remote_addr_dbg; warn!("received Echo even though we didn't asked for it {addr:?}"); } - self.ioc_ping_last = tsnow; + self.ioc_pong_last = Some(self.tmp_ts_poll); self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng); self.ioc_ping_start = None; + self.emit_channel_event_pong(); } CaMsgTy::CreateChanFail(msg) => { // TODO @@ -2790,7 +2799,7 @@ impl CaConn { let mut channel_statuses = BTreeMap::new(); for (_, conf) in self.channels.iter() { let chst = &conf.state; - let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow); + let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow, self); channel_statuses.insert(chst.cssid(), chinfo); } // trace2!("{:?}", channel_statuses); @@ -2866,6 +2875,30 @@ impl CaConn { Ok(()) } + fn emit_channel_event_pong(&mut self) { + for (cid, ch) in self.channels.iter() { + match &ch.state { + ChannelState::Init(_) => {} + ChannelState::Creating(_) => {} + ChannelState::FetchEnumDetails(_) => {} + ChannelState::FetchCaStatusSeries(_) => {} + ChannelState::MakingSeriesWriter(_) => {} + ChannelState::Writable(st1) => { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st1.channel.cssid, + status: ChannelStatus::Pong, + }; + let item = QueryItem::ChannelStatus(item); + self.iqdqs.st_rf3_rx.push_back(item); + } + ChannelState::Closing(_) => {} + ChannelState::Error(_) => {} + ChannelState::Ended(_) => {} + } + } + } + fn tick_writers(&mut self) -> Result<(), Error> { for (_, chconf) in &mut self.channels { let chst = &mut chconf.state; diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 66fb42e..3d96be5 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -382,6 +382,13 @@ fn make_routes( "/channel", Router::new() .fallback(|| async { axum::Json(json!({"subcommands":["states"]})) }) + .route( + "/error_handler_test", + get({ + let tx = connset_cmd_tx.clone(); + |Query(params): Query>| status::error_handler_test() + }), + ) .route( "/states", get({ diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 7aec7d8..0bd1370 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -5,12 +5,20 @@ use crate::conf::ChannelConfig; use async_channel::Sender; use chrono::DateTime; use chrono::Utc; +use err::thiserror; +use err::ThisError; use serde::Serialize; use std::collections::BTreeMap; use std::collections::HashMap; use std::net::SocketAddr; use std::time::SystemTime; +#[derive(Debug, ThisError)] +#[cstm(name = "StatusError")] +pub enum Error { + Internal, +} + #[derive(Debug, Serialize)] pub struct ChannelStates { running_since: DateTime, @@ -66,9 +74,65 @@ struct ChannelState { write_lt_last: SystemTime, #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] updated: SystemTime, + #[serde(with = "humantime_serde")] + pong_last: Option, private: StatePrivate, } +impl ChannelState { + fn connecting(config: ChannelConfig) -> Self { + Self::connecting_addr(config, None, ConnectionState::Connecting) + } + + fn connecting_addr(config: ChannelConfig, ioc_address: Option, connst: ConnectionState) -> Self { + Self { + ioc_address, + connection: connst, + archiving_configuration: config, + recv_count: 0, + recv_bytes: 0, + recv_last: SystemTime::UNIX_EPOCH, + write_st_last: SystemTime::UNIX_EPOCH, + write_mt_last: SystemTime::UNIX_EPOCH, + write_lt_last: SystemTime::UNIX_EPOCH, + updated: SystemTime::UNIX_EPOCH, + pong_last: None, + private: StatePrivate::default(), + } + } + + fn with_chst(config: ChannelConfig, chst: crate::ca::conn::ChannelStateInfo) -> Self { + let private = StatePrivate { + status_emit_count: chst.status_emit_count, + }; + let connst = { + use crate::ca::conn::ChannelConnectedInfo::*; + match chst.channel_connected_info { + Disconnected => ConnectionState::Disconnected, + Connecting => ConnectionState::Connecting, + Connected => ConnectionState::Connected, + Error => ConnectionState::Error, + } + }; + Self { + ioc_address: Some(SocketAddr::V4(chst.addr)), + connection: connst, + // TODO config is stored in two places + // conf: chst.conf, + archiving_configuration: config, + recv_count: chst.recv_count.unwrap_or(0), + recv_bytes: chst.recv_bytes.unwrap_or(0), + recv_last: chst.recv_last, + write_st_last: chst.write_st_last, + write_mt_last: chst.write_mt_last, + write_lt_last: chst.write_lt_last, + updated: chst.stnow, + pong_last: chst.pong_last, + private, + } + } +} + #[derive(Debug, Serialize)] struct StatePrivate { status_emit_count: u64, @@ -93,9 +157,26 @@ enum ConnectionState { Error, } +pub async fn error_handler_test() -> Result, axum::Json> { + Err(axum::Json(format!("test error message"))) +} + // ChannelStatusesResponse // BTreeMap -pub async fn channel_states(params: HashMap, tx: Sender) -> axum::Json { +pub async fn channel_states( + params: HashMap, + tx: Sender, +) -> Result, axum::Json> { + match channel_states_try(params, tx).await { + Ok(x) => Ok(x), + Err(e) => Err(axum::Json(e.to_string())), + } +} + +async fn channel_states_try( + params: HashMap, + tx: Sender, +) -> Result, Error> { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(40); let (tx2, rx2) = async_channel::bounded(1); @@ -115,220 +196,63 @@ pub async fn channel_states(params: HashMap, tx: Sender { - let chst = ChannelState { - ioc_address: None, - connection: ConnectionState::Connecting, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = ChannelState::connecting(st1.config); states.channels.insert(k, chst); } ActiveChannelState::WaitForStatusSeriesId { .. } => { - let chst = ChannelState { - ioc_address: None, - connection: ConnectionState::Connecting, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = ChannelState::connecting(st1.config); states.channels.insert(k, chst); } ActiveChannelState::WithStatusSeriesId(st3) => { use crate::ca::statemap::WithStatusSeriesIdStateInner; match st3.inner { WithStatusSeriesIdStateInner::AddrSearchPending { .. } => { - let chst = ChannelState { - ioc_address: None, - connection: ConnectionState::Connecting, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = ChannelState::connecting(st1.config); states.channels.insert(k, chst); } WithStatusSeriesIdStateInner::WithAddress { addr, state: st4 } => { use crate::ca::statemap::WithAddressState; + let addr2 = SocketAddr::V4(addr); match st4 { WithAddressState::Unassigned { .. } => { - let chst = ChannelState { - ioc_address: Some(SocketAddr::V4(addr)), - connection: ConnectionState::Connecting, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = ChannelState::connecting_addr( + st1.config, + Some(addr2), + ConnectionState::Connecting, + ); states.channels.insert(k, chst); } WithAddressState::Assigned(st5) => { use crate::ca::statemap::ConnectionStateValue; match st5.value { ConnectionStateValue::Unknown => { - let chst = ChannelState { - ioc_address: Some(SocketAddr::V4(addr)), - connection: ConnectionState::Connecting, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = ChannelState::connecting_addr( + st1.config, + Some(addr2), + ConnectionState::Connecting, + ); states.channels.insert(k, chst); } ConnectionStateValue::ChannelStateInfo(st6) => { - let recv_count = st6.recv_count.unwrap_or(0); - let recv_bytes = st6.recv_bytes.unwrap_or(0); - let private = StatePrivate { - status_emit_count: st6.status_emit_count, - }; - use crate::ca::conn::ChannelConnectedInfo; - match st6.channel_connected_info { - ChannelConnectedInfo::Disconnected => { - let chst = ChannelState { - ioc_address: Some(SocketAddr::V4(addr)), - connection: ConnectionState::Disconnected, - // TODO config is stored in two places - // conf: st6.conf, - archiving_configuration: st1.config, - recv_count, - recv_bytes, - recv_last: st6.recv_last, - write_st_last: st6.write_st_last, - write_mt_last: st6.write_mt_last, - write_lt_last: st6.write_lt_last, - updated: st6.stnow, - private, - }; - states.channels.insert(k, chst); - } - ChannelConnectedInfo::Connecting => { - let chst = ChannelState { - ioc_address: Some(SocketAddr::V4(addr)), - connection: ConnectionState::Connecting, - archiving_configuration: st1.config, - recv_count, - recv_bytes, - recv_last: st6.recv_last, - write_st_last: st6.write_st_last, - write_mt_last: st6.write_mt_last, - write_lt_last: st6.write_lt_last, - updated: st6.stnow, - private, - }; - states.channels.insert(k, chst); - } - ChannelConnectedInfo::Connected => { - let chst = ChannelState { - ioc_address: Some(SocketAddr::V4(addr)), - connection: ConnectionState::Connected, - archiving_configuration: st1.config, - recv_count, - recv_bytes, - recv_last: st6.recv_last, - write_st_last: st6.write_st_last, - write_mt_last: st6.write_mt_last, - write_lt_last: st6.write_lt_last, - updated: st6.stnow, - private, - }; - states.channels.insert(k, chst); - } - ChannelConnectedInfo::Error => { - let chst = ChannelState { - ioc_address: Some(SocketAddr::V4(addr)), - connection: ConnectionState::Error, - archiving_configuration: st1.config, - recv_count, - recv_bytes, - recv_last: st6.recv_last, - write_st_last: st6.write_st_last, - write_mt_last: st6.write_mt_last, - write_lt_last: st6.write_lt_last, - updated: st6.stnow, - private, - }; - states.channels.insert(k, chst); - } - } + let chst = ChannelState::with_chst(st1.config, st6); + states.channels.insert(k, chst); } } } } } WithStatusSeriesIdStateInner::UnknownAddress { .. } => { - let chst = ChannelState { - ioc_address: None, - connection: ConnectionState::Connecting, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = ChannelState::connecting(st1.config); states.channels.insert(k, chst); } WithStatusSeriesIdStateInner::NoAddress { .. } => { - let chst = ChannelState { - ioc_address: None, - connection: ConnectionState::Unreachable, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = + ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable); states.channels.insert(k, chst); } WithStatusSeriesIdStateInner::MaybeWrongAddress(..) => { - let chst = ChannelState { - ioc_address: None, - connection: ConnectionState::Unreachable, - archiving_configuration: st1.config, - recv_count: 0, - recv_bytes: 0, - recv_last: SystemTime::UNIX_EPOCH, - write_st_last: SystemTime::UNIX_EPOCH, - write_mt_last: SystemTime::UNIX_EPOCH, - write_lt_last: SystemTime::UNIX_EPOCH, - updated: SystemTime::UNIX_EPOCH, - private: StatePrivate::default(), - }; + let chst = + ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable); states.channels.insert(k, chst); } } @@ -338,5 +262,5 @@ pub async fn channel_states(params: HashMap, tx: Sender {} } } - axum::Json(states) + Ok(axum::Json(states)) } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 327e757..3ca88f4 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -470,6 +470,7 @@ pub enum ChannelStatus { AssignedToAddress, Opened, Closed(ChannelStatusClosedReason), + Pong, } impl ChannelStatus { @@ -492,6 +493,7 @@ impl ChannelStatus { ConnectFail => 11, IoError => 12, }, + Pong => 25, } } @@ -512,6 +514,7 @@ impl ChannelStatus { 11 => Closed(ConnectFail), 12 => Closed(IoError), 24 => AssignedToAddress, + 25 => Pong, _ => { return Err(err::Error::with_msg_no_trace(format!( "unknown ChannelStatus kind {kind}"