Refactor request status id endpoint
This commit is contained in:
@@ -9,6 +9,7 @@ use crate::ReqCtx;
|
|||||||
use crate::Requ;
|
use crate::Requ;
|
||||||
use crate::ServiceSharedResources;
|
use crate::ServiceSharedResources;
|
||||||
use bytes::BufMut;
|
use bytes::BufMut;
|
||||||
|
use bytes::Bytes;
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
|
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
@@ -1036,21 +1037,7 @@ impl Api1EventsBinaryHandler {
|
|||||||
);
|
);
|
||||||
let s = s.instrument(span).instrument(reqidspan);
|
let s = s.instrument(span).instrument(reqidspan);
|
||||||
let body = body_stream(s);
|
let body = body_stream(s);
|
||||||
let n2 = ncc
|
let req_stat_id = format!("{}{:02x}", reqctx.reqid_this(), ncc.ix);
|
||||||
.node_config
|
|
||||||
.cluster
|
|
||||||
.nodes
|
|
||||||
.get(ncc.ix)
|
|
||||||
.ok_or_else(|| Error::with_msg_no_trace(format!("node ix {} not found", ncc.ix)))?;
|
|
||||||
let nodeno_pre = "sf-daqbuf-";
|
|
||||||
let nodeno: u32 = if n2.host.starts_with(nodeno_pre) {
|
|
||||||
n2.host[nodeno_pre.len()..nodeno_pre.len() + 2]
|
|
||||||
.parse()
|
|
||||||
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?
|
|
||||||
} else {
|
|
||||||
ncc.ix as _
|
|
||||||
};
|
|
||||||
let req_stat_id = format!("{}{:02}", reqctx.reqid_this(), nodeno);
|
|
||||||
info!("return req_stat_id {req_stat_id}");
|
info!("return req_stat_id {req_stat_id}");
|
||||||
let ret = response(StatusCode::OK).header(X_DAQBUF_REQID, req_stat_id);
|
let ret = response(StatusCode::OK).header(X_DAQBUF_REQID, req_stat_id);
|
||||||
let ret = ret.body(body)?;
|
let ret = ret.body(body)?;
|
||||||
@@ -1082,38 +1069,52 @@ impl RequestStatusHandler {
|
|||||||
pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
|
pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
|
||||||
let (head, body) = req.into_parts();
|
let (head, body) = req.into_parts();
|
||||||
if head.method != Method::GET {
|
if head.method != Method::GET {
|
||||||
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
|
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
|
||||||
}
|
} else if !accepts_json_or_all(&head.headers) {
|
||||||
if accepts_json_or_all(&head.headers) {
|
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?)
|
||||||
let _body_data = read_body_bytes(body).await?;
|
} else {
|
||||||
let status_id = &head.uri.path()[Self::path_prefix().len()..];
|
let status_id = &head.uri.path()[Self::path_prefix().len()..];
|
||||||
if status_id.len() == 8 {
|
debug!("RequestStatusHandler status_id {:?}", status_id);
|
||||||
debug!("RequestStatusHandler status_id {:?}", status_id);
|
|
||||||
let status = crate::status_board().unwrap().status_as_json(status_id);
|
if false {
|
||||||
|
let status = netpod::StatusBoardEntryUser::new_all_good();
|
||||||
let s = serde_json::to_string(&status)?;
|
let s = serde_json::to_string(&status)?;
|
||||||
let ret = response(StatusCode::OK).body(body_string(s))?;
|
let ret = response(StatusCode::OK).body(body_string(s))?;
|
||||||
Ok(ret)
|
return Ok(ret);
|
||||||
} else if status_id.len() == 10 {
|
|
||||||
let node_id =
|
|
||||||
u32::from_str_radix(&status_id[8..10], 16).map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
|
||||||
let status_id = status_id[0..8].to_string();
|
|
||||||
let node2 = ncc
|
|
||||||
.node_config
|
|
||||||
.cluster
|
|
||||||
.nodes
|
|
||||||
.get(node_id as usize)
|
|
||||||
.ok_or_else(|| Error::with_msg_no_trace(format!("node {node_id} unknown")))?;
|
|
||||||
let url = Url::parse(&format!("{}api/1/requestStatus/{}", node2.baseurl(), status_id))
|
|
||||||
.map_err(|_| Error::with_msg_no_trace(format!("bad url")))?;
|
|
||||||
let res = httpclient::http_get(url, APP_JSON, ctx).await?;
|
|
||||||
let ret = response(StatusCode::OK).body(body_bytes(res.body))?;
|
|
||||||
Ok(ret)
|
|
||||||
} else {
|
|
||||||
let ret = error_status_response(StatusCode::BAD_REQUEST, format!("bad status id requested"), "0000");
|
|
||||||
Ok(ret)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let _body_data = read_body_bytes(body).await?;
|
||||||
|
match self.try_make_status(status_id, ctx, ncc).await {
|
||||||
|
Ok(x) => Ok(response(StatusCode::OK).body(body_bytes(x))?),
|
||||||
|
Err(e) => {
|
||||||
|
let ret = error_status_response(StatusCode::INTERNAL_SERVER_ERROR, e.to_string(), "0");
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn try_make_status(&self, status_id: &str, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<Bytes, Error> {
|
||||||
|
if status_id.len() == 8 {
|
||||||
|
let status = crate::status_board().unwrap().status_as_json(status_id);
|
||||||
|
let v = serde_json::to_vec(&status)?;
|
||||||
|
Ok(v.into())
|
||||||
|
} else if status_id.len() == 10 {
|
||||||
|
let node_id =
|
||||||
|
u32::from_str_radix(&status_id[8..10], 16).map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||||
|
let status_id = status_id[0..8].to_string();
|
||||||
|
let node2 = ncc
|
||||||
|
.node_config
|
||||||
|
.cluster
|
||||||
|
.nodes
|
||||||
|
.get(node_id as usize)
|
||||||
|
.ok_or_else(|| Error::with_msg_no_trace(format!("node {node_id} unknown")))?;
|
||||||
|
let url = Url::parse(&format!("{}api/1/requestStatus/{}", node2.baseurl(), status_id))
|
||||||
|
.map_err(|_| Error::with_msg_no_trace(format!("bad url")))?;
|
||||||
|
let res = httpclient::http_get(url, APP_JSON, ctx).await?;
|
||||||
|
Ok(res.body)
|
||||||
} else {
|
} else {
|
||||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
|
Err(Error::with_msg_no_trace(format!("bad request id")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -236,7 +236,7 @@ async fn proxy_http_service_inner(
|
|||||||
} else if let Some(h) = api1::PythonDataApi1Query::handler(&req) {
|
} else if let Some(h) = api1::PythonDataApi1Query::handler(&req) {
|
||||||
h.handle(req, ctx, proxy_config).await
|
h.handle(req, ctx, proxy_config).await
|
||||||
} else if let Some(h) = api1::reqstatus::RequestStatusHandler::handler(&req) {
|
} else if let Some(h) = api1::reqstatus::RequestStatusHandler::handler(&req) {
|
||||||
h.handle(req, proxy_config).await
|
h.handle(req, ctx, proxy_config).await
|
||||||
} else if path.starts_with(DISTRI_PRE) {
|
} else if path.starts_with(DISTRI_PRE) {
|
||||||
proxy_distribute_v2(req).await
|
proxy_distribute_v2(req).await
|
||||||
} else if let Some(h) = super::api4::docs::DocsHandler::handler(&req) {
|
} else if let Some(h) = super::api4::docs::DocsHandler::handler(&req) {
|
||||||
|
|||||||
@@ -1,19 +1,20 @@
|
|||||||
use crate::bodystream::response;
|
use crate::bodystream::response;
|
||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
use http::header;
|
use crate::requests::accepts_json_or_all;
|
||||||
|
use http::request::Parts;
|
||||||
use http::Method;
|
use http::Method;
|
||||||
use http::Request;
|
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
use http::Uri;
|
|
||||||
use httpclient::body_bytes;
|
use httpclient::body_bytes;
|
||||||
use httpclient::body_empty;
|
use httpclient::body_empty;
|
||||||
use httpclient::connect_client;
|
use httpclient::body_string;
|
||||||
use httpclient::read_body_bytes;
|
use httpclient::read_body_bytes;
|
||||||
use httpclient::Requ;
|
use httpclient::Requ;
|
||||||
use httpclient::StreamResponse;
|
use httpclient::StreamResponse;
|
||||||
|
use netpod::get_url_query_pairs;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
|
use netpod::req_uri_to_url;
|
||||||
use netpod::ProxyConfig;
|
use netpod::ProxyConfig;
|
||||||
use netpod::ACCEPT_ALL;
|
use netpod::ReqCtx;
|
||||||
use netpod::APP_JSON;
|
use netpod::APP_JSON;
|
||||||
|
|
||||||
pub struct RequestStatusHandler {}
|
pub struct RequestStatusHandler {}
|
||||||
@@ -31,68 +32,56 @@ impl RequestStatusHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle(&self, req: Requ, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
|
pub async fn handle(&self, req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
|
||||||
let (head, body) = req.into_parts();
|
let (head, body) = req.into_parts();
|
||||||
if head.method != Method::GET {
|
if head.method != Method::GET {
|
||||||
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
|
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
|
||||||
}
|
} else if !accepts_json_or_all(&head.headers) {
|
||||||
let accept = head
|
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?)
|
||||||
.headers
|
|
||||||
.get(http::header::ACCEPT)
|
|
||||||
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
|
|
||||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
|
|
||||||
.to_owned();
|
|
||||||
if accept != APP_JSON && accept != ACCEPT_ALL {
|
|
||||||
// TODO set the public error code and message and return Err(e).
|
|
||||||
let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept));
|
|
||||||
error!("{e}");
|
|
||||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
|
|
||||||
}
|
|
||||||
let _body_data = read_body_bytes(body).await?;
|
|
||||||
let status_id = &head.uri.path()[Self::path_prefix().len()..];
|
|
||||||
debug!("RequestStatusHandler status_id {:?}", status_id);
|
|
||||||
if status_id.len() < 8 {
|
|
||||||
return Err(Error::with_msg_no_trace(format!("bad status id {}", status_id)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let back = {
|
|
||||||
let mut ret = None;
|
|
||||||
for b in &proxy_config.backends {
|
|
||||||
if b.name == "sf-databuffer" {
|
|
||||||
ret = Some(b);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ret
|
|
||||||
};
|
|
||||||
if let Some(back) = back {
|
|
||||||
let (status_id, url) = if back.url.contains("sf-daqbuf-23.psi.ch") {
|
|
||||||
// TODO split_at may panic on bad input
|
|
||||||
let (status_id, node_tgt) = status_id.split_at(status_id.len() - 2);
|
|
||||||
(status_id, back.url.replace("-23.", &format!("-{}.", node_tgt)))
|
|
||||||
} else {
|
|
||||||
(status_id, back.url.clone())
|
|
||||||
};
|
|
||||||
let url_str = format!("{}{}{}", url, Self::path_prefix(), status_id);
|
|
||||||
debug!("try to ask {url_str}");
|
|
||||||
let uri: Uri = url_str.parse()?;
|
|
||||||
let req = Request::builder()
|
|
||||||
.method(Method::GET)
|
|
||||||
.header(header::HOST, uri.host().unwrap())
|
|
||||||
.uri(&uri)
|
|
||||||
.body(body_empty())?;
|
|
||||||
let res = connect_client(&uri).await?.send_request(req).await?;
|
|
||||||
let (head, body) = res.into_parts();
|
|
||||||
if head.status != StatusCode::OK {
|
|
||||||
error!("backend returned error: {head:?}");
|
|
||||||
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
|
|
||||||
} else {
|
|
||||||
debug!("backend returned OK");
|
|
||||||
let body = read_body_bytes(body).await?;
|
|
||||||
Ok(response(StatusCode::OK).body(body_bytes(body))?)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
|
let _body_data = read_body_bytes(body).await?;
|
||||||
|
self.handle_json(ctx, head, proxy_config).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_json(
|
||||||
|
&self,
|
||||||
|
ctx: &ReqCtx,
|
||||||
|
head: Parts,
|
||||||
|
proxy_config: &ProxyConfig,
|
||||||
|
) -> Result<StreamResponse, Error> {
|
||||||
|
let status_id = &head.uri.path()[Self::path_prefix().len()..];
|
||||||
|
debug!("RequestStatusHandler status_id {:?}", status_id);
|
||||||
|
|
||||||
|
if false {
|
||||||
|
let status = netpod::StatusBoardEntryUser::new_all_good();
|
||||||
|
let s = serde_json::to_string(&status)?;
|
||||||
|
let ret = response(StatusCode::OK).body(body_string(s))?;
|
||||||
|
return Ok(ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
let url = req_uri_to_url(&head.uri).map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||||
|
let pairs = get_url_query_pairs(&url);
|
||||||
|
let pn = if let Some(backend) = pairs.get("backend") {
|
||||||
|
proxy_config
|
||||||
|
.backends
|
||||||
|
.iter()
|
||||||
|
.filter(|x| x.name == *backend)
|
||||||
|
.next()
|
||||||
|
.ok_or_else(|| Error::with_msg_no_trace(format!("no default backend found")))?
|
||||||
|
} else {
|
||||||
|
proxy_config
|
||||||
|
.backends
|
||||||
|
.iter()
|
||||||
|
.filter(|x| x.name == "sf-databuffer")
|
||||||
|
.next()
|
||||||
|
.ok_or_else(|| Error::with_msg_no_trace(format!("no default backend found")))?
|
||||||
|
};
|
||||||
|
let url_str = format!("{}{}{}", pn.url, Self::path_prefix(), status_id);
|
||||||
|
debug!("try to ask {url_str}");
|
||||||
|
let url = url_str.parse()?;
|
||||||
|
let res = httpclient::http_get(url, APP_JSON, ctx).await?;
|
||||||
|
let ret = response(StatusCode::OK).body(body_bytes(res.body))?;
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4160,6 +4160,17 @@ pub struct StatusBoardEntryUser {
|
|||||||
errors: Vec<::err::PublicError>,
|
errors: Vec<::err::PublicError>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl StatusBoardEntryUser {
|
||||||
|
pub fn new_all_good() -> Self {
|
||||||
|
Self {
|
||||||
|
error_count: 0,
|
||||||
|
warn_count: 0,
|
||||||
|
channel_not_found: 0,
|
||||||
|
errors: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<&StatusBoardEntry> for StatusBoardEntryUser {
|
impl From<&StatusBoardEntry> for StatusBoardEntryUser {
|
||||||
fn from(e: &StatusBoardEntry) -> Self {
|
fn from(e: &StatusBoardEntry) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -4248,19 +4259,10 @@ impl StatusBoard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn status_as_json(&self, status_id: &str) -> StatusBoardEntryUser {
|
pub fn status_as_json(&self, status_id: &str) -> Option<StatusBoardEntryUser> {
|
||||||
match self.entries.get(status_id) {
|
match self.entries.get(status_id) {
|
||||||
Some(e) => e.into(),
|
Some(e) => Some(e.into()),
|
||||||
None => {
|
None => None,
|
||||||
error!("can not find status id {}", status_id);
|
|
||||||
let _e = ::err::Error::with_public_msg_no_trace(format!("request-id unknown {status_id}"));
|
|
||||||
StatusBoardEntryUser {
|
|
||||||
error_count: 1,
|
|
||||||
warn_count: 0,
|
|
||||||
channel_not_found: 0,
|
|
||||||
errors: vec![::err::Error::with_public_msg_no_trace("request-id not found").into()],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user