From 7f6eabd8470647556b54d3198672b9bb5cc33158 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 15 May 2025 10:30:32 +0200 Subject: [PATCH] Keep channel message log --- netfetch/src/ca/conn.rs | 80 +++++++++++++++++++++++++++----------- netfetch/src/ca/connset.rs | 4 +- 2 files changed, 60 insertions(+), 24 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index ff754ce..f842fb9 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -888,6 +888,7 @@ pub struct CmdChannelInspectFull { pub enum ConnCommandKind { ChannelAdd(ChannelConfig, ChannelStatusSeriesId), ChannelClose(String), + ChannelCloseReconf(String), Shutdown, ChannelInspectFull(CmdChannelInspectFull), } @@ -913,6 +914,13 @@ impl ConnCommand { } } + pub fn channel_close_reconf(name: String) -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::ChannelCloseReconf(name), + } + } + pub fn channel_inspect(name: String, tx: Sender) -> Self { Self { id: Self::make_id(), @@ -1113,6 +1121,7 @@ pub struct CaConn { mett: stats::mett::CaConnMetrics, metrics_emit_last: Instant, fionread_last: u32, + logs_by_channel_name: HashMap>, } impl Drop for CaConn { @@ -1180,6 +1189,7 @@ impl CaConn { mett: stats::mett::CaConnMetrics::new(), metrics_emit_last: tsnow, fionread_last: 0, + logs_by_channel_name: HashMap::new(), } } @@ -1230,6 +1240,22 @@ impl CaConn { self.conn_command_tx.as_ref().get_ref().clone() } + fn log_channel(&mut self, name: &str, msg: String) { + if let Some(v) = self.logs_by_channel_name.get_mut(name) { + let n = v.len(); + if n >= 24 { + for _ in 16..n { + v.pop_front(); + } + } + v.push_back((time::UtcDateTime::now(), msg)); + } else { + let mut v = VecDeque::new(); + v.push_back((time::UtcDateTime::now(), msg)); + self.logs_by_channel_name.insert(name.into(), v); + } + } + fn is_shutdown(&self) -> bool { if let CaConnState::Shutdown(..) = self.state { true @@ -1316,10 +1342,17 @@ impl CaConn { trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); match a.kind { ConnCommandKind::ChannelAdd(conf, cssid) => { + self.log_channel(conf.name(), format!("add file{}", conf.config_file_basename())); self.channel_add(conf, cssid)?; Ok(Ready(Some(()))) } ConnCommandKind::ChannelClose(name) => { + self.log_channel(&name, format!("close")); + self.cmd_channel_close(name); + Ok(Ready(Some(()))) + } + ConnCommandKind::ChannelCloseReconf(name) => { + self.log_channel(&name, format!("close for reconf")); self.cmd_channel_close(name); Ok(Ready(Some(()))) } @@ -1327,34 +1360,35 @@ impl CaConn { self.cmd_shutdown(); Ok(Ready(Some(()))) } - ConnCommandKind::ChannelInspectFull(cmd) => match self.cid_by_name(&cmd.name) { - Some(cid) => match self.channels.get(&cid) { - Some(ch) => match serde_json::to_value(&ch) { - Ok(val) => match cmd.tx.send_blocking(val) { - Ok(()) => { - // all fine - Ok(Ready(Some(()))) - } - Err(_) => { - // TODO count in metrics - Ok(Ready(Some(()))) - } + ConnCommandKind::ChannelInspectFull(cmd) => { + let chstate = match self.cid_by_name(&cmd.name) { + Some(cid) => match self.channels.get(&cid) { + Some(ch) => match serde_json::to_value(&ch) { + Ok(val) => val, + Err(_) => serde_json::Value::String(format!("serde error")), }, - Err(_) => { - // TODO count in metrics - Ok(Ready(Some(()))) - } + None => serde_json::Value::String(format!("unknown cid {:?}", cid)), }, - None => { + None => serde_json::Value::String(format!("unknown name {:?}", cmd.name)), + }; + let chlog = match self.logs_by_channel_name.get(&cmd.name) { + Some(v) => match serde_json::to_value(&v) { + Ok(val) => val, + Err(_) => serde_json::Value::String(format!("serde error")), + }, + None => serde_json::Value::String(format!("unknown name {:?}", cmd.name)), + }; + let mut h = serde_json::Map::new(); + h.insert("chstate".into(), chstate); + h.insert("chlog".into(), chlog); + match cmd.tx.send_blocking(serde_json::Value::Object(h)) { + Ok(()) => Ok(Ready(Some(()))), + Err(_) => { // TODO count in metrics Ok(Ready(Some(()))) } - }, - None => { - // cmd.tx.close(); - Ok(Ready(Some(()))) } - }, + } } } Ready(None) => { @@ -2850,6 +2884,7 @@ impl CaConn { if series::dbg::dbg_chn(&name) { info!("queue event to notice channel create fail {name}"); } + let name2 = name.to_string(); let failinfo = format!("name {} cid {}", name, cid); let item = CaConnEvent { ts: tsnow, @@ -2857,6 +2892,7 @@ impl CaConn { }; self.ca_conn_event_out_queue.push_back(item); warn!("CreateChanFail {} msg {:?}", name, msg); + self.log_channel(&name2, format!("CreateChanFail cid {}", msg.cid)); self.channel_remove_by_cid(cid); } else { let failinfo = format!("unexpected cid {}", cid); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 8630ca8..c82ad71 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -587,12 +587,12 @@ impl CaConnSet { ress.chst.config = cmd.ch_cfg; } WithAddressState::Assigned(_) => { - debug!("unassign for config change {cmd:?} {addr}"); + debug!("unassign for config change {:?} {}", cmd, addr); let conn_ress = ress .ca_conn_ress .get_mut(&SocketAddr::V4(addr.clone())) .ok_or_else(|| Error::ChannelAssignedWithoutConnRess)?; - let item = ConnCommand::channel_close(cmd.name().into()); + let item = ConnCommand::channel_close_reconf(cmd.name().into()); conn_ress.cmd_queue.push_back(item); st3.inner = WithStatusSeriesIdStateInner::UnassigningForConfigChange( statemap::UnassigningForConfigChangeState {