From 44e37c7dbc7999ac3e2bbce14afa9cf8852d5776 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 7 Dec 2023 22:35:13 +0100 Subject: [PATCH] Refactor url handling --- crates/httpret/src/api1.rs | 12 ++----- crates/httpret/src/api4/binned.rs | 8 ++--- crates/httpret/src/api4/databuffer_tools.rs | 6 ++-- crates/httpret/src/api4/events.rs | 8 ++--- crates/httpret/src/api4/search.rs | 4 +-- crates/httpret/src/channel_status.rs | 5 +-- crates/httpret/src/channelconfig.rs | 15 ++++---- crates/httpret/src/httpret.rs | 3 +- crates/httpret/src/prometheus.rs | 4 +-- crates/httpret/src/proxy.rs | 5 +-- crates/httpret/src/proxy/api4.rs | 3 +- crates/httpret/src/pulsemap.rs | 40 ++++++++++++++------- crates/netpod/Cargo.toml | 2 +- crates/netpod/src/netpod.rs | 13 +++++++ 14 files changed, 71 insertions(+), 57 deletions(-) diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 52d4bcc..f165578 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -70,6 +70,7 @@ use std::time::Instant; use taskrun::tokio; use tracing_futures::Instrument; use url::Url; +use netpod::req_uri_to_url; pub trait BackendAware { fn backend(&self) -> &str; @@ -912,12 +913,7 @@ impl Api1EventsBinaryHandler { } else { tracing::Span::none() }; - let url = { - let s1 = format!("dummy:{}", head.uri); - Url::parse(&s1) - .map_err(Error::from) - .map_err(|e| e.add_public_msg(format!("Can not parse query url")))? - }; + let url = req_uri_to_url(&head.uri)?; let disk_tune = DiskIoTune::from_url(&url)?; let reqidspan = tracing::info_span!("api1query", reqid = ctx.reqid()); // TODO do not clone here @@ -960,16 +956,12 @@ impl Api1EventsBinaryHandler { let beg_date = qu.range().beg().clone(); let end_date = qu.range().end().clone(); trace!("{self_name} beg_date {:?} end_date {:?}", beg_date, end_date); - //let url = Url::parse(&format!("dummy:{}", req.uri()))?; - //let query = PlainEventsBinaryQuery::from_url(&url)?; if accept.contains(APP_OCTET) || accept.contains(ACCEPT_ALL) { let beg = beg_date.timestamp() as u64 * SEC + beg_date.timestamp_subsec_nanos() as u64; let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64; let range = NanoRange { beg, end }; // TODO check for valid given backend name: let backend = &ncc.node_config.cluster.backend; - // TODO ask for channel config quorum for all channels up front. - //httpclient::http_get(url, accept); let ts1 = Instant::now(); let mut chans = Vec::new(); for ch in qu.channels() { diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index d4fbc3f..ff21844 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -11,6 +11,7 @@ use httpclient::Requ; use httpclient::StreamResponse; use httpclient::ToJsonBody; use netpod::log::*; +use netpod::req_uri_to_url; use netpod::timeunits::SEC; use netpod::FromUrl; use netpod::NodeConfigCached; @@ -58,12 +59,7 @@ async fn binned_json( } async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { - let url = { - let s1 = format!("dummy:{}", req.uri()); - Url::parse(&s1) - .map_err(Error::from) - .map_err(|e| e.add_public_msg(format!("Can not parse query url")))? - }; + let url = req_uri_to_url(req.uri())?; if req .uri() .path_and_query() diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs index 2309ef1..0b08513 100644 --- a/crates/httpret/src/api4/databuffer_tools.rs +++ b/crates/httpret/src/api4/databuffer_tools.rs @@ -16,6 +16,7 @@ use httpclient::body_stream; use httpclient::Requ; use httpclient::StreamResponse; use netpod::log::*; +use netpod::req_uri_to_url; use netpod::Node; use netpod::NodeConfigCached; use netpod::ACCEPT_ALL; @@ -88,10 +89,7 @@ impl FindActiveHandler { .headers() .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - let _url = { - let s1 = format!("dummy:{}", req.uri()); - Url::parse(&s1)? - }; + let _url = req_uri_to_url(req.uri()).map_err(|_| FindActiveError::HttpBadUrl)?; if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { type _A = netpod::BodyStream; let stream = FindActiveStream::new(40, 2, ncc); diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 14aba3b..52a9780 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -14,6 +14,7 @@ use httpclient::Requ; use httpclient::StreamResponse; use httpclient::ToJsonBody; use netpod::log::*; +use netpod::req_uri_to_url; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; @@ -59,12 +60,7 @@ async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) - .headers() .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - let url = { - let s1 = format!("dummy:{}", req.uri()); - Url::parse(&s1) - .map_err(Error::from) - .map_err(|e| e.add_public_msg(format!("Can not parse query url")))? - }; + let url = req_uri_to_url(req.uri())?; if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { Ok(plain_events_json(url, req, ctx, node_config).await?) } else if accept == APP_OCTET { diff --git a/crates/httpret/src/api4/search.rs b/crates/httpret/src/api4/search.rs index b8b3dab..3a9ab82 100644 --- a/crates/httpret/src/api4/search.rs +++ b/crates/httpret/src/api4/search.rs @@ -9,15 +9,15 @@ use httpclient::Requ; use httpclient::StreamResponse; use httpclient::ToJsonBody; use netpod::log::*; +use netpod::req_uri_to_url; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; use netpod::NodeConfigCached; use netpod::ACCEPT_ALL; use netpod::APP_JSON; -use url::Url; pub async fn channel_search(req: Requ, node_config: &NodeConfigCached) -> Result { - let url = Url::parse(&format!("dummy://{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let query = ChannelSearchQuery::from_url(&url)?; info!("search query: {:?}", query); let res = dbconn::search::search_channel(query, node_config).await?; diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs index 08b42b6..26f4f31 100644 --- a/crates/httpret/src/channel_status.rs +++ b/crates/httpret/src/channel_status.rs @@ -16,6 +16,7 @@ use items_2::channelevents::ChannelStatusEvents; use items_2::channelevents::ConnStatusEvent; use netpod::log::*; use netpod::query::ChannelStateEventsQuery; +use netpod::req_uri_to_url; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ACCEPT_ALL; @@ -46,7 +47,7 @@ impl ConnectionStatusEvents { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ChannelStateEventsQuery::from_url(&url)?; match self.fetch_data(&q, node_config).await { Ok(k) => { @@ -120,7 +121,7 @@ impl ChannelStatusEventsHandler { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ChannelStateEventsQuery::from_url(&url)?; match self.fetch_data(&q, node_config).await { Ok(k) => { diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 41848ef..e209143 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -14,6 +14,7 @@ use httpclient::ToJsonBody; use netpod::get_url_query_pairs; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; +use netpod::req_uri_to_url; use netpod::timeunits::*; use netpod::ChannelConfigQuery; use netpod::ChannelConfigResponse; @@ -97,7 +98,7 @@ impl ChannelConfigHandler { } async fn channel_config(&self, req: Requ, node_config: &NodeConfigCached) -> Result { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ChannelConfigQuery::from_url(&url)?; let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; match conf { @@ -154,7 +155,7 @@ impl ChannelConfigsHandler { async fn channel_configs(&self, req: Requ, ncc: &NodeConfigCached) -> Result { info!("channel_configs"); - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_configs for q {q:?}"); let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?; @@ -211,7 +212,7 @@ impl ChannelConfigQuorumHandler { ncc: &NodeConfigCached, ) -> Result { info!("channel_config_quorum"); - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config_quorum for q {q:?}"); let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, ncc).await?; @@ -354,7 +355,7 @@ impl ScyllaChannelsWithType { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ChannelsWithTypeQuery::from_url(&url)?; let res = self .get_channels(&q, &node_config.node_config.cluster.backend, node_config) @@ -512,7 +513,7 @@ impl ScyllaChannelsActive { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ScyllaChannelsActiveQuery::from_url(&url)?; let res = self.get_channels(&q, node_config).await?; let body = ToJsonBody::from(&res).into_body(); @@ -615,7 +616,7 @@ impl IocForChannel { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = IocForChannelQuery::from_url(&url)?; match self.find(&q, node_config).await { Ok(k) => { @@ -704,7 +705,7 @@ impl ScyllaSeriesTsMsp { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let url = req_uri_to_url(req.uri())?; let q = ScyllaSeriesTsMspQuery::from_url(&url)?; match self.get_ts_msps(&q, node_config).await { Ok(k) => { diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index d6e9047..f70f04d 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -38,6 +38,7 @@ use hyper_util::rt::TokioIo; use net::SocketAddr; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; +use netpod::req_uri_to_url; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ServiceVersion; @@ -558,7 +559,7 @@ async fn prebinned_inner( _node_config: &NodeConfigCached, ) -> Result { let (head, _body) = req.into_parts(); - let url: url::Url = format!("dummy://{}", head.uri).parse()?; + let url = req_uri_to_url(&head.uri)?; let query = PreBinnedQuery::from_url(&url)?; let span1 = span!(Level::INFO, "httpret::prebinned", desc = &query.patch().span_desc()); span1.in_scope(|| { diff --git a/crates/httpret/src/prometheus.rs b/crates/httpret/src/prometheus.rs index 80fd74d..42f53bb 100644 --- a/crates/httpret/src/prometheus.rs +++ b/crates/httpret/src/prometheus.rs @@ -275,7 +275,7 @@ impl QueryHandler { pub async fn handle(&self, req: Request) -> Result, RetrievalError> { info!("{} for {:?}", std::any::type_name::(), req); - let url = url::Url::parse(&format!("dummy://{}", &req.uri())); + let url = req_uri_to_url(req.uri())?; info!("/api/v1/query parsed url: {:?}", url); let body = read_body_bytes(req.into_body()).await?; let body_str = String::from_utf8_lossy(&body); @@ -307,7 +307,7 @@ impl QueryRangeHandler { pub async fn handle(&self, req: Request) -> Result, RetrievalError> { info!("{} for {:?}", std::any::type_name::(), req); - let url = url::Url::parse(&format!("dummy://{}", &req.uri())); + let url = req_uri_to_url(req.uri())?; info!("/api/v1/query_range parsed url: {:?}", url); let body = read_body_bytes(req.into_body()).await?; let body_str = String::from_utf8_lossy(&body); diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 20537fe..92015d0 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -33,6 +33,7 @@ use hyper_util::rt::TokioIo; use itertools::Itertools; use netpod::log::*; use netpod::query::ChannelStateEventsQuery; +use netpod::req_uri_to_url; use netpod::AppendToUrl; use netpod::ChannelConfigQuery; use netpod::ChannelSearchQuery; @@ -327,7 +328,7 @@ pub async fn channel_search(req: Requ, proxy_config: &ProxyConfig) -> Result { if v == APP_JSON { - let url = Url::parse(&format!("dummy:{}", head.uri))?; + let url = req_uri_to_url(&head.uri)?; let query = ChannelSearchQuery::from_url(&url)?; let mut methods = Vec::new(); let mut bodies = Vec::new(); @@ -509,7 +510,7 @@ where match head.headers.get(http::header::ACCEPT) { Some(v) => { if v == APP_JSON || v == ACCEPT_ALL { - let url = Url::parse(&format!("dummy:{}", head.uri))?; + let url = req_uri_to_url(&head.uri)?; let query = match QT::from_url(&url) { Ok(k) => k, Err(_) => { diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 682236a..a8efc17 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -19,6 +19,7 @@ use httpclient::StreamResponse; use httpclient::ToJsonBody; use hyper::body::Incoming; use netpod::log::*; +use netpod::req_uri_to_url; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; use netpod::NodeStatus; @@ -42,7 +43,7 @@ use url::Url; pub async fn channel_search(req: Requ, proxy_config: &ProxyConfig) -> Result { let (head, _body) = req.into_parts(); - let inpurl = Url::parse(&format!("dummy:{}", head.uri))?; + let inpurl = req_uri_to_url(&head.uri)?; let query = ChannelSearchQuery::from_url(&inpurl)?; let mut urls = Vec::new(); let mut tags = Vec::new(); diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index a11a211..db0de04 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -21,6 +21,7 @@ use httpclient::read_body_bytes; use httpclient::StreamResponse; use hyper::Request; use netpod::log::*; +use netpod::req_uri_to_url; use netpod::timeunits::SEC; use netpod::AppendToUrl; use netpod::FromUrl; @@ -912,8 +913,7 @@ impl MapPulseScyllaHandler { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - let urls = format!("dummy://{}", req.uri()); - let url = url::Url::parse(&urls)?; + let url = req_uri_to_url(req.uri())?; let query = MapPulseQuery::from_url(&url)?; let pulse = query.pulse; let scyconf = if let Some(x) = node_config.node_config.cluster.scylla.as_ref() { @@ -949,6 +949,24 @@ impl MapPulseScyllaHandler { } } +fn extract_path_number_after_prefix(req: &Requ, prefix: &str) -> Result { + let v: Vec<_> = req.uri().path().split(prefix).collect(); + if v.len() < 2 { + return Err(Error::with_msg_no_trace(format!( + "extract_path_number_after_prefix can not understand url {} prefix {}", + req.uri(), + prefix + ))); + } + v[1].parse().map_err(|_| { + Error::with_public_msg_no_trace(format!( + "extract_path_number_after_prefix can not understand url {} prefix {}", + req.uri(), + prefix + )) + }) +} + pub struct MapPulseLocalHttpFunction {} impl MapPulseLocalHttpFunction { @@ -964,10 +982,7 @@ impl MapPulseLocalHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - let urls = req.uri().to_string(); - let pulse: u64 = urls[MAP_PULSE_LOCAL_URL_PREFIX.len()..] - .parse() - .map_err(|_| Error::with_public_msg_no_trace(format!("can not understand pulse map url: {}", req.uri())))?; + let pulse = extract_path_number_after_prefix(&req, MAP_PULSE_LOCAL_URL_PREFIX)?; let req_from = req.headers().get("x-req-from").map_or(None, |x| Some(format!("{x:?}"))); let ts1 = Instant::now(); let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; @@ -1123,8 +1138,7 @@ impl MapPulseHistoHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - let urls = format!("{}", req.uri()); - let pulse: u64 = urls[MAP_PULSE_HISTO_URL_PREFIX.len()..].parse()?; + let pulse = extract_path_number_after_prefix(&req, MAP_PULSE_HISTO_URL_PREFIX)?; let ret = Self::histo(pulse, node_config).await?; Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?) } @@ -1216,8 +1230,7 @@ impl MapPulseHttpFunction { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } trace!("MapPulseHttpFunction handle uri: {:?}", req.uri()); - let urls = format!("{}", req.uri()); - let pulse: u64 = urls[MAP_PULSE_URL_PREFIX.len()..].parse()?; + let pulse = extract_path_number_after_prefix(&req, MAP_PULSE_URL_PREFIX)?; match CACHE.portal(pulse) { CachePortal::Fresh => { let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; @@ -1348,8 +1361,8 @@ impl Api4MapPulseHttpFunction { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let ts1 = Instant::now(); - trace!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + debug!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); + let url = req_uri_to_url(req.uri())?; let q = MapPulseQuery::from_url(&url)?; let ret = match Self::find_timestamp(q, ncc).await { Ok(Some(val)) => Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?), @@ -1399,7 +1412,8 @@ impl Api4MapPulse2HttpFunction { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let ts1 = Instant::now(); - let url = Url::parse(&format!("dummy:{}", req.uri()))?; + debug!("Api4MapPulse2HttpFunction handle uri: {:?}", req.uri()); + let url = req_uri_to_url(req.uri())?; let q = MapPulseQuery::from_url(&url)?; let ret = match Api4MapPulseHttpFunction::find_timestamp(q, ncc).await { Ok(Some(val)) => { diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index 939b009..90966c1 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -17,7 +17,7 @@ bytes = "1.4.0" chrono = { version = "0.4.19", features = ["serde"] } futures-util = "0.3.14" tracing = "0.1.37" -url = "2.2" +url = "2.5.0" num-traits = "0.2.16" hex = "0.4.3" err = { path = "../err" } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 813e704..1eb0094 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -18,6 +18,7 @@ use err::Error; use futures_util::Stream; use futures_util::StreamExt; use http::Request; +use http::Uri; use range::evrange::NanoRange; use range::evrange::PulseRange; use range::evrange::SeriesRange; @@ -3188,3 +3189,15 @@ impl ReqCtx { } pub type ReqCtxArc = std::sync::Arc; + +pub fn req_uri_to_url(uri: &Uri) -> Result { + if uri.scheme().is_none() { + format!("dummy:{uri}") + .parse() + .map_err(|_| Error::with_msg_no_trace(format!("can not use uri {uri}"))) + } else { + uri.to_string() + .parse() + .map_err(|_| Error::with_msg_no_trace(format!("can not use uri {uri}"))) + } +}