Restore backwards compatibility with map-pulse requests which do not include a backend

This commit is contained in:
Dominik Werder
2022-11-29 09:46:06 +01:00
parent 94e49bd014
commit 75d492ff85
11 changed files with 111 additions and 36 deletions

View File

@@ -151,6 +151,7 @@ pub async fn make_event_pipe(
let channel_config = match read_local_config(evq.channel.clone(), node_config.node.clone()).await {
Ok(k) => k,
Err(e) => {
// TODO introduce detailed error type
if e.msg().contains("ErrorKind::NotFound") {
let s = futures_util::stream::empty();
return Ok(Box::pin(s));

View File

@@ -15,3 +15,4 @@ async-channel = "1.6"
chrono = { version = "0.4", features = ["serde"] }
url = "2.2"
regex = "1.5"
http = "0.2"

View File

@@ -366,6 +366,12 @@ impl From<rmp_serde::decode::Error> for Error {
}
}
impl From<http::header::ToStrError> for Error {
fn from(k: http::header::ToStrError) -> Self {
Self::with_msg(format!("{:?}", k))
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PublicError {
reason: Option<Reason>,

View File

@@ -88,4 +88,5 @@ impl Convable for chrono::ParseError {}
impl Convable for url::ParseError {}
impl Convable for http::uri::InvalidUri {}
impl Convable for http::Error {}
impl Convable for http::header::ToStrError {}
impl Convable for hyper::Error {}

View File

@@ -118,7 +118,7 @@ async fn plain_events_json(
let query = query;
// ---
if query.backend() == "testbackend" {
if true || query.backend().starts_with("test-") {
let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain);
let item = streams::plaineventsjson::plain_events_json(query, &node_config.node_config.cluster).await?;
let buf = serde_json::to_vec(&item)?;

View File

@@ -67,7 +67,7 @@ pub async fn unused_gather_json_from_hosts(req: Request<Body>, pathpre: &str) ->
} else {
req
};
let req = req.header(http::header::ACCEPT, "application/json");
let req = req.header(http::header::ACCEPT, APP_JSON);
let req = req.body(Body::empty());
let task = tokio::spawn(async move {
select! {
@@ -100,7 +100,7 @@ pub async fn unused_gather_json_from_hosts(req: Request<Body>, pathpre: &str) ->
a.push(Hres { gh: tr.0, res });
}
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(serde_json::to_string(&Jres { hosts: a })?.into())?;
Ok(res)
}
@@ -118,8 +118,7 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
.map(|node| {
let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf);
let req = Request::builder().method(Method::GET).uri(uri);
let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
let req = req.header(http::header::ACCEPT, "application/json");
let req = req.header(http::header::ACCEPT, APP_JSON);
let req = req.body(Body::empty());
let task = tokio::spawn(async move {
select! {
@@ -187,8 +186,12 @@ where
+ 'static,
FT: Fn(Vec<SubRes<SM>>) -> Result<Response<Body>, Error>,
{
assert!(urls.len() == bodies.len());
assert!(urls.len() == tags.len());
if urls.len() != bodies.len() {
return Err(Error::with_msg_no_trace("unequal numbers of urls and bodies"));
}
if urls.len() != tags.len() {
return Err(Error::with_msg_no_trace("unequal numbers of urls and tags"));
}
let spawned: Vec<_> = urls
.into_iter()
.zip(bodies.into_iter())
@@ -237,7 +240,7 @@ where
(url, jh)
})
.collect();
let mut a: Vec<SubRes<SM>> = vec![];
let mut a: Vec<SubRes<SM>> = Vec::new();
for (_url, jh) in spawned {
let res: SubRes<SM> = match jh.await {
Ok(k) => match k {
@@ -281,7 +284,7 @@ mod test {
},
|_all| {
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(serde_json::to_string(&42)?.into())?;
Ok(res)
},

View File

@@ -47,6 +47,7 @@ use tracing::Instrument;
use url::Url;
pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark";
pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url";
pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
static STATUS_BOARD_INIT: Once = Once::new();
@@ -231,6 +232,15 @@ macro_rules! static_http_api1 {
}
async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
use http::HeaderValue;
let mut urlmarks = Vec::new();
urlmarks.push(format!("{}:{}", req.method(), req.uri()));
for (k, v) in req.headers() {
if k == PSI_DAQBUFFER_SEEN_URL {
let s = String::from_utf8_lossy(v.as_bytes());
urlmarks.push(s.into());
}
}
let ctx = ReqCtx::with_node(&req, node_config);
let mut res = http_service_inner(req, &ctx, node_config).await?;
let hm = res.headers_mut();
@@ -240,6 +250,10 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap());
}
hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap());
for s in urlmarks {
let v = HeaderValue::from_str(&s).unwrap_or_else(|_| HeaderValue::from_static("invalid"));
hm.append(PSI_DAQBUFFER_SEEN_URL, v);
}
Ok(res)
}
@@ -398,12 +412,21 @@ async fn http_service_inner(
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, not found: {:?} {:?} {:?}",
req.method(),
req.uri().path(),
req.uri().query(),
)))?)
use std::fmt::Write;
let mut body = String::new();
let out = &mut body;
write!(out, "<pre>\n")?;
write!(out, "METHOD {:?}<br>\n", req.method())?;
write!(out, "URI {:?}<br>\n", req.uri())?;
write!(out, "HOST {:?}<br>\n", req.uri().host())?;
write!(out, "PORT {:?}<br>\n", req.uri().port())?;
write!(out, "PATH {:?}<br>\n", req.uri().path())?;
write!(out, "QUERY {:?}<br>\n", req.uri().query())?;
for (hn, hv) in req.headers() {
write!(out, "HEADER {hn:?}: {hv:?}<br>\n")?;
}
write!(out, "</pre>\n")?;
Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?)
}
}

View File

@@ -167,14 +167,17 @@ async fn proxy_http_service_inner(
use std::fmt::Write;
let mut body = String::new();
let out = &mut body;
write!(out, "<pre>\n")?;
write!(out, "METHOD {:?}<br>\n", req.method())?;
write!(out, "URI {:?}<br>\n", req.uri())?;
write!(out, "HOST {:?}<br>\n", req.uri().host())?;
write!(out, "PORT {:?}<br>\n", req.uri().port())?;
write!(out, "PATH {:?}<br>\n", req.uri().path())?;
write!(out, "URI {:?}<br>\n", req.uri())?;
write!(out, "HOST {:?}<br>\n", req.uri().host())?;
write!(out, "PORT {:?}<br>\n", req.uri().port())?;
write!(out, "PATH {:?}<br>\n", req.uri().path())?;
write!(out, "QUERY {:?}<br>\n", req.uri().query())?;
for (hn, hv) in req.headers() {
write!(out, "HEADER {hn:?}: {hv:?}<br>\n")?;
}
write!(out, "</pre>\n")?;
Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?)
}
}
@@ -536,6 +539,7 @@ pub async fn proxy_single_backend_query<QT>(
where
QT: FromUrl + AppendToUrl + HasBackend + HasTimeout,
{
info!("proxy_single_backend_query");
let (head, _body) = req.into_parts();
match head.headers.get(http::header::ACCEPT) {
Some(v) => {
@@ -548,15 +552,40 @@ where
return Ok(response_err(StatusCode::BAD_REQUEST, msg)?);
}
};
// TODO is this special case used any more?
// TODO remove this special case
// SPECIAL CASE
// For pulse mapping we want to use a separate list of backends from the config file.
// In general, caller of this function should already provide the chosen list of backends.
let sh = if url.as_str().contains("/map/pulse/") {
get_query_host_for_backend_2(&query.backend(), proxy_config)?
} else {
get_query_host_for_backend(&query.backend(), proxy_config)?
};
// TODO remove this special case
// SPECIAL CASE:
// Since the inner proxy is not yet handling map-pulse requests without backend,
// we can not simply copy the original url here.
// Instead, url needs to get parsed and formatted.
// In general, the caller of this function should be able to provide a url, or maybe
// better a closure so that the url can even depend on backend.
let uri_path: String = if url.as_str().contains("/map/pulse/") {
match MapPulseQuery::from_url(&url) {
Ok(qu) => {
info!("qu {qu:?}");
format!("/api/4/map/pulse/{}/{}", qu.backend, qu.pulse)
}
Err(e) => {
error!("{e:?}");
String::from("/BAD")
}
}
} else {
head.uri.path().into()
};
info!("uri_path {uri_path}");
let urls = [sh]
.iter()
.map(|sh| match Url::parse(&format!("{}{}", sh, head.uri.path())) {
.map(|sh| match Url::parse(&format!("{}{}", sh, uri_path)) {
Ok(mut url) => {
query.append_to_url(&mut url);
Ok(url)

View File

@@ -99,18 +99,21 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let vdef = header::HeaderValue::from_static(APP_JSON);
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
if v == APP_JSON || v == ACCEPT_ALL {
let mut bodies = vec![];
let v = head
.headers
.get(http::header::ACCEPT)
.ok_or(Error::with_msg_no_trace("no accept header"))?;
let v = v.to_str()?;
if v.contains(APP_JSON) || v.contains(ACCEPT_ALL) {
let mut bodies = Vec::new();
let urls = proxy_config
.backends_status
.iter()
.map(|pb| match Url::parse(&format!("{}/api/4/node_status", pb.url)) {
.map(|b| match Url::parse(&format!("{}/api/4/node_status", b.url)) {
Ok(url) => Ok(url),
Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))),
Err(e) => Err(Error::with_msg(format!("parse error for: {b:?} {e:?}"))),
})
.fold_ok(vec![], |mut a, x| {
.fold_ok(Vec::new(), |mut a, x| {
a.push(x);
bodies.push(None);
a
@@ -137,7 +140,7 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<SubRes<JsVal>>| {
let mut sres = vec![];
let mut sres = Vec::new();
for j in all {
let val = serde_json::json!({
j.tag: j.val,
@@ -145,7 +148,7 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
sres.push(val);
}
let res = serde_json::json!({
"THIS_PROXY": "status here...",
proxy_config.name.clone(): "TODO proxy status",
"subnodes": sres,
});
let res = response(StatusCode::OK)

View File

@@ -463,8 +463,8 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MapPulseQuery {
backend: String,
pulse: u64,
pub backend: String,
pub pulse: u64,
}
impl HasBackend for MapPulseQuery {
@@ -486,12 +486,19 @@ impl FromUrl for MapPulseQuery {
.ok_or(Error::with_msg_no_trace("no path in url"))?
.rev();
let pulsestr = pit.next().ok_or(Error::with_msg_no_trace("no pulse in url path"))?;
let backend = pit
.next()
.ok_or(Error::with_msg_no_trace("no backend in url path"))?
.into();
let backend = pit.next().unwrap_or("sf-databuffer").into();
//.ok_or(Error::with_msg_no_trace("no backend in url path"))?
//.into();
// TODO !!!
// Clients MUST specify the backend
let backend = if backend == "pulse" {
String::from("sf-databuffer")
} else {
backend
};
let pulse: u64 = pulsestr.parse()?;
let ret = Self { backend, pulse };
info!("FromUrl {ret:?}");
Ok(ret)
}

View File

@@ -756,6 +756,7 @@ pub enum GenVar {
ConstRegular,
}
// TODO move to databuffer-specific crate
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelConfig {
pub channel: Channel,