From e7891fee13d0e5c202afbbc09258172f2d5623d8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 18 Jun 2021 21:13:01 +0200 Subject: [PATCH] WIP on proxy and url handling --- daqbuffer/src/lib.rs | 2 +- disk/Cargo.toml | 1 + disk/src/binned/query.rs | 38 ++++--- err/Cargo.toml | 1 + err/src/lib.rs | 6 ++ httpret/src/api1.rs | 227 ++++++++++++++++++++++++++++----------- httpret/src/gather.rs | 19 ++-- httpret/src/lib.rs | 64 +---------- httpret/src/proxy.rs | 203 ++++++++++++++++++++++++++++++---- httpret/src/search.rs | 7 +- netpod/src/lib.rs | 48 ++++++++- 11 files changed, 444 insertions(+), 172 deletions(-) diff --git a/daqbuffer/src/lib.rs b/daqbuffer/src/lib.rs index b8a8ecb..fc03ce7 100644 --- a/daqbuffer/src/lib.rs +++ b/daqbuffer/src/lib.rs @@ -31,6 +31,6 @@ pub async fn run_node(node_config: NodeConfigCached) -> Result<(), Error> { } pub async fn run_proxy(proxy_config: ProxyConfig) -> Result<(), Error> { - httpret::proxy(proxy_config).await?; + httpret::proxy::proxy(proxy_config).await?; Ok(()) } diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 21da2de..a361243 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -30,6 +30,7 @@ hex = "0.4.3" nom = "6.1.2" num-traits = "0.2" num-derive = "0.3" +url = "2.2.2" tiny-keccak = { version = "2.0", features = ["sha3"] } err = { path = "../err" } taskrun = { path = "../taskrun" } diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index fbc4b4b..b80ddf9 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -1,10 +1,12 @@ use crate::query::channel_from_params; use chrono::{DateTime, TimeZone, Utc}; use err::Error; +use http::request::Parts; use netpod::log::*; use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PreBinnedPatchCoord, ToNanos}; use std::collections::BTreeMap; use std::time::Duration; +use url::Url; #[derive(Clone, Debug)] pub struct PreBinnedQuery { @@ -35,21 +37,25 @@ impl PreBinnedQuery { } } - pub fn from_request(req: &http::request::Parts) -> Result { - let params = netpod::query_params(req.uri.query()); - let patch_ix = params - .get("patchIx") - .ok_or(Error::with_msg("missing patchIx"))? - .parse()?; - let bin_t_len = params + pub fn from_url(url: &Url) -> Result { + let mut pairs = BTreeMap::new(); + for (j, k) in url.query_pairs() { + pairs.insert(j.to_string(), k.to_string()); + } + let pairs = pairs; + let bin_t_len = pairs .get("binTlen") .ok_or(Error::with_msg("missing binTlen"))? .parse()?; - let patch_t_len = params + let patch_t_len = pairs .get("patchTlen") .ok_or(Error::with_msg("missing patchTlen"))? .parse()?; - let disk_stats_every = params + let patch_ix = pairs + .get("patchIx") + .ok_or(Error::with_msg("missing patchIx"))? + .parse()?; + let disk_stats_every = pairs .get("diskStatsEveryKb") .ok_or(Error::with_msg("missing diskStatsEveryKb"))?; let disk_stats_every = disk_stats_every @@ -57,11 +63,11 @@ impl PreBinnedQuery { .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; let ret = Self { patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix), - agg_kind: agg_kind_from_binning_scheme(¶ms).unwrap_or(AggKind::DimXBins1), - channel: channel_from_params(¶ms)?, - cache_usage: CacheUsage::from_params(¶ms)?, + channel: channel_from_params(&pairs)?, + agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), + cache_usage: CacheUsage::from_params(&pairs)?, disk_stats_every: ByteSize::kb(disk_stats_every), - report_error: params + report_error: pairs .get("reportError") .map_or("false", |k| k) .parse() @@ -70,6 +76,12 @@ impl PreBinnedQuery { Ok(ret) } + pub fn from_request(head: &Parts) -> Result { + let s1 = format!("dummy:{}", head.uri); + let url = Url::parse(&s1)?; + Self::from_url(&url) + } + pub fn make_query_string(&self) -> String { format!( "{}&channelBackend={}&channelName={}&binningScheme={}&cacheUsage={}&diskStatsEveryKb={}&reportError={}", diff --git a/err/Cargo.toml b/err/Cargo.toml index 59fc7d9..0251dc6 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -18,3 +18,4 @@ nom = "6.1.2" tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } serde_cbor = "0.11.1" regex = "1.5.4" +url = "2.2" diff --git a/err/src/lib.rs b/err/src/lib.rs index 97320ab..2947cf5 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -254,6 +254,12 @@ impl From for Error { } } +impl From for Error { + fn from(k: url::ParseError) -> Self { + Self::with_msg(format!("{:?}", k)) + } +} + pub fn todo() { todo!("TODO"); } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 19be26a..c5f2a2a 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -3,6 +3,7 @@ use err::Error; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use netpod::log::*; +use netpod::{ProxyBackend, ProxyConfig}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::future::Future; @@ -10,41 +11,10 @@ use std::pin::Pin; use std::time::Duration; use tokio::time::timeout_at; -fn get_backends() -> [(&'static str, &'static str, u16); 6] { - // TODO take from config. - err::todo(); - [ - ("gls-archive", "gls-data-api.psi.ch", 8371), - ("hipa-archive", "hipa-data-api.psi.ch", 8082), - ("sf-databuffer", "sf-daqbuf-33.psi.ch", 8371), - ("sf-imagebuffer", "sf-daq-5.psi.ch", 8371), - ("timeout", "sf-daqbuf-33.psi.ch", 8371), - ("error500", "sf-daqbuf-33.psi.ch", 8371), - ] -} - fn get_live_hosts() -> &'static [(&'static str, u16)] { // TODO take from config. err::todo(); - &[ - ("sf-daqbuf-21", 8371), - ("sf-daqbuf-22", 8371), - ("sf-daqbuf-23", 8371), - ("sf-daqbuf-24", 8371), - ("sf-daqbuf-25", 8371), - ("sf-daqbuf-26", 8371), - ("sf-daqbuf-27", 8371), - ("sf-daqbuf-28", 8371), - ("sf-daqbuf-29", 8371), - ("sf-daqbuf-30", 8371), - ("sf-daqbuf-31", 8371), - ("sf-daqbuf-32", 8371), - ("sf-daqbuf-33", 8371), - ("sf-daq-5", 8371), - ("sf-daq-6", 8371), - ("hipa-data-api", 8082), - ("gls-data-api", 8371), - ] + &[] } pub trait BackendAware { @@ -117,7 +87,7 @@ impl FromErrorCode for ChannelSearchResultItemV1 { #[derive(Debug, Serialize, Deserialize)] pub struct ChannelSearchResultV1(pub Vec); -pub async fn channels_list_v1(req: Request) -> Result, Error> { +pub async fn channels_list_v1(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { let reqbody = req.into_body(); let bodyslice = hyper::body::to_bytes(reqbody).await?; let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodyslice)?; @@ -132,7 +102,7 @@ pub async fn channels_list_v1(req: Request) -> Result, Erro .unwrap() }; let back2: Vec<_> = query.backends.iter().map(|x| x.as_str()).collect(); - let spawned = subreq(&back2[..], "channels", &subq_maker)?; + let spawned = subreq(&back2[..], "channels", &subq_maker, proxy_config)?; let mut res = vec![]; for (backend, s) in spawned { res.push((backend, s.await)); @@ -143,54 +113,58 @@ pub async fn channels_list_v1(req: Request) -> Result, Erro Ok(res) } -type TT0 = ( - (&'static str, &'static str, u16), - http::response::Parts, - hyper::body::Bytes, -); +type TT0 = (ProxyBackend, http::response::Parts, hyper::body::Bytes); type TT1 = Result; type TT2 = tokio::task::JoinHandle; type TT3 = Result; type TT4 = Result; type TT7 = Pin + Send>>; -type TT8 = (&'static str, TT7); +type TT8 = (String, TT7); -fn subreq(backends_req: &[&str], endp: &str, subq_maker: &dyn Fn(&str) -> JsonValue) -> Result, Error> { - let backends = get_backends(); +fn subreq( + backends_req: &[&str], + endp: &str, + subq_maker: &dyn Fn(&str) -> JsonValue, + proxy_config: &ProxyConfig, +) -> Result, Error> { + let backends = proxy_config.backends.clone(); let mut spawned = vec![]; for back in &backends { - if backends_req.contains(&back.0) { + if backends_req.contains(&back.name.as_str()) { let back = back.clone(); - let q = subq_maker(back.0); - let endp = match back.0 { + let q = subq_maker(&back.name); + let endp = match back.name.as_str() { "timeout" => "channels_timeout", "error500" => "channels_error500", _ => endp, }; - let uri = format!("http://{}:{}{}/{}", back.1, back.2, "/api/1", endp); + let uri = format!("http://{}:{}{}/{}", back.host, back.port, "/api/1", endp); let req = Request::builder() .method(Method::POST) .uri(uri) .header("content-type", "application/json") .body(Body::from(serde_json::to_string(&q)?))?; - let jh: TT2 = tokio::spawn(async move { - let res = Client::new().request(req).await?; - let (pre, body) = res.into_parts(); - //info!("Answer from {} status {}", back.1, pre.status); - let body_all = hyper::body::to_bytes(body).await?; - //info!("Got {} bytes from {}", body_all.len(), back.1); - Ok::<_, Error>((back, pre, body_all)) + let jh: TT2 = tokio::spawn({ + let back = back.clone(); + async move { + let res = Client::new().request(req).await?; + let (pre, body) = res.into_parts(); + //info!("Answer from {} status {}", back.1, pre.status); + let body_all = hyper::body::to_bytes(body).await?; + //info!("Got {} bytes from {}", body_all.len(), back.1); + Ok::<_, Error>((back, pre, body_all)) + } }); let jh = tokio::time::timeout(std::time::Duration::from_millis(5000), jh); let bx: TT7 = Box::pin(jh); - spawned.push((back.0, bx)); + spawned.push((back.name.clone(), bx)); } } Ok(spawned) } //fn extr<'a, T: BackendAware + FromErrorCode + Deserialize<'a>>(results: Vec<(&str, TT4)>) -> Vec { -fn extr Deserialize<'a>>(results: Vec<(&str, TT4)>) -> Vec { +fn extr Deserialize<'a>>(results: Vec<(String, TT4)>) -> Vec { let mut ret = vec![]; for (backend, r) in results { if let Ok(r20) = r { @@ -203,30 +177,30 @@ fn extr Deserialize<'a>>(results: Vec< error!("more than one result item from {:?}", r2.0); } else { for inp2 in inp { - if inp2.backend() == r2.0 .0 { + if inp2.backend() == r2.0.name { ret.push(inp2); } } } } else { error!("malformed answer from {:?}", r2.0); - ret.push(T::from_error_code(backend, ErrorCode::Error)); + ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error)); } } else { error!("bad answer from {:?}", r2.0); - ret.push(T::from_error_code(backend, ErrorCode::Error)); + ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error)); } } else { error!("bad answer from {:?}", r30); - ret.push(T::from_error_code(backend, ErrorCode::Error)); + ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error)); } } else { error!("subrequest join handle error {:?}", r20); - ret.push(T::from_error_code(backend, ErrorCode::Error)); + ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error)); } } else { error!("subrequest timeout {:?}", r); - ret.push(T::from_error_code(backend, ErrorCode::Timeout)); + ret.push(T::from_error_code(backend.as_str(), ErrorCode::Timeout)); } } ret @@ -288,7 +262,7 @@ impl FromErrorCode for ChannelBackendConfigsV1 { } } -pub async fn channels_config_v1(req: Request) -> Result, Error> { +pub async fn channels_config_v1(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { let reqbody = req.into_body(); let bodyslice = hyper::body::to_bytes(reqbody).await?; let query: ChannelConfigsQueryV1 = serde_json::from_slice(&bodyslice)?; @@ -303,7 +277,7 @@ pub async fn channels_config_v1(req: Request) -> Result, Er .unwrap() }; let back2: Vec<_> = query.backends.iter().map(|x| x.as_str()).collect(); - let spawned = subreq(&back2[..], "channels/config", &subq_maker)?; + let spawned = subreq(&back2[..], "channels/config", &subq_maker, proxy_config)?; let mut res = vec![]; for (backend, s) in spawned { res.push((backend, s.await)); @@ -315,6 +289,8 @@ pub async fn channels_config_v1(req: Request) -> Result, Er } pub async fn gather_json_v1(req_m: Request, path: &str) -> Result, Error> { + // TODO can this be removed? + err::todo(); let mut spawned = vec![]; let (req_h, _) = req_m.into_parts(); for host in get_live_hosts() { @@ -388,3 +364,128 @@ pub async fn gather_json_v1(req_m: Request, path: &str) -> Result, + pathpre: &str, + _proxy_config: &ProxyConfig, +) -> Result, Error> { + let (part_head, part_body) = req.into_parts(); + let bodyslice = hyper::body::to_bytes(part_body).await?; + let gather_from: GatherFromV1 = serde_json::from_slice(&bodyslice)?; + let mut spawned = vec![]; + let uri = part_head.uri; + let path_post = &uri.path()[pathpre.len()..]; + //let hds = part_head.headers; + for gh in gather_from.hosts { + let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post); + let req = Request::builder().method(Method::GET).uri(uri); + let req = if gh.inst.len() > 0 { + req.header("retrieval_instance", &gh.inst) + } else { + req + }; + let req = req.header(http::header::ACCEPT, "application/json"); + //.body(Body::from(serde_json::to_string(&q)?))?; + let req = req.body(Body::empty()); + let task = tokio::spawn(async move { + //let res = Client::new().request(req); + let res = Client::new().request(req?).await; + Ok::<_, Error>(process_answer(res?).await?) + }); + let task = tokio::time::timeout(std::time::Duration::from_millis(5000), task); + spawned.push((gh.clone(), task)); + } + #[derive(Serialize)] + struct Hres { + gh: GatherHostV1, + res: JsonValue, + } + #[derive(Serialize)] + struct Jres { + hosts: Vec, + } + let mut a = vec![]; + for tr in spawned { + let res = match tr.1.await { + Ok(k) => match k { + Ok(k) => match k { + Ok(k) => k, + Err(e) => JsonValue::String(format!("ERROR({:?})", e)), + }, + Err(e) => JsonValue::String(format!("ERROR({:?})", e)), + }, + Err(e) => JsonValue::String(format!("ERROR({:?})", e)), + }; + a.push(Hres { gh: tr.0, res }); + } + let res = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&Jres { hosts: a })?.into())?; + Ok(res) +} + +#[derive(Clone, Serialize, Deserialize)] +struct GatherFromV1 { + hosts: Vec, +} + +#[derive(Clone, Serialize, Deserialize)] +struct GatherHostV1 { + host: String, + port: u16, + inst: String, +} + +async fn process_answer(res: Response) -> Result { + let (pre, mut body) = res.into_parts(); + if pre.status != StatusCode::OK { + use hyper::body::HttpBody; + if let Some(c) = body.data().await { + let c: bytes::Bytes = c?; + let s1 = String::from_utf8(c.to_vec())?; + Ok(JsonValue::String(format!( + "status {} body {}", + pre.status.as_str(), + s1 + ))) + } else { + //use snafu::IntoError; + //Err(Bad{msg:format!("API error")}.into_error(NoneError)).ctxb(SE!(AddPos)) + Ok(JsonValue::String(format!("status {}", pre.status.as_str()))) + } + } else { + let body: hyper::Body = body; + let body_all = hyper::body::to_bytes(body).await?; + let val = match serde_json::from_slice(&body_all) { + Ok(k) => k, + Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?), + }; + Ok::<_, Error>(val) + } +} + +pub async fn proxy_distribute_v1(req: Request) -> Result, Error> { + let (mut sink, body) = Body::channel(); + let uri = format!("http://sf-daqbuf-33:8371{}", req.uri().path()); + let res = Response::builder().status(StatusCode::OK).body(body)?; + tokio::spawn(async move { + let req = Request::builder().method(Method::GET).uri(uri).body(Body::empty())?; + let res = Client::new().request(req).await?; + if res.status() == StatusCode::OK { + let (_heads, mut body) = res.into_parts(); + loop { + use hyper::body::HttpBody; + let chunk = body.data().await; + if let Some(k) = chunk { + let k = k?; + sink.send_data(k).await?; + } else { + break; + } + } + } + Ok::<_, Error>(()) + }); + Ok(res) +} diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs index 53df77f..a9f99b7 100644 --- a/httpret/src/gather.rs +++ b/httpret/src/gather.rs @@ -10,6 +10,7 @@ use std::future::Future; use std::pin::Pin; use std::time::Duration; use tokio::time::sleep; +use url::Url; #[derive(Clone, Serialize, Deserialize)] struct GatherFrom { @@ -162,8 +163,7 @@ pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) pub async fn gather_get_json_generic( method: http::Method, - uri: String, - schemehostports: Vec, + urls: Vec, nt: NT, ft: FT, timeout: Duration, @@ -173,11 +173,10 @@ where NT: Fn(Response) -> Pin> + Send>> + Send + Sync + Copy + 'static, FT: Fn(Vec) -> Result, Error>, { - let spawned: Vec<_> = schemehostports - .iter() - .map(move |schemehostport| { - let uri = format!("{}{}", schemehostport, uri.clone()); - let req = Request::builder().method(method.clone()).uri(uri); + let spawned: Vec<_> = urls + .into_iter() + .map(move |url| { + let req = Request::builder().method(method.clone()).uri(url.as_str()); //let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name)); let req = req.header(http::header::ACCEPT, "application/json"); let req = req.body(Body::empty()); @@ -189,7 +188,7 @@ where res = Client::new().request(req?).fuse() => Ok(nt(res?).await?) } }); - (schemehostport.clone(), task) + (url, task) }) .collect(); let mut a = vec![]; @@ -213,11 +212,9 @@ mod test { #[test] fn try_search() { - let schemehostports = ["http://sf-daqbuf-22:8371".into()]; let fut = gather_get_json_generic( hyper::Method::GET, - format!("/api/4/search/channel"), - schemehostports.to_vec(), + vec![], |_res| { let fut = async { Ok(()) }; Box::pin(fut) diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 3fd20a7..3838b36 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -1,4 +1,3 @@ -use crate::api1::{channels_config_v1, channels_list_v1, gather_json_v1}; use crate::gather::gather_get_json; use bytes::Bytes; use disk::binned::prebinned::pre_binned_bytes_for_http; @@ -14,7 +13,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::log::*; -use netpod::{AggKind, Channel, NodeConfigCached, ProxyConfig}; +use netpod::{AggKind, Channel, NodeConfigCached}; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use serde::{Deserialize, Serialize}; @@ -28,6 +27,10 @@ pub mod gather; pub mod proxy; pub mod search; +fn proxy_mark() -> &'static str { + "7c5e408a" +} + pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { let rawjh = taskrun::spawn(events_service(node_config.clone())); use std::str::FromStr; @@ -238,14 +241,6 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/1/channels" { - Ok(channels_list_v1(req).await?) - } else if path == "/api/1/channels/config" { - Ok(channels_config_v1(req).await?) - } else if path == "/api/1/stats/version" { - Ok(gather_json_v1(req, "/stats/version").await?) - } else if path.starts_with("/api/1/stats/") { - Ok(gather_json_v1(req, path).await?) } else { Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( "Sorry, not found: {:?} {:?} {:?}", @@ -603,52 +598,3 @@ pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> })))?; Ok(ret) } - -pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { - use std::str::FromStr; - let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; - let make_service = make_service_fn({ - move |_conn| { - let proxy_config = proxy_config.clone(); - async move { - Ok::<_, Error>(service_fn({ - move |req| { - let f = proxy_http_service(req, proxy_config.clone()); - Cont { f: Box::pin(f) } - } - })) - } - } - }); - Server::bind(&addr).serve(make_service).await?; - Ok(()) -} - -async fn proxy_http_service(req: Request, proxy_config: ProxyConfig) -> Result, Error> { - match proxy_http_service_try(req, &proxy_config).await { - Ok(k) => Ok(k), - Err(e) => { - error!("data_api_proxy sees error: {:?}", e); - Err(e) - } - } -} - -async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { - let uri = req.uri().clone(); - let path = uri.path(); - if path == "/api/4/search/channel" { - if req.method() == Method::GET { - Ok(proxy::channel_search(req, &proxy_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } - } else { - Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( - "Sorry, not found: {:?} {:?} {:?}", - req.method(), - req.uri().path(), - req.uri().query(), - )))?) - } -} diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index e32e13d..dbb200e 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -1,47 +1,204 @@ -use crate::response; +use crate::api1::{channels_config_v1, channels_list_v1, gather_json_2_v1, gather_json_v1, proxy_distribute_v1}; +use crate::gather::gather_get_json_generic; +use crate::{proxy_mark, response, Cont}; use err::Error; use http::{HeaderValue, StatusCode}; -use hyper::{Body, Request, Response}; -use netpod::{ChannelSearchQuery, ProxyConfig}; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server}; +use netpod::log::*; +use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig}; use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; +use url::Url; + +pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { + use std::str::FromStr; + let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; + let make_service = make_service_fn({ + move |_conn| { + let proxy_config = proxy_config.clone(); + async move { + Ok::<_, Error>(service_fn({ + move |req| { + let f = proxy_http_service(req, proxy_config.clone()); + Cont { f: Box::pin(f) } + } + })) + } + } + }); + Server::bind(&addr).serve(make_service).await?; + Ok(()) +} + +async fn proxy_http_service(req: Request, proxy_config: ProxyConfig) -> Result, Error> { + match proxy_http_service_try(req, &proxy_config).await { + Ok(k) => Ok(k), + Err(e) => { + error!("data_api_proxy sees error: {:?}", e); + Err(e) + } + } +} + +async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { + let uri = req.uri().clone(); + let path = uri.path(); + if path == "/api/1/channels" { + Ok(channels_list_v1(req, proxy_config).await?) + } else if path == "/api/1/channels/config" { + Ok(channels_config_v1(req, proxy_config).await?) + } else if path == "/api/1/stats/version" { + Ok(gather_json_v1(req, "/stats/version").await?) + } else if path.starts_with("/api/1/stats/") { + Ok(gather_json_v1(req, path).await?) + } else if path.starts_with("/api/1/gather/") { + Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?) + } else if path.starts_with("/api/4/search/channel") { + Ok(channel_search(req, proxy_config).await?) + } else if path.starts_with("/distribute") { + proxy_distribute_v1(req).await + } else { + Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!( + "Sorry, not found: {:?} {:?} {:?}", + req.method(), + req.uri().path(), + req.uri().query(), + )))?) + } +} pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { let (head, _body) = req.into_parts(); match head.headers.get("accept") { Some(v) => { if v == "application/json" { - // TODO actually pass on the query parameters to the sub query. - err::todo(); - let query = ChannelSearchQuery::from_request(head.uri.query())?; - let uri = format!("/api/4/search/channel"); - let nt = |_res| { - let fut = async { Ok(0f32) }; - Box::pin(fut) as Pin> + Send>> + let query = ChannelSearchQuery::from_query_string(head.uri.query())?; + let urls: Vec> = proxy_config + .search_hosts + .iter() + .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) { + Ok(mut url) => { + query.append_to_url(&mut url); + Ok(url) + } + Err(_e) => Err(Error::with_msg(format!("parse error for: {:?}", sh))), + }) + .collect(); + for u in &urls { + match u { + Ok(url) => { + info!("URL: {}", url.as_str()); + } + Err(_) => { + return Err(Error::with_msg("url parse error")); + } + } + } + let urls: Vec<_> = urls.into_iter().map(Result::unwrap).collect(); + let nt = |res| { + let fut = async { + let body = hyper::body::to_bytes(res).await?; + info!("got a result {:?}", body); + let res: ChannelSearchResult = match serde_json::from_slice(&body) { + Ok(k) => k, + Err(_) => ChannelSearchResult { channels: vec![] }, + }; + Ok(res) + }; + Box::pin(fut) as Pin + Send>> }; - let ft = |_all| { + let ft = |all: Vec| { + let mut res = vec![]; + for j in all { + for k in j.channels { + res.push(k); + } + } + let res = ChannelSearchResult { channels: res }; let res = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_string(&42)?.into())?; + .body(Body::from(serde_json::to_string(&res)?))?; Ok(res) }; - let mut ret = crate::gather::gather_get_json_generic::( - http::Method::GET, - uri, - proxy_config.search_hosts.clone(), - nt, - ft, - Duration::from_millis(3000), - ) - .await?; + let mut ret = + gather_get_json_generic(http::Method::GET, urls, nt, ft, Duration::from_millis(3000)).await?; ret.headers_mut() - .append("x-proxy-log-mark", HeaderValue::from_str("proxied")?); + .append("x-proxy-log-mark", HeaderValue::from_str(proxy_mark())?); Ok(ret) } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) } } - _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), + None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), + } +} + +pub async fn events(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { + let (head, _body) = req.into_parts(); + match head.headers.get("accept") { + Some(v) => { + if v == "application/json" { + Url::parse(&format!("{}", head.uri))?; + let query = ChannelSearchQuery::from_query_string(head.uri.query())?; + let urls: Vec> = proxy_config + .search_hosts + .iter() + .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) { + Ok(mut url) => { + query.append_to_url(&mut url); + Ok(url) + } + Err(_e) => Err(Error::with_msg(format!("parse error for: {:?}", sh))), + }) + .collect(); + for u in &urls { + match u { + Ok(url) => { + info!("URL: {}", url.as_str()); + } + Err(_) => { + return Err(Error::with_msg("url parse error")); + } + } + } + let urls: Vec<_> = urls.into_iter().map(Result::unwrap).collect(); + let nt = |res| { + let fut = async { + let body = hyper::body::to_bytes(res).await?; + info!("got a result {:?}", body); + let res: ChannelSearchResult = match serde_json::from_slice(&body) { + Ok(k) => k, + Err(_) => ChannelSearchResult { channels: vec![] }, + }; + Ok(res) + }; + Box::pin(fut) as Pin + Send>> + }; + let ft = |all: Vec| { + let mut res = vec![]; + for j in all { + for k in j.channels { + res.push(k); + } + } + let res = ChannelSearchResult { channels: res }; + let res = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&res)?))?; + Ok(res) + }; + let mut ret = + gather_get_json_generic(http::Method::GET, urls, nt, ft, Duration::from_millis(3000)).await?; + ret.headers_mut() + .append("x-proxy-log-mark", HeaderValue::from_str(proxy_mark())?); + Ok(ret) + } else { + Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) + } + } + None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), } } diff --git a/httpret/src/search.rs b/httpret/src/search.rs index f5fbcb5..f39cd1e 100644 --- a/httpret/src/search.rs +++ b/httpret/src/search.rs @@ -2,12 +2,17 @@ use crate::response; use err::Error; use hyper::{Body, Request, Response, StatusCode}; use netpod::{ChannelSearchQuery, NodeConfigCached}; +use url::Url; pub async fn channel_search(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); match head.headers.get("accept") { Some(v) if v == "application/json" => { - let query = ChannelSearchQuery::from_request(head.uri.query())?; + let s1 = format!("dummy:{}", head.uri); + //netpod::log::info!("try to parse {}", s1); + let url = Url::parse(&s1)?; + let query = ChannelSearchQuery::from_url(&url)?; + //let query = ChannelSearchQuery::from_query_string(head.uri.query())?; let res = dbconn::search::search_channel(query, node_config).await?; let body = Body::from(serde_json::to_string(&res)?); let ret = super::response(StatusCode::OK).body(body)?; diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index aa03cb2..868c964 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -3,6 +3,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use std::fmt::{Debug, Display, Formatter}; use std::path::PathBuf; use std::pin::Pin; @@ -11,6 +12,7 @@ use std::task::{Context, Poll}; use timeunits::*; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; +use url::Url; pub mod status; pub mod streamext; @@ -853,7 +855,7 @@ pub struct ChannelSearchQuery { } impl ChannelSearchQuery { - pub fn from_request(query: Option<&str>) -> Result { + pub fn from_query_string(query: Option<&str>) -> Result { let params = query_params(query); let ret = Self { name_regex: params.get("nameRegex").map_or("".into(), |k| k.into()), @@ -862,6 +864,42 @@ impl ChannelSearchQuery { }; Ok(ret) } + + pub fn from_url(url: &Url) -> Result { + let mut pairs = BTreeMap::new(); + for (j, k) in url.query_pairs() { + pairs.insert(j.to_string(), k.to_string()); + } + let ret = Self { + name_regex: pairs.get("nameRegex").map_or(String::new(), |k| k.clone()), + source_regex: pairs.get("sourceRegex").map_or(String::new(), |k| k.clone()), + description_regex: pairs.get("descriptionRegex").map_or(String::new(), |k| k.clone()), + }; + Ok(ret) + } + + pub fn append_to_url(&self, url: &mut Url) { + url.query_pairs_mut().append_pair("nameRegex", &self.name_regex); + url.query_pairs_mut().append_pair("sourceRegex", &self.source_regex); + url.query_pairs_mut() + .append_pair("descriptionRegex", &self.description_regex); + } +} + +#[cfg(test)] +mod test { + #[test] + fn parse_url_1() { + let mut url = url::Url::parse("http://host/123").unwrap(); + url.query_pairs_mut().append_pair("text", "jo jo • yo"); + assert_eq!(url.to_string(), "http://host/123?text=jo+jo+%E2%80%A2+yo"); + } + + #[test] + fn parse_url_2() { + let url = url::Url::parse("dummy:?123").unwrap(); + assert_eq!(url.query().unwrap(), "123") + } } #[derive(Serialize, Deserialize)] @@ -881,9 +919,17 @@ pub struct ChannelSearchResult { pub channels: Vec, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ProxyBackend { + pub name: String, + pub host: String, + pub port: u16, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ProxyConfig { pub listen: String, pub port: u16, pub search_hosts: Vec, + pub backends: Vec, }