From ebb1ce89dc07554b7d7a015002dd2c4d85c6c2d3 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 23 Jun 2021 07:32:18 +0200 Subject: [PATCH] Remove too simplistic url parse --- daqbuffer/Cargo.toml | 1 + daqbuffer/src/test/events.rs | 11 +++++---- disk/src/events.rs | 44 ++++++++++++++++++------------------ httpret/src/lib.rs | 31 ++++++++++++------------- netpod/src/lib.rs | 19 +--------------- 5 files changed, 45 insertions(+), 61 deletions(-) diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index 47d6fac..296fa2d 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -21,6 +21,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" chrono = "0.4" +url = "2.2.2" clap = "3.0.0-beta.2" lazy_static = "1.4.0" err = { path = "../err" } diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs index abcd1f2..1ceed64 100644 --- a/daqbuffer/src/test/events.rs +++ b/daqbuffer/src/test/events.rs @@ -13,11 +13,12 @@ use futures_util::{StreamExt, TryStreamExt}; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::{Channel, Cluster, HostPort, NanoRange, PerfOpts}; +use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; use tokio::io::AsyncRead; +use url::Url; #[test] fn get_plain_events_binary_0() { @@ -65,12 +66,14 @@ where let range = NanoRange::from_date_time(beg_date, end_date); let query = PlainEventsBinaryQuery::new(channel, range); let hp = HostPort::from_node(node0); - let url = query.url(&hp); + let mut url = Url::parse(&format!("http://{}:{}", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; info!("get_plain_events get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(url) - .header("Accept", "application/octet-stream") + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_OCTET) .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?; diff --git a/disk/src/events.rs b/disk/src/events.rs index 3200771..f514869 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -1,6 +1,5 @@ use chrono::{DateTime, TimeZone, Utc}; use err::Error; -use netpod::log::*; use netpod::{ channel_from_pairs, get_url_query_pairs, AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, HostPort, NanoRange, ToNanos, @@ -27,25 +26,22 @@ impl PlainEventsBinaryQuery { } } - pub fn from_request(req: &http::request::Parts) -> Result { - // TODO use Url - let params = netpod::query_params(req.uri.query()); - info!("PlainEventsBinaryQuery from_request uri: {:?}", req.uri); - info!("PlainEventsBinaryQuery from_request params: {:?}", params); - let beg_date = params.get("begDate").ok_or(Error::with_msg("missing begDate"))?; - let end_date = params.get("endDate").ok_or(Error::with_msg("missing endDate"))?; + pub fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; + let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; let ret = Self { range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, - channel: channel_from_pairs(¶ms)?, - report_error: params + channel: channel_from_pairs(&pairs)?, + report_error: pairs .get("reportError") .map_or("false", |k| k) .parse() .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?, - timeout: params + timeout: pairs .get("timeout") .map_or("10000", |k| k) .parse::() @@ -74,19 +70,23 @@ impl PlainEventsBinaryQuery { pub fn set_timeout(&mut self, k: Duration) { self.timeout = k; } +} - pub fn url(&self, host: &HostPort) -> String { +impl AppendToUrl for PlainEventsBinaryQuery { + fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; - format!( - "http://{}:{}/api/4/events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", - host.host, - host.port, - self.channel.backend, - self.channel.name, - Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt), - Utc.timestamp_nanos(self.range.end as i64).format(date_fmt), - self.timeout.as_millis(), - ) + let mut g = url.query_pairs_mut(); + g.append_pair("channelBackend", &self.channel.backend); + g.append_pair("channelName", &self.channel.name); + g.append_pair( + "begDate", + &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), + ); + g.append_pair( + "endDate", + &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), + ); + g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); } } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 1ea52cf..e3833f5 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -13,7 +13,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; -use netpod::{AggKind, Channel, FromUrl, NodeConfigCached, APP_JSON, APP_OCTET}; +use netpod::{get_url_query_pairs, AggKind, Channel, FromUrl, NodeConfigCached, APP_JSON, APP_JSON_LINES, APP_OCTET}; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use serde::{Deserialize, Serialize}; @@ -417,8 +417,8 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res } async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - let (head, _body) = req.into_parts(); - let query = PlainEventsBinaryQuery::from_request(&head)?; + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let query = PlainEventsBinaryQuery::from_url(&url)?; let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone()); let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; let s = s.map(|item| item.make_frame()); @@ -500,7 +500,7 @@ pub async fn update_db_with_channel_names( dbconn::scan::update_db_with_channel_names(node_config.clone(), &node_config.node_config.cluster.database) .await?; let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/jsonlines") + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { Ok(mut item) => { item.push('\n'); @@ -522,7 +522,7 @@ pub async fn update_db_with_channel_names_3( }; let res = dbconn::scan::update_db_with_channel_names_3(node_config); let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/jsonlines") + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { Ok(mut item) => { item.push('\n'); @@ -544,7 +544,7 @@ pub async fn update_db_with_all_channel_configs( }; let res = dbconn::scan::update_db_with_all_channel_configs(node_config.clone()).await?; let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/jsonlines") + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { Ok(mut item) => { item.push('\n'); @@ -569,15 +569,12 @@ pub async fn update_search_cache(req: Request, node_config: &NodeConfigCac } pub async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - let (head, _body) = req.into_parts(); - let _dry = match head.uri.query() { - Some(q) => q.contains("dry"), - None => false, - }; - let params = netpod::query_params(head.uri.query()); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let pairs = get_url_query_pairs(&url); + let _dry = pairs.contains_key("dry"); let channel = Channel { backend: node_config.node.backend.clone(), - name: params.get("channelName").unwrap().into(), + name: pairs.get("channelName").unwrap().into(), }; let res = parse::channelconfig::read_local_config(&channel, &node_config.node).await?; let ret = response(StatusCode::OK) @@ -587,12 +584,12 @@ pub async fn channel_config(req: Request, node_config: &NodeConfigCached) } pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - let (head, _body) = req.into_parts(); - let params = netpod::query_params(head.uri.query()); - let addr = params.get("addr").unwrap().into(); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let pairs = get_url_query_pairs(&url); + let addr = pairs.get("addr").unwrap().into(); let res = netfetch::ca_connect_1(addr, node_config).await?; let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/jsonlines") + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { Ok(mut item) => { item.push('\n'); diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 8838ca0..89fb221 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -20,6 +20,7 @@ pub mod status; pub mod streamext; pub const APP_JSON: &'static str = "application/json"; +pub const APP_JSON_LINES: &'static str = "application/jsonlines"; pub const APP_OCTET: &'static str = "application/octet-stream"; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -726,24 +727,6 @@ impl FromStr for AggKind { } } -pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap { - let mut map = std::collections::BTreeMap::new(); - match q { - Some(k) => { - for par in k.split("&") { - let mut u = par.split("="); - if let Some(t1) = u.next() { - if let Some(t2) = u.next() { - map.insert(t1.into(), t2.into()); - } - } - } - } - None => {} - } - map -} - pub trait ToNanos { fn to_nanos(&self) -> u64; }