Emit pong channel status event, and in status metrics

This commit is contained in:
Dominik Werder
2024-07-24 21:50:28 +02:00
parent 85db531133
commit 62b3628003
4 changed files with 152 additions and 185 deletions

View File

@@ -200,6 +200,8 @@ pub struct ChannelStateInfo {
pub addr: SocketAddrV4,
pub series: Option<SeriesId>,
pub channel_connected_info: ChannelConnectedInfo,
pub ping_last: Option<SystemTime>,
pub pong_last: Option<SystemTime>,
pub scalar_type: Option<ScalarType>,
pub shape: Option<Shape>,
// NOTE: this solution can yield to the same Instant serialize to different string representations.
@@ -551,6 +553,7 @@ impl ChannelState {
addr: SocketAddrV4,
conf: ChannelConfig,
stnow: SystemTime,
conn: &CaConn,
) -> ChannelStateInfo {
let channel_connected_info = match self {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
@@ -626,6 +629,8 @@ impl ChannelState {
addr,
series,
channel_connected_info,
ping_last: conn.ioc_ping_last,
pong_last: conn.ioc_pong_last,
scalar_type,
shape,
ts_created,
@@ -962,9 +967,10 @@ pub struct CaConn {
conn_command_rx: Pin<Box<Receiver<ConnCommand>>>,
conn_backoff: f32,
conn_backoff_beg: f32,
ioc_ping_last: Instant,
ioc_ping_next: Instant,
ioc_ping_start: Option<Instant>,
ioc_ping_last: Option<SystemTime>,
ioc_pong_last: Option<SystemTime>,
iqsp: Pin<Box<InsertSenderPolling>>,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
ca_conn_event_out_queue_max: usize,
@@ -1029,9 +1035,10 @@ impl CaConn {
conn_command_rx: Box::pin(cq_rx),
conn_backoff: 0.02,
conn_backoff_beg: 0.02,
ioc_ping_last: tsnow,
ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng),
ioc_ping_start: None,
ioc_ping_last: None,
ioc_pong_last: None,
iqsp: Box::pin(InsertSenderPolling::new(iqtx)),
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
@@ -1055,7 +1062,7 @@ impl CaConn {
}
fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration {
Duration::from_millis(6000 + (rng.next_u32() & 0x7ff) as u64)
Duration::from_millis(8000 + (rng.next_u32() & 0xfff) as u64)
}
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
@@ -2267,6 +2274,7 @@ impl CaConn {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
self.ioc_ping_start = Some(tsnow);
self.ioc_ping_last = Some(self.tmp_ts_poll);
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
} else {
@@ -2371,9 +2379,10 @@ impl CaConn {
let addr = &self.remote_addr_dbg;
warn!("received Echo even though we didn't asked for it {addr:?}");
}
self.ioc_ping_last = tsnow;
self.ioc_pong_last = Some(self.tmp_ts_poll);
self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng);
self.ioc_ping_start = None;
self.emit_channel_event_pong();
}
CaMsgTy::CreateChanFail(msg) => {
// TODO
@@ -2790,7 +2799,7 @@ impl CaConn {
let mut channel_statuses = BTreeMap::new();
for (_, conf) in self.channels.iter() {
let chst = &conf.state;
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow);
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow, self);
channel_statuses.insert(chst.cssid(), chinfo);
}
// trace2!("{:?}", channel_statuses);
@@ -2866,6 +2875,30 @@ impl CaConn {
Ok(())
}
fn emit_channel_event_pong(&mut self) {
for (cid, ch) in self.channels.iter() {
match &ch.state {
ChannelState::Init(_) => {}
ChannelState::Creating(_) => {}
ChannelState::FetchEnumDetails(_) => {}
ChannelState::FetchCaStatusSeries(_) => {}
ChannelState::MakingSeriesWriter(_) => {}
ChannelState::Writable(st1) => {
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st1.channel.cssid,
status: ChannelStatus::Pong,
};
let item = QueryItem::ChannelStatus(item);
self.iqdqs.st_rf3_rx.push_back(item);
}
ChannelState::Closing(_) => {}
ChannelState::Error(_) => {}
ChannelState::Ended(_) => {}
}
}
}
fn tick_writers(&mut self) -> Result<(), Error> {
for (_, chconf) in &mut self.channels {
let chst = &mut chconf.state;

View File

@@ -382,6 +382,13 @@ fn make_routes(
"/channel",
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["states"]})) })
.route(
"/error_handler_test",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::error_handler_test()
}),
)
.route(
"/states",
get({

View File

@@ -5,12 +5,20 @@ use crate::conf::ChannelConfig;
use async_channel::Sender;
use chrono::DateTime;
use chrono::Utc;
use err::thiserror;
use err::ThisError;
use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "StatusError")]
pub enum Error {
Internal,
}
#[derive(Debug, Serialize)]
pub struct ChannelStates {
running_since: DateTime<Utc>,
@@ -66,9 +74,65 @@ struct ChannelState {
write_lt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
updated: SystemTime,
#[serde(with = "humantime_serde")]
pong_last: Option<SystemTime>,
private: StatePrivate,
}
impl ChannelState {
fn connecting(config: ChannelConfig) -> Self {
Self::connecting_addr(config, None, ConnectionState::Connecting)
}
fn connecting_addr(config: ChannelConfig, ioc_address: Option<SocketAddr>, connst: ConnectionState) -> Self {
Self {
ioc_address,
connection: connst,
archiving_configuration: config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
pong_last: None,
private: StatePrivate::default(),
}
}
fn with_chst(config: ChannelConfig, chst: crate::ca::conn::ChannelStateInfo) -> Self {
let private = StatePrivate {
status_emit_count: chst.status_emit_count,
};
let connst = {
use crate::ca::conn::ChannelConnectedInfo::*;
match chst.channel_connected_info {
Disconnected => ConnectionState::Disconnected,
Connecting => ConnectionState::Connecting,
Connected => ConnectionState::Connected,
Error => ConnectionState::Error,
}
};
Self {
ioc_address: Some(SocketAddr::V4(chst.addr)),
connection: connst,
// TODO config is stored in two places
// conf: chst.conf,
archiving_configuration: config,
recv_count: chst.recv_count.unwrap_or(0),
recv_bytes: chst.recv_bytes.unwrap_or(0),
recv_last: chst.recv_last,
write_st_last: chst.write_st_last,
write_mt_last: chst.write_mt_last,
write_lt_last: chst.write_lt_last,
updated: chst.stnow,
pong_last: chst.pong_last,
private,
}
}
}
#[derive(Debug, Serialize)]
struct StatePrivate {
status_emit_count: u64,
@@ -93,9 +157,26 @@ enum ConnectionState {
Error,
}
pub async fn error_handler_test() -> Result<axum::Json<ChannelStates>, axum::Json<String>> {
Err(axum::Json(format!("test error message")))
}
// ChannelStatusesResponse
// BTreeMap<String, ChannelState>
pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSetEvent>) -> axum::Json<ChannelStates> {
pub async fn channel_states(
params: HashMap<String, String>,
tx: Sender<CaConnSetEvent>,
) -> Result<axum::Json<ChannelStates>, axum::Json<String>> {
match channel_states_try(params, tx).await {
Ok(x) => Ok(x),
Err(e) => Err(axum::Json(e.to_string())),
}
}
async fn channel_states_try(
params: HashMap<String, String>,
tx: Sender<CaConnSetEvent>,
) -> Result<axum::Json<ChannelStates>, Error> {
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(40);
let (tx2, rx2) = async_channel::bounded(1);
@@ -115,220 +196,63 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
use crate::ca::statemap::ActiveChannelState;
match st2 {
ActiveChannelState::Init { .. } => {
let chst = ChannelState {
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst = ChannelState::connecting(st1.config);
states.channels.insert(k, chst);
}
ActiveChannelState::WaitForStatusSeriesId { .. } => {
let chst = ChannelState {
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst = ChannelState::connecting(st1.config);
states.channels.insert(k, chst);
}
ActiveChannelState::WithStatusSeriesId(st3) => {
use crate::ca::statemap::WithStatusSeriesIdStateInner;
match st3.inner {
WithStatusSeriesIdStateInner::AddrSearchPending { .. } => {
let chst = ChannelState {
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst = ChannelState::connecting(st1.config);
states.channels.insert(k, chst);
}
WithStatusSeriesIdStateInner::WithAddress { addr, state: st4 } => {
use crate::ca::statemap::WithAddressState;
let addr2 = SocketAddr::V4(addr);
match st4 {
WithAddressState::Unassigned { .. } => {
let chst = ChannelState {
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst = ChannelState::connecting_addr(
st1.config,
Some(addr2),
ConnectionState::Connecting,
);
states.channels.insert(k, chst);
}
WithAddressState::Assigned(st5) => {
use crate::ca::statemap::ConnectionStateValue;
match st5.value {
ConnectionStateValue::Unknown => {
let chst = ChannelState {
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst = ChannelState::connecting_addr(
st1.config,
Some(addr2),
ConnectionState::Connecting,
);
states.channels.insert(k, chst);
}
ConnectionStateValue::ChannelStateInfo(st6) => {
let recv_count = st6.recv_count.unwrap_or(0);
let recv_bytes = st6.recv_bytes.unwrap_or(0);
let private = StatePrivate {
status_emit_count: st6.status_emit_count,
};
use crate::ca::conn::ChannelConnectedInfo;
match st6.channel_connected_info {
ChannelConnectedInfo::Disconnected => {
let chst = ChannelState {
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Disconnected,
// TODO config is stored in two places
// conf: st6.conf,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
ChannelConnectedInfo::Connecting => {
let chst = ChannelState {
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
ChannelConnectedInfo::Connected => {
let chst = ChannelState {
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Connected,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
ChannelConnectedInfo::Error => {
let chst = ChannelState {
ioc_address: Some(SocketAddr::V4(addr)),
connection: ConnectionState::Error,
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
private,
};
states.channels.insert(k, chst);
}
}
let chst = ChannelState::with_chst(st1.config, st6);
states.channels.insert(k, chst);
}
}
}
}
}
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
let chst = ChannelState {
ioc_address: None,
connection: ConnectionState::Connecting,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst = ChannelState::connecting(st1.config);
states.channels.insert(k, chst);
}
WithStatusSeriesIdStateInner::NoAddress { .. } => {
let chst = ChannelState {
ioc_address: None,
connection: ConnectionState::Unreachable,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst =
ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable);
states.channels.insert(k, chst);
}
WithStatusSeriesIdStateInner::MaybeWrongAddress(..) => {
let chst = ChannelState {
ioc_address: None,
connection: ConnectionState::Unreachable,
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
private: StatePrivate::default(),
};
let chst =
ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable);
states.channels.insert(k, chst);
}
}
@@ -338,5 +262,5 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
ChannelStateValue::ToRemove { .. } => {}
}
}
axum::Json(states)
Ok(axum::Json(states))
}

View File

@@ -470,6 +470,7 @@ pub enum ChannelStatus {
AssignedToAddress,
Opened,
Closed(ChannelStatusClosedReason),
Pong,
}
impl ChannelStatus {
@@ -492,6 +493,7 @@ impl ChannelStatus {
ConnectFail => 11,
IoError => 12,
},
Pong => 25,
}
}
@@ -512,6 +514,7 @@ impl ChannelStatus {
11 => Closed(ConnectFail),
12 => Closed(IoError),
24 => AssignedToAddress,
25 => Pong,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
"unknown ChannelStatus kind {kind}"