From 75d492ff855db6a8a360198a5832c5999f96e765 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 29 Nov 2022 09:46:06 +0100 Subject: [PATCH] Restore backwards compatibility with map-pulse requests which not include backend --- disk/src/raw/conn.rs | 1 + err/Cargo.toml | 1 + err/src/lib.rs | 6 ++++++ httpret/src/err.rs | 1 + httpret/src/events.rs | 2 +- httpret/src/gather.rs | 19 ++++++++++-------- httpret/src/httpret.rs | 35 +++++++++++++++++++++++++++------ httpret/src/proxy.rs | 41 +++++++++++++++++++++++++++++++++------ httpret/src/proxy/api4.rs | 21 +++++++++++--------- httpret/src/pulsemap.rs | 19 ++++++++++++------ netpod/src/netpod.rs | 1 + 11 files changed, 111 insertions(+), 36 deletions(-) diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index d0326a4..ca21c73 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -151,6 +151,7 @@ pub async fn make_event_pipe( let channel_config = match read_local_config(evq.channel.clone(), node_config.node.clone()).await { Ok(k) => k, Err(e) => { + // TODO introduce detailed error type if e.msg().contains("ErrorKind::NotFound") { let s = futures_util::stream::empty(); return Ok(Box::pin(s)); diff --git a/err/Cargo.toml b/err/Cargo.toml index efe01c0..8d9b383 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -15,3 +15,4 @@ async-channel = "1.6" chrono = { version = "0.4", features = ["serde"] } url = "2.2" regex = "1.5" +http = "0.2" diff --git a/err/src/lib.rs b/err/src/lib.rs index ab27d66..4899322 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -366,6 +366,12 @@ impl From for Error { } } +impl From for Error { + fn from(k: http::header::ToStrError) -> Self { + Self::with_msg(format!("{:?}", k)) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct PublicError { reason: Option, diff --git a/httpret/src/err.rs b/httpret/src/err.rs index cc22a7d..d02f676 100644 --- a/httpret/src/err.rs +++ b/httpret/src/err.rs @@ -88,4 +88,5 @@ impl Convable for chrono::ParseError {} impl Convable for url::ParseError {} impl Convable for http::uri::InvalidUri {} impl Convable for http::Error {} +impl Convable for http::header::ToStrError {} impl Convable for hyper::Error {} diff --git a/httpret/src/events.rs b/httpret/src/events.rs index cf5c3f8..cd72f27 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -118,7 +118,7 @@ async fn plain_events_json( let query = query; // --- - if query.backend() == "testbackend" { + if true || query.backend().starts_with("test-") { let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); let item = streams::plaineventsjson::plain_events_json(query, &node_config.node_config.cluster).await?; let buf = serde_json::to_vec(&item)?; diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs index be89294..efc40f3 100644 --- a/httpret/src/gather.rs +++ b/httpret/src/gather.rs @@ -67,7 +67,7 @@ pub async fn unused_gather_json_from_hosts(req: Request, pathpre: &str) -> } else { req }; - let req = req.header(http::header::ACCEPT, "application/json"); + let req = req.header(http::header::ACCEPT, APP_JSON); let req = req.body(Body::empty()); let task = tokio::spawn(async move { select! { @@ -100,7 +100,7 @@ pub async fn unused_gather_json_from_hosts(req: Request, pathpre: &str) -> a.push(Hres { gh: tr.0, res }); } let res = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") + .header(http::header::CONTENT_TYPE, APP_JSON) .body(serde_json::to_string(&Jres { hosts: a })?.into())?; Ok(res) } @@ -118,8 +118,7 @@ pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) .map(|node| { let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf); let req = Request::builder().method(Method::GET).uri(uri); - let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name)); - let req = req.header(http::header::ACCEPT, "application/json"); + let req = req.header(http::header::ACCEPT, APP_JSON); let req = req.body(Body::empty()); let task = tokio::spawn(async move { select! { @@ -187,8 +186,12 @@ where + 'static, FT: Fn(Vec>) -> Result, Error>, { - assert!(urls.len() == bodies.len()); - assert!(urls.len() == tags.len()); + if urls.len() != bodies.len() { + return Err(Error::with_msg_no_trace("unequal numbers of urls and bodies")); + } + if urls.len() != tags.len() { + return Err(Error::with_msg_no_trace("unequal numbers of urls and tags")); + } let spawned: Vec<_> = urls .into_iter() .zip(bodies.into_iter()) @@ -237,7 +240,7 @@ where (url, jh) }) .collect(); - let mut a: Vec> = vec![]; + let mut a: Vec> = Vec::new(); for (_url, jh) in spawned { let res: SubRes = match jh.await { Ok(k) => match k { @@ -281,7 +284,7 @@ mod test { }, |_all| { let res = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") + .header(http::header::CONTENT_TYPE, APP_JSON) .body(serde_json::to_string(&42)?.into())?; Ok(res) }, diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 4c6f03c..18b6098 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -47,6 +47,7 @@ use tracing::Instrument; use url::Url; pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark"; +pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { static STATUS_BOARD_INIT: Once = Once::new(); @@ -231,6 +232,15 @@ macro_rules! static_http_api1 { } async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + use http::HeaderValue; + let mut urlmarks = Vec::new(); + urlmarks.push(format!("{}:{}", req.method(), req.uri())); + for (k, v) in req.headers() { + if k == PSI_DAQBUFFER_SEEN_URL { + let s = String::from_utf8_lossy(v.as_bytes()); + urlmarks.push(s.into()); + } + } let ctx = ReqCtx::with_node(&req, node_config); let mut res = http_service_inner(req, &ctx, node_config).await?; let hm = res.headers_mut(); @@ -240,6 +250,10 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap()); } hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap()); + for s in urlmarks { + let v = HeaderValue::from_str(&s).unwrap_or_else(|_| HeaderValue::from_static("invalid")); + hm.append(PSI_DAQBUFFER_SEEN_URL, v); + } Ok(res) } @@ -398,12 +412,21 @@ async fn http_service_inner( Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else { - Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( - "Sorry, not found: {:?} {:?} {:?}", - req.method(), - req.uri().path(), - req.uri().query(), - )))?) + use std::fmt::Write; + let mut body = String::new(); + let out = &mut body; + write!(out, "
\n")?;
+        write!(out, "METHOD {:?}
\n", req.method())?; + write!(out, "URI {:?}
\n", req.uri())?; + write!(out, "HOST {:?}
\n", req.uri().host())?; + write!(out, "PORT {:?}
\n", req.uri().port())?; + write!(out, "PATH {:?}
\n", req.uri().path())?; + write!(out, "QUERY {:?}
\n", req.uri().query())?; + for (hn, hv) in req.headers() { + write!(out, "HEADER {hn:?}: {hv:?}
\n")?; + } + write!(out, "
\n")?; + Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?) } } diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 4e6de32..d98f428 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -167,14 +167,17 @@ async fn proxy_http_service_inner( use std::fmt::Write; let mut body = String::new(); let out = &mut body; + write!(out, "
\n")?;
         write!(out, "METHOD {:?}
\n", req.method())?; - write!(out, "URI {:?}
\n", req.uri())?; - write!(out, "HOST {:?}
\n", req.uri().host())?; - write!(out, "PORT {:?}
\n", req.uri().port())?; - write!(out, "PATH {:?}
\n", req.uri().path())?; + write!(out, "URI {:?}
\n", req.uri())?; + write!(out, "HOST {:?}
\n", req.uri().host())?; + write!(out, "PORT {:?}
\n", req.uri().port())?; + write!(out, "PATH {:?}
\n", req.uri().path())?; + write!(out, "QUERY {:?}
\n", req.uri().query())?; for (hn, hv) in req.headers() { write!(out, "HEADER {hn:?}: {hv:?}
\n")?; } + write!(out, "
\n")?; Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?) } } @@ -536,6 +539,7 @@ pub async fn proxy_single_backend_query( where QT: FromUrl + AppendToUrl + HasBackend + HasTimeout, { + info!("proxy_single_backend_query"); let (head, _body) = req.into_parts(); match head.headers.get(http::header::ACCEPT) { Some(v) => { @@ -548,15 +552,40 @@ where return Ok(response_err(StatusCode::BAD_REQUEST, msg)?); } }; - // TODO is this special case used any more? + // TODO remove this special case + // SPECIAL CASE + // For pulse mapping we want to use a separate list of backends from the config file. + // In general, caller of this function should already provide the chosen list of backends. let sh = if url.as_str().contains("/map/pulse/") { get_query_host_for_backend_2(&query.backend(), proxy_config)? } else { get_query_host_for_backend(&query.backend(), proxy_config)? }; + // TODO remove this special case + // SPECIAL CASE: + // Since the inner proxy is not yet handling map-pulse requests without backend, + // we can not simply copy the original url here. + // Instead, url needs to get parsed and formatted. + // In general, the caller of this function should be able to provide a url, or maybe + // better a closure so that the url can even depend on backend. + let uri_path: String = if url.as_str().contains("/map/pulse/") { + match MapPulseQuery::from_url(&url) { + Ok(qu) => { + info!("qu {qu:?}"); + format!("/api/4/map/pulse/{}/{}", qu.backend, qu.pulse) + } + Err(e) => { + error!("{e:?}"); + String::from("/BAD") + } + } + } else { + head.uri.path().into() + }; + info!("uri_path {uri_path}"); let urls = [sh] .iter() - .map(|sh| match Url::parse(&format!("{}{}", sh, head.uri.path())) { + .map(|sh| match Url::parse(&format!("{}{}", sh, uri_path)) { Ok(mut url) => { query.append_to_url(&mut url); Ok(url) diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs index e9debee..762b377 100644 --- a/httpret/src/proxy/api4.rs +++ b/httpret/src/proxy/api4.rs @@ -99,18 +99,21 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { let (head, _body) = req.into_parts(); - let vdef = header::HeaderValue::from_static(APP_JSON); - let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef); - if v == APP_JSON || v == ACCEPT_ALL { - let mut bodies = vec![]; + let v = head + .headers + .get(http::header::ACCEPT) + .ok_or(Error::with_msg_no_trace("no accept header"))?; + let v = v.to_str()?; + if v.contains(APP_JSON) || v.contains(ACCEPT_ALL) { + let mut bodies = Vec::new(); let urls = proxy_config .backends_status .iter() - .map(|pb| match Url::parse(&format!("{}/api/4/node_status", pb.url)) { + .map(|b| match Url::parse(&format!("{}/api/4/node_status", b.url)) { Ok(url) => Ok(url), - Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))), + Err(e) => Err(Error::with_msg(format!("parse error for: {b:?} {e:?}"))), }) - .fold_ok(vec![], |mut a, x| { + .fold_ok(Vec::new(), |mut a, x| { a.push(x); bodies.push(None); a @@ -137,7 +140,7 @@ pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Resu Box::pin(fut) as Pin + Send>> }; let ft = |all: Vec>| { - let mut sres = vec![]; + let mut sres = Vec::new(); for j in all { let val = serde_json::json!({ j.tag: j.val, @@ -145,7 +148,7 @@ pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Resu sres.push(val); } let res = serde_json::json!({ - "THIS_PROXY": "status here...", + proxy_config.name.clone(): "TODO proxy status", "subnodes": sres, }); let res = response(StatusCode::OK) diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 6fa421e..166ea35 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -463,8 +463,8 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result, Error> { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MapPulseQuery { - backend: String, - pulse: u64, + pub backend: String, + pub pulse: u64, } impl HasBackend for MapPulseQuery { @@ -486,12 +486,19 @@ impl FromUrl for MapPulseQuery { .ok_or(Error::with_msg_no_trace("no path in url"))? .rev(); let pulsestr = pit.next().ok_or(Error::with_msg_no_trace("no pulse in url path"))?; - let backend = pit - .next() - .ok_or(Error::with_msg_no_trace("no backend in url path"))? - .into(); + let backend = pit.next().unwrap_or("sf-databuffer").into(); + //.ok_or(Error::with_msg_no_trace("no backend in url path"))? + //.into(); + // TODO !!! + // Clients MUST specify the backend + let backend = if backend == "pulse" { + String::from("sf-databuffer") + } else { + backend + }; let pulse: u64 = pulsestr.parse()?; let ret = Self { backend, pulse }; + info!("FromUrl {ret:?}"); Ok(ret) } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 6b3b321..e351880 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -756,6 +756,7 @@ pub enum GenVar { ConstRegular, } +// TODO move to databuffer-specific crate #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelConfig { pub channel: Channel,