Unify service version, postcard, remove error overhead
This commit is contained in:
@@ -995,7 +995,7 @@ impl Api1EventsBinaryHandler {
|
||||
Ok(ret)
|
||||
} else {
|
||||
// TODO set the public error code and message and return Err(e).
|
||||
let e = Error::with_public_msg(format!("{self_name} unsupported Accept: {}", accept));
|
||||
let e = Error::with_public_msg_no_trace(format!("{self_name} unsupported Accept: {}", accept));
|
||||
error!("{self_name} {e}");
|
||||
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
|
||||
}
|
||||
@@ -1030,7 +1030,7 @@ impl RequestStatusHandler {
|
||||
.to_owned();
|
||||
if accept != APP_JSON && accept != ACCEPT_ALL {
|
||||
// TODO set the public error code and message and return Err(e).
|
||||
let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
|
||||
let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept));
|
||||
error!("{e}");
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
|
||||
@@ -12,9 +12,6 @@ use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::FromUrl;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::ACCEPT_ALL;
|
||||
use netpod::APP_JSON;
|
||||
use netpod::APP_OCTET;
|
||||
use query::api4::binned::BinnedQuery;
|
||||
use tracing::Instrument;
|
||||
use url::Url;
|
||||
@@ -37,6 +34,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
|
||||
let span1 = span!(
|
||||
Level::INFO,
|
||||
"httpret::binned",
|
||||
reqid,
|
||||
beg = query.range().beg_u64() / SEC,
|
||||
end = query.range().end_u64() / SEC,
|
||||
ch = query.channel().name().clone(),
|
||||
@@ -53,10 +51,6 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
|
||||
}
|
||||
|
||||
async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
let accept = req
|
||||
.headers()
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(ACCEPT_ALL, |k| k.to_str().unwrap_or(ACCEPT_ALL));
|
||||
let url = {
|
||||
let s1 = format!("dummy:{}", req.uri());
|
||||
Url::parse(&s1)
|
||||
@@ -70,15 +64,18 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
|
||||
{
|
||||
Err(Error::with_msg_no_trace("hidden message").add_public_msg("PublicMessage"))?;
|
||||
}
|
||||
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
|
||||
if crate::accepts_json(&req.headers()) {
|
||||
Ok(binned_json(url, req, node_config).await?)
|
||||
} else if accept == APP_OCTET {
|
||||
} else if crate::accepts_octets(&req.headers()) {
|
||||
Ok(response_err(
|
||||
StatusCode::NOT_ACCEPTABLE,
|
||||
format!("binary binned data not yet available"),
|
||||
)?)
|
||||
} else {
|
||||
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
|
||||
let ret = response_err(
|
||||
StatusCode::NOT_ACCEPTABLE,
|
||||
format!("Unsupported Accept: {:?}", req.headers()),
|
||||
)?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use netpod::log::*;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::NodeStatus;
|
||||
use netpod::NodeStatusArchiverAppliance;
|
||||
use netpod::ServiceVersion;
|
||||
use netpod::TableSizes;
|
||||
use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
@@ -39,8 +40,13 @@ impl StatusNodesRecursive {
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let res = tokio::time::timeout(Duration::from_millis(1200), self.status(req, ctx, node_config)).await;
|
||||
let res = tokio::time::timeout(
|
||||
Duration::from_millis(1200),
|
||||
self.status(req, ctx, node_config, service_version),
|
||||
)
|
||||
.await;
|
||||
let res = match res {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
@@ -67,6 +73,7 @@ impl StatusNodesRecursive {
|
||||
req: Request<Body>,
|
||||
_ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<NodeStatus, Error> {
|
||||
let (_head, _body) = req.into_parts();
|
||||
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
|
||||
@@ -96,7 +103,7 @@ impl StatusNodesRecursive {
|
||||
let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e}"));
|
||||
let ret = NodeStatus {
|
||||
name: format!("{}:{}", node_config.node.host, node_config.node.port),
|
||||
version: core::env!("CARGO_PKG_VERSION").into(),
|
||||
version: service_version.to_string(),
|
||||
is_sf_databuffer: node_config.node.sf_databuffer.is_some(),
|
||||
is_archiver_engine: node_config.node.channel_archiver.is_some(),
|
||||
is_archiver_appliance: node_config.node.archiver_appliance.is_some(),
|
||||
|
||||
@@ -37,6 +37,7 @@ use netpod::query::prebinned::PreBinnedQuery;
|
||||
use netpod::CmpZero;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::ProxyConfig;
|
||||
use netpod::ServiceVersion;
|
||||
use netpod::APP_JSON;
|
||||
use netpod::APP_JSON_LINES;
|
||||
use nodenet::conn::events_service;
|
||||
@@ -100,7 +101,27 @@ impl ::err::ToErr for RetrievalError {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> {
|
||||
pub fn accepts_json(hm: &http::HeaderMap) -> bool {
|
||||
match hm.get(http::header::ACCEPT) {
|
||||
Some(x) => match x.to_str() {
|
||||
Ok(x) => x.contains(netpod::APP_JSON) || x.contains(netpod::ACCEPT_ALL),
|
||||
Err(_) => false,
|
||||
},
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn accepts_octets(hm: &http::HeaderMap) -> bool {
|
||||
match hm.get(http::header::ACCEPT) {
|
||||
Some(x) => match x.to_str() {
|
||||
Ok(x) => x.contains(netpod::APP_OCTET),
|
||||
Err(_) => false,
|
||||
},
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> {
|
||||
static STATUS_BOARD_INIT: Once = Once::new();
|
||||
STATUS_BOARD_INIT.call_once(|| {
|
||||
let b = StatusBoard::new();
|
||||
@@ -124,6 +145,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> {
|
||||
debug!("new connection from {:?}", conn.remote_addr());
|
||||
let node_config = node_config.clone();
|
||||
let addr = conn.remote_addr();
|
||||
let service_version = service_version.clone();
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn({
|
||||
move |req| {
|
||||
@@ -135,7 +157,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> {
|
||||
req.uri(),
|
||||
req.headers()
|
||||
);
|
||||
let f = http_service(req, node_config.clone());
|
||||
let f = http_service(req, node_config.clone(), service_version.clone());
|
||||
Cont { f: Box::pin(f) }
|
||||
}
|
||||
}))
|
||||
@@ -150,8 +172,12 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn http_service(req: Request<Body>, node_config: NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
match http_service_try(req, &node_config).await {
|
||||
async fn http_service(
|
||||
req: Request<Body>,
|
||||
node_config: NodeConfigCached,
|
||||
service_version: ServiceVersion,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
match http_service_try(req, &node_config, &service_version).await {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => {
|
||||
error!("daqbuffer node http_service sees error: {}", e);
|
||||
@@ -286,7 +312,11 @@ macro_rules! static_http_api1 {
|
||||
};
|
||||
}
|
||||
|
||||
async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
async fn http_service_try(
|
||||
req: Request<Body>,
|
||||
node_config: &NodeConfigCached,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
use http::HeaderValue;
|
||||
let mut urlmarks = Vec::new();
|
||||
urlmarks.push(format!("{}:{}", req.method(), req.uri()));
|
||||
@@ -297,7 +327,7 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
}
|
||||
}
|
||||
let ctx = ReqCtx::with_node(&req, node_config);
|
||||
let mut res = http_service_inner(req, &ctx, node_config).await?;
|
||||
let mut res = http_service_inner(req, &ctx, node_config, service_version).await?;
|
||||
let hm = res.headers_mut();
|
||||
hm.append("Access-Control-Allow-Origin", "*".parse().unwrap());
|
||||
hm.append("Access-Control-Allow-Headers", "*".parse().unwrap());
|
||||
@@ -316,19 +346,17 @@ async fn http_service_inner(
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
node_config: &NodeConfigCached,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<Response<Body>, RetrievalError> {
|
||||
let uri = req.uri().clone();
|
||||
let path = uri.path();
|
||||
if path == "/api/4/private/version" {
|
||||
if req.method() == Method::GET {
|
||||
let ver_maj: u32 = std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0);
|
||||
let ver_min: u32 = std::env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0);
|
||||
let ver_pat: u32 = std::env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0);
|
||||
let ret = serde_json::json!({
|
||||
"daqbuf_version": {
|
||||
"major": ver_maj,
|
||||
"minor": ver_min,
|
||||
"patch": ver_pat,
|
||||
"major": service_version.major,
|
||||
"minor": service_version.minor,
|
||||
"patch": service_version.patch,
|
||||
},
|
||||
});
|
||||
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
|
||||
@@ -355,7 +383,7 @@ async fn http_service_inner(
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
} else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) {
|
||||
Ok(h.handle(req, ctx, &node_config).await?)
|
||||
Ok(h.handle(req, ctx, &node_config, service_version).await?)
|
||||
} else if let Some(h) = StatusBoardAllHandler::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = api4::databuffer_tools::FindActiveHandler::handler(&req) {
|
||||
|
||||
@@ -37,6 +37,7 @@ use netpod::FromUrl;
|
||||
use netpod::HasBackend;
|
||||
use netpod::HasTimeout;
|
||||
use netpod::ProxyConfig;
|
||||
use netpod::ServiceVersion;
|
||||
use netpod::ACCEPT_ALL;
|
||||
use netpod::APP_JSON;
|
||||
use query::api4::binned::BinnedQuery;
|
||||
@@ -57,12 +58,13 @@ use url::Url;
|
||||
|
||||
const DISTRI_PRE: &str = "/distri/";
|
||||
|
||||
pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
|
||||
pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -> Result<(), Error> {
|
||||
use std::str::FromStr;
|
||||
let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
|
||||
let make_service = make_service_fn({
|
||||
move |_conn| {
|
||||
let proxy_config = proxy_config.clone();
|
||||
let service_version = service_version.clone();
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn({
|
||||
move |req| {
|
||||
@@ -73,7 +75,7 @@ pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
|
||||
req.uri(),
|
||||
req.headers()
|
||||
);
|
||||
let f = proxy_http_service(req, proxy_config.clone());
|
||||
let f = proxy_http_service(req, proxy_config.clone(), service_version.clone());
|
||||
Cont { f: Box::pin(f) }
|
||||
}
|
||||
}))
|
||||
@@ -84,8 +86,12 @@ pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn proxy_http_service(req: Request<Body>, proxy_config: ProxyConfig) -> Result<Response<Body>, Error> {
|
||||
match proxy_http_service_try(req, &proxy_config).await {
|
||||
async fn proxy_http_service(
|
||||
req: Request<Body>,
|
||||
proxy_config: ProxyConfig,
|
||||
service_version: ServiceVersion,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
match proxy_http_service_try(req, &proxy_config, &service_version).await {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => {
|
||||
error!("data_api_proxy sees error: {:?}", e);
|
||||
@@ -94,9 +100,13 @@ async fn proxy_http_service(req: Request<Body>, proxy_config: ProxyConfig) -> Re
|
||||
}
|
||||
}
|
||||
|
||||
async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
|
||||
async fn proxy_http_service_try(
|
||||
req: Request<Body>,
|
||||
proxy_config: &ProxyConfig,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let ctx = ReqCtx::with_proxy(&req, proxy_config);
|
||||
let mut res = proxy_http_service_inner(req, &ctx, proxy_config).await?;
|
||||
let mut res = proxy_http_service_inner(req, &ctx, proxy_config, &service_version).await?;
|
||||
let hm = res.headers_mut();
|
||||
hm.insert("Access-Control-Allow-Origin", "*".parse().unwrap());
|
||||
hm.insert("Access-Control-Allow-Headers", "*".parse().unwrap());
|
||||
@@ -111,6 +121,7 @@ async fn proxy_http_service_inner(
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
proxy_config: &ProxyConfig,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let uri = req.uri().clone();
|
||||
let path = uri.path();
|
||||
@@ -122,14 +133,11 @@ async fn proxy_http_service_inner(
|
||||
Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
|
||||
} else if path == "/api/4/private/version" {
|
||||
if req.method() == Method::GET {
|
||||
let ver_maj: u32 = std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0);
|
||||
let ver_min: u32 = std::env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0);
|
||||
let ver_pat: u32 = std::env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0);
|
||||
let ret = serde_json::json!({
|
||||
"daqbuf_version": {
|
||||
"major": ver_maj,
|
||||
"minor": ver_min,
|
||||
"patch": ver_pat,
|
||||
"major": service_version.major,
|
||||
"minor": service_version.minor,
|
||||
"patch": service_version.patch,
|
||||
},
|
||||
});
|
||||
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
|
||||
@@ -137,7 +145,7 @@ async fn proxy_http_service_inner(
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
} else if let Some(h) = api4::StatusNodesRecursive::handler(&req) {
|
||||
h.handle(req, ctx, &proxy_config).await
|
||||
h.handle(req, ctx, &proxy_config, service_version).await
|
||||
} else if path == "/api/4/backends" {
|
||||
Ok(backends(req, proxy_config).await?)
|
||||
} else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) {
|
||||
|
||||
@@ -39,7 +39,7 @@ impl RequestStatusHandler {
|
||||
.to_owned();
|
||||
if accept != APP_JSON && accept != ACCEPT_ALL {
|
||||
// TODO set the public error code and message and return Err(e).
|
||||
let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
|
||||
let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept));
|
||||
error!("{e}");
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use netpod::ChannelSearchResult;
|
||||
use netpod::NodeStatus;
|
||||
use netpod::NodeStatusSub;
|
||||
use netpod::ProxyConfig;
|
||||
use netpod::ServiceVersion;
|
||||
use netpod::ACCEPT_ALL;
|
||||
use netpod::APP_JSON;
|
||||
use serde_json::Value as JsVal;
|
||||
@@ -162,9 +163,10 @@ impl StatusNodesRecursive {
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
ctx: &ReqCtx,
|
||||
node_config: &ProxyConfig,
|
||||
proxy_config: &ProxyConfig,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
match self.status(req, ctx, node_config).await {
|
||||
match self.status(req, ctx, proxy_config, service_version).await {
|
||||
Ok(status) => {
|
||||
let body = serde_json::to_vec(&status)?;
|
||||
let ret = response(StatusCode::OK).body(Body::from(body))?;
|
||||
@@ -183,6 +185,7 @@ impl StatusNodesRecursive {
|
||||
_req: Request<Body>,
|
||||
_ctx: &ReqCtx,
|
||||
proxy_config: &ProxyConfig,
|
||||
service_version: &ServiceVersion,
|
||||
) -> Result<NodeStatus, Error> {
|
||||
let path = crate::api4::status::StatusNodesRecursive::path();
|
||||
let mut bodies = Vec::new();
|
||||
@@ -243,7 +246,7 @@ impl StatusNodesRecursive {
|
||||
}
|
||||
let ret = NodeStatus {
|
||||
name: format!("{}:{}", proxy_config.name, proxy_config.port),
|
||||
version: core::env!("CARGO_PKG_VERSION").into(),
|
||||
version: service_version.to_string(),
|
||||
is_sf_databuffer: false,
|
||||
is_archiver_engine: false,
|
||||
is_archiver_appliance: false,
|
||||
|
||||
@@ -881,9 +881,11 @@ impl FromUrl for MapPulseQuery {
|
||||
fn from_url(url: &url::Url) -> Result<Self, err::Error> {
|
||||
let mut pit = url
|
||||
.path_segments()
|
||||
.ok_or(Error::with_msg_no_trace("no path in url"))?
|
||||
.ok_or_else(|| Error::with_msg_no_trace("no path in url"))?
|
||||
.rev();
|
||||
let pulsestr = pit.next().ok_or(Error::with_msg_no_trace("no pulse in url path"))?;
|
||||
let pulsestr = pit
|
||||
.next()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("no pulse in url path"))?;
|
||||
let backend = pit.next().unwrap_or("sf-databuffer").into();
|
||||
// TODO legacy: use a default backend if not specified.
|
||||
let backend = if backend == "pulse" {
|
||||
|
||||
@@ -35,7 +35,7 @@ impl SettingsThreadsMaxHandler {
|
||||
.to_owned();
|
||||
if accept != APP_JSON && accept != ACCEPT_ALL {
|
||||
// TODO set the public error code and message and return Err(e).
|
||||
let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
|
||||
let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept));
|
||||
error!("{e}");
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user