From a91f6d7070a5ff5742faf3f00e562c8c973aa0c1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 Mar 2025 14:28:38 +0100 Subject: [PATCH] Channel inspect api --- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 7 ++++ netfetch/src/ca/connset.rs | 39 +++++++++++-------- netfetch/src/metrics.rs | 77 +++++++++++++++++++++++++++++++------- 4 files changed, 95 insertions(+), 30 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 849be30..45a6220 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.7-aa.10" +version = "0.2.7-aa.11" authors = ["Dominik Werder "] edition = "2024" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index c0e5010..a00bec1 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -912,6 +912,13 @@ impl ConnCommand { } } + pub fn channel_inspect(name: String, tx: Sender) -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::ChannelInspectFull(CmdChannelInspectFull { name, tx }), + } + } + pub fn shutdown() -> Self { Self { id: Self::make_id(), diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 0793abc..7e6a203 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -226,15 +226,10 @@ impl fmt::Debug for ChannelStatusesRequest { } } -#[derive(Debug)] -pub enum ChannelCommandKind { - InspectDetail, -} - #[derive(Debug)] pub struct ChannelCommand { pub channel: String, - pub kind: ChannelCommandKind, + pub conn_command: ConnCommand, } #[derive(Debug)] @@ -245,6 +240,7 @@ pub enum ConnSetCmd { ChannelRemove(ChannelRemove), Shutdown, ChannelStatuses(ChannelStatusesRequest), + // TODO rename to ConnCommand because it must be handled by some specific Conn ChannelCommand(ChannelCommand), } @@ -1080,16 +1076,27 @@ impl CaConnSet { return Ok(()); } // TODO handle, send to corresponding CaConn - // let channels_ca_conn_set = self - // .channel_states - // .iter() - // .filter(|(k, _)| k.name() == cmd.channel) - // .map(|(k, v)| (k.name().to_string(), v.clone())) - // .collect(); - // let item = ChannelStatusesResponse { channels_ca_conn_set }; - // if req.tx.try_send(item).is_err() { - // self.stats.response_tx_fail.inc(); - // } + let name = cmd.channel.clone(); + let mut cmd = Some(cmd); + // TODO no need to iterate anymore + self.channel_states + .iter_mut() + .filter(|(k, _)| k.name() == name) + .map(|(_, st1)| { + if let ChannelStateValue::Active(st2) = &mut st1.value { + if let ActiveChannelState::WithStatusSeriesId(st3) = st2 { + if let WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } = &mut st3.inner { + let addr2 = SocketAddr::V4(*addr); + self.ca_conn_ress.get_mut(&addr2).map(|q| { + if let Some(cmd) = cmd.take() { + q.cmd_queue.push_back(cmd.conn_command); + } + }); + } + } + }; + }) + .for_each(|_| ()); Ok(()) } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 705cdce..f6d313d 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -263,6 +263,36 @@ async fn channel_remove(params: HashMap, dcom: Arc) Json(Value::Bool(false)) } +async fn channel_inspect_inner( + params: HashMap, + dcom: Arc, +) -> Result, Error> { + if let Some(name) = params.get("name") { + let (tx, rx) = async_channel::bounded(1); + let ev = DaemonEvent::ChannelCommand(crate::ca::connset::ChannelCommand { + channel: name.into(), + conn_command: crate::ca::conn::ConnCommand::channel_inspect(name.into(), tx), + }); + dcom.tx.send(ev).await?; + match rx.recv().await { + Ok(js) => Ok(axum::Json(js)), + Err(e) => Err(Error::from_string("recv error while waiting for answer")), + } + } else { + Err(Error::with_msg_no_trace(format!("wrong parameters given"))) + } +} + +async fn channel_inspect( + params: HashMap, + dcom: Arc, +) -> Result, Response> { + match channel_inspect_inner(params, dcom).await { + Ok(ret) => Ok(ret), + Err(e) => Err(PublicErrorMsg(e.to_string()).into_response()), + } +} + // ChannelStatusesResponse // BTreeMap async fn private_channel_states( @@ -372,7 +402,6 @@ fn make_routes( use axum::extract; use axum::routing::{get, post, put}; use http::StatusCode; - Router::new() .fallback(|req: Request| async move { info!("Fallback for {} {}", req.method(), req.uri()); @@ -428,18 +457,11 @@ fn make_routes( Router::new() .nest( "/channel", - Router::new().route( - "/delete", - post({ - let rres = rres.clone(); - move |(headers, params, body): ( - HeaderMap, - Query>, - axum::body::Body, - )| { - delete::delete((headers, params, body), rres) - } - }), + make_routes_private_channel( + rres.clone(), + dcom.clone(), + connset_cmd_tx.clone(), + stats_set.clone(), ), ) .route( @@ -589,6 +611,35 @@ fn make_routes_ingest( ) } +fn make_routes_private_channel( + rres: Arc, + dcom: Arc, + connset_cmd_tx: Sender, + stats_set: StatsSet, +) -> axum::Router { + use axum::Router; + use axum::extract; + use axum::routing::{get, post, put}; + use http::StatusCode; + Router::new() + .route( + "/delete", + post({ + let rres = rres.clone(); + move |(headers, params, body): (HeaderMap, Query>, axum::body::Body)| { + delete::delete((headers, params, body), rres) + } + }), + ) + .route( + "/inspect", + get({ + let dcom = dcom.clone(); + |Query(params): Query>| channel_inspect(params, dcom) + }), + ) +} + pub async fn metrics_service( bind_to: String, dcom: Arc,