Channel inspect api

This commit is contained in:
Dominik Werder
2025-03-26 14:28:38 +01:00
parent f1286d9ba6
commit a91f6d7070
4 changed files with 95 additions and 30 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.7-aa.10"
version = "0.2.7-aa.11"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -912,6 +912,13 @@ impl ConnCommand {
}
}
pub fn channel_inspect(name: String, tx: Sender<serde_json::Value>) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::ChannelInspectFull(CmdChannelInspectFull { name, tx }),
}
}
pub fn shutdown() -> Self {
Self {
id: Self::make_id(),

View File

@@ -226,15 +226,10 @@ impl fmt::Debug for ChannelStatusesRequest {
}
}
#[derive(Debug)]
pub enum ChannelCommandKind {
InspectDetail,
}
#[derive(Debug)]
pub struct ChannelCommand {
pub channel: String,
pub kind: ChannelCommandKind,
pub conn_command: ConnCommand,
}
#[derive(Debug)]
@@ -245,6 +240,7 @@ pub enum ConnSetCmd {
ChannelRemove(ChannelRemove),
Shutdown,
ChannelStatuses(ChannelStatusesRequest),
// TODO rename to ConnCommand because it must be handled by some specific Conn
ChannelCommand(ChannelCommand),
}
@@ -1080,16 +1076,27 @@ impl CaConnSet {
return Ok(());
}
// TODO handle, send to corresponding CaConn
// let channels_ca_conn_set = self
// .channel_states
// .iter()
// .filter(|(k, _)| k.name() == cmd.channel)
// .map(|(k, v)| (k.name().to_string(), v.clone()))
// .collect();
// let item = ChannelStatusesResponse { channels_ca_conn_set };
// if req.tx.try_send(item).is_err() {
// self.stats.response_tx_fail.inc();
// }
let name = cmd.channel.clone();
let mut cmd = Some(cmd);
// TODO no need to iterate anymore
self.channel_states
.iter_mut()
.filter(|(k, _)| k.name() == name)
.map(|(_, st1)| {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId(st3) = st2 {
if let WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } = &mut st3.inner {
let addr2 = SocketAddr::V4(*addr);
self.ca_conn_ress.get_mut(&addr2).map(|q| {
if let Some(cmd) = cmd.take() {
q.cmd_queue.push_back(cmd.conn_command);
}
});
}
}
};
})
.for_each(|_| ());
Ok(())
}

View File

@@ -263,6 +263,36 @@ async fn channel_remove(params: HashMap<String, String>, dcom: Arc<DaemonComm>)
Json(Value::Bool(false))
}
async fn channel_inspect_inner(
params: HashMap<String, String>,
dcom: Arc<DaemonComm>,
) -> Result<axum::Json<serde_json::Value>, Error> {
if let Some(name) = params.get("name") {
let (tx, rx) = async_channel::bounded(1);
let ev = DaemonEvent::ChannelCommand(crate::ca::connset::ChannelCommand {
channel: name.into(),
conn_command: crate::ca::conn::ConnCommand::channel_inspect(name.into(), tx),
});
dcom.tx.send(ev).await?;
match rx.recv().await {
Ok(js) => Ok(axum::Json(js)),
Err(e) => Err(Error::from_string("recv error while waiting for answer")),
}
} else {
Err(Error::with_msg_no_trace(format!("wrong parameters given")))
}
}
async fn channel_inspect(
params: HashMap<String, String>,
dcom: Arc<DaemonComm>,
) -> Result<axum::Json<serde_json::Value>, Response> {
match channel_inspect_inner(params, dcom).await {
Ok(ret) => Ok(ret),
Err(e) => Err(PublicErrorMsg(e.to_string()).into_response()),
}
}
// ChannelStatusesResponse
// BTreeMap<String, ChannelState>
async fn private_channel_states(
@@ -372,7 +402,6 @@ fn make_routes(
use axum::extract;
use axum::routing::{get, post, put};
use http::StatusCode;
Router::new()
.fallback(|req: Request<axum::body::Body>| async move {
info!("Fallback for {} {}", req.method(), req.uri());
@@ -428,18 +457,11 @@ fn make_routes(
Router::new()
.nest(
"/channel",
Router::new().route(
"/delete",
post({
let rres = rres.clone();
move |(headers, params, body): (
HeaderMap,
Query<HashMap<String, String>>,
axum::body::Body,
)| {
delete::delete((headers, params, body), rres)
}
}),
make_routes_private_channel(
rres.clone(),
dcom.clone(),
connset_cmd_tx.clone(),
stats_set.clone(),
),
)
.route(
@@ -589,6 +611,35 @@ fn make_routes_ingest(
)
}
fn make_routes_private_channel(
rres: Arc<RoutesResources>,
dcom: Arc<DaemonComm>,
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::Router;
use axum::extract;
use axum::routing::{get, post, put};
use http::StatusCode;
Router::new()
.route(
"/delete",
post({
let rres = rres.clone();
move |(headers, params, body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body)| {
delete::delete((headers, params, body), rres)
}
}),
)
.route(
"/inspect",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_inspect(params, dcom)
}),
)
}
pub async fn metrics_service(
bind_to: String,
dcom: Arc<DaemonComm>,