From 5219b56488d1b1094c6d0a7a120475e0419719e2 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 22 Jun 2021 13:27:43 +0200
Subject: [PATCH] Use /api/4 on nodes for /api/1 channel search

---
 dbconn/src/search.rs  |   3 +-
 httpret/src/api1.rs   | 209 +++++++++++++++++++++++++++++++++++++-----
 httpret/src/gather.rs |  42 +++++++--
 httpret/src/lib.rs    |   1 +
 httpret/src/proxy.rs  |  55 ++++++-----
 netpod/src/lib.rs     |   2 +
 6 files changed, 253 insertions(+), 59 deletions(-)

diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs
index bf0d3ac..8577128 100644
--- a/dbconn/src/search.rs
+++ b/dbconn/src/search.rs
@@ -44,7 +44,8 @@ pub async fn search_channel(
             None => vec![],
         };
         let k = ChannelSearchSingleResult {
-            backend: row.get(7),
+            backend: node_config.node.backend.clone(),
+            //backend: row.get(7),
             name: row.get(1),
             source: row.get(2),
             ty: row.get(3),
diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs
index c5f2a2a..08d13f0 100644
--- a/httpret/src/api1.rs
+++ b/httpret/src/api1.rs
@@ -1,15 +1,18 @@
+use crate::gather::{gather_get_json_generic, SubRes};
 use crate::response;
 use err::Error;
 use http::{Method, StatusCode};
 use hyper::{Body, Client, Request, Response};
+use itertools::Itertools;
 use netpod::log::*;
-use netpod::{ProxyBackend, ProxyConfig};
+use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyBackend, ProxyConfig, APP_JSON};
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
 use std::future::Future;
 use std::pin::Pin;
 use std::time::Duration;
 use tokio::time::timeout_at;
+use url::Url;
 
 fn get_live_hosts() -> &'static [(&'static str, u16)] {
     // TODO take from config.
@@ -87,30 +90,186 @@ impl FromErrorCode for ChannelSearchResultItemV1 {
 #[derive(Debug, Serialize, Deserialize)]
 pub struct ChannelSearchResultV1(pub Vec<ChannelSearchResultItemV1>);
 
-pub async fn channels_list_v1(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
-    let reqbody = req.into_body();
-    let bodyslice = hyper::body::to_bytes(reqbody).await?;
-    let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodyslice)?;
-    let subq_maker = |backend: &str| -> JsonValue {
-        serde_json::to_value(ChannelSearchQueryV1 {
-            regex: query.regex.clone(),
-            source_regex: query.source_regex.clone(),
-            description_regex: query.description_regex.clone(),
-            backends: vec![backend.into()],
-            ordering: query.ordering.clone(),
-        })
-        .unwrap()
-    };
-    let back2: Vec<_> = query.backends.iter().map(|x| x.as_str()).collect();
-    let spawned = subreq(&back2[..], "channels", &subq_maker, proxy_config)?;
-    let mut res = vec![];
-    for (backend, s) in spawned {
-        res.push((backend, s.await));
+pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
+    let (head, reqbody) = req.into_parts();
+    let bodybytes = hyper::body::to_bytes(reqbody).await?;
+    let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
+    match head.headers.get("accept") {
+        Some(v) => {
+            if v == APP_JSON {
+                let query = ChannelSearchQuery {
+                    name_regex: query.regex.map_or(String::new(), |k| k),
+                    source_regex: query.source_regex.map_or(String::new(), |k| k),
+                    description_regex: query.description_regex.map_or(String::new(), |k| k),
+                };
+                let urls = proxy_config
+                    .search_hosts
+                    .iter()
+                    .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
+                        Ok(mut url) => {
+                            query.append_to_url(&mut url);
+                            Ok(url)
+                        }
+                        Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))),
+                    })
+                    .fold_ok(vec![], |mut a, x| {
+                        a.push(x);
+                        a
+                    })?;
+                let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
+                let nt = |res| {
+                    let fut = async {
+                        let body = hyper::body::to_bytes(res).await?;
+                        let res: ChannelSearchResult = match serde_json::from_slice(&body) {
+                            Ok(k) => k,
+                            Err(_) => ChannelSearchResult { channels: vec![] },
+                        };
+                        Ok(res)
+                    };
+                    Box::pin(fut) as Pin<Box<dyn Future<Output = Result<ChannelSearchResult, Error>> + Send>>
+                };
+                let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
+                    let mut res = ChannelSearchResultV1(vec![]);
+                    for j in all {
+                        for k in j.val.channels {
+                            let mut found = false;
+                            let mut i2 = 0;
+                            for i1 in 0..res.0.len() {
+                                if res.0[i1].backend == k.backend {
+                                    found = true;
+                                    i2 = i1;
+                                    break;
+                                }
+                            }
+                            if !found {
+                                let u = ChannelSearchResultItemV1 {
+                                    backend: k.backend,
+                                    channels: vec![],
+                                    error: None,
+                                };
+                                res.0.push(u);
+                                i2 = res.0.len() - 1;
+                            }
+                            res.0[i2].channels.push(k.name);
+                        }
+                    }
+                    let res = response(StatusCode::OK)
+                        .header(http::header::CONTENT_TYPE, APP_JSON)
+                        .body(Body::from(serde_json::to_string(&res)?))?;
+                    Ok(res)
+                };
+                let ret =
+                    gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000))
+                        .await?;
+                Ok(ret)
+            } else {
+                Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
+            }
+        }
+        None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
+    }
+}
+
+pub async fn channel_search_configs_v1(
+    req: Request<Body>,
+    proxy_config: &ProxyConfig,
+) -> Result<Response<Body>, Error> {
+    let (head, reqbody) = req.into_parts();
+    let bodybytes = hyper::body::to_bytes(reqbody).await?;
+    let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
+    match head.headers.get("accept") {
+        Some(v) => {
+            if v == APP_JSON {
+                // Transform the ChannelSearchQueryV1 to ChannelSearchQuery
+                let query = ChannelSearchQuery {
+                    name_regex: query.regex.map_or(String::new(), |k| k),
+                    source_regex: query.source_regex.map_or(String::new(), |k| k),
+                    description_regex: query.description_regex.map_or(String::new(), |k| k),
+                };
+                let urls = proxy_config
+                    .search_hosts
+                    .iter()
+                    .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
+                        Ok(mut url) => {
+                            query.append_to_url(&mut url);
+                            Ok(url)
+                        }
+                        Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))),
+                    })
+                    .fold_ok(vec![], |mut a, x| {
+                        a.push(x);
+                        a
+                    })?;
+                let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
+                let nt = |res| {
+                    let fut = async {
+                        let body = hyper::body::to_bytes(res).await?;
+                        let res: ChannelSearchResult = match serde_json::from_slice(&body) {
+                            Ok(k) => k,
+                            Err(_) => ChannelSearchResult { channels: vec![] },
+                        };
+                        Ok(res)
+                    };
+                    Box::pin(fut) as Pin<Box<dyn Future<Output = Result<ChannelSearchResult, Error>> + Send>>
+                };
+                let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
+                    let mut res = ChannelConfigsResponseV1(vec![]);
+                    for j in all {
+                        for k in j.val.channels {
+                            let mut found = false;
+                            let mut i2 = 0;
+                            for i1 in 0..res.0.len() {
+                                if res.0[i1].backend == k.backend {
+                                    found = true;
+                                    i2 = i1;
+                                    break;
+                                }
+                            }
+                            if !found {
+                                let u = ChannelBackendConfigsV1 {
+                                    backend: k.backend.clone(),
+                                    channels: vec![],
+                                    error: None,
+                                };
+                                res.0.push(u);
+                                i2 = res.0.len() - 1;
+                            }
+                            {
+                                let shape = if k.shape.len() == 0 { None } else { Some(k.shape) };
+                                let unit = if k.unit.len() == 0 { None } else { Some(k.unit) };
+                                let description = if k.description.len() == 0 {
+                                    None
+                                } else {
+                                    Some(k.description)
+                                };
+                                let t = ChannelConfigV1 {
+                                    backend: k.backend,
+                                    name: k.name,
+                                    source: k.source,
+                                    description,
+                                    ty: k.ty,
+                                    shape,
+                                    unit,
+                                };
+                                res.0[i2].channels.push(t);
+                            }
+                        }
+                    }
+                    let res = response(StatusCode::OK)
+                        .header(http::header::CONTENT_TYPE, APP_JSON)
+                        .body(Body::from(serde_json::to_string(&res)?))?;
+                    Ok(res)
+                };
+                let ret =
+                    gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000))
+                        .await?;
+                Ok(ret)
+            } else {
+                Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
+            }
+        }
+        None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
     }
-    let res2 = ChannelSearchResultV1(extr(res));
-    let body = serde_json::to_string(&res2.0)?;
-    let res = response(StatusCode::OK).body(body.into())?;
-    Ok(res)
 }
 
 type TT0 = (ProxyBackend, http::response::Parts, hyper::body::Bytes);
@@ -121,6 +280,8 @@ type TT4 = Result;
 type TT7 = Pin<Box<dyn Future<Output = TT4> + Send>>;
 type TT8 = (String, TT7);
 
+// TODO try to get rid of this.
+
 fn subreq(
     backends_req: &[&str],
     endp: &str,
diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs
index 2a739e7..c07cb1d 100644
--- a/httpret/src/gather.rs
+++ b/httpret/src/gather.rs
@@ -4,7 +4,7 @@ use futures_util::{select, FutureExt};
 use http::{Method, StatusCode};
 use hyper::{Body, Client, Request, Response};
 use hyper_tls::HttpsConnector;
-use netpod::{Node, NodeConfigCached};
+use netpod::{Node, NodeConfigCached, APP_JSON};
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
 use std::future::Future;
@@ -162,9 +162,16 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
     Ok(res)
 }
 
+pub struct SubRes<T> {
+    pub tag: String,
+    pub val: T,
+}
+
 pub async fn gather_get_json_generic<SM, NT, FT>(
     method: http::Method,
     urls: Vec<Url>,
+    bodies: Option<Vec<Body>>,
+    tags: Vec<String>,
     nt: NT,
     ft: FT,
     timeout: Duration,
@@ -172,23 +179,30 @@
 where
     SM: Send + 'static,
     NT: Fn(Response<Body>) -> Pin<Box<dyn Future<Output = Result<SM, Error>> + Send>> + Send + Sync + Copy + 'static,
-    FT: Fn(Vec<SM>) -> Result<Response<Body>, Error>,
+    FT: Fn(Vec<SubRes<SM>>) -> Result<Response<Body>, Error>,
 {
+    assert!(urls.len() == tags.len());
+    let bodies: Vec<_> = match bodies {
+        None => (0..urls.len()).into_iter().map(|_| Body::empty()).collect(),
+        Some(bodies) => bodies,
+    };
     let spawned: Vec<_> = urls
         .into_iter()
-        .map(move |url| {
+        .zip(bodies.into_iter())
+        .zip(tags.into_iter())
+        .map(move |((url, body), tag)| {
             let url_str = url.as_str();
             let is_tls = if url_str.starts_with("https://") { true } else { false };
             let req = Request::builder().method(method.clone()).uri(url_str);
             //let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
-            let req = req.header(http::header::ACCEPT, "application/json");
-            let req = req.body(Body::empty());
+            let req = req.header(http::header::ACCEPT, APP_JSON);
+            let req = req.body(body);
             let task = tokio::spawn(async move {
                 select! {
-                    _ = sleep(timeout).fuse() => {
-                        Err(Error::with_msg("timeout"))
-                    }
-                    res = {
+                    _ = sleep(timeout).fuse() => {
+                        Err(Error::with_msg("timeout"))
+                    }
+                    res = {
                         if is_tls {
                             let https = HttpsConnector::new();
                             let client = Client::builder().build::<_, hyper::Body>(https);
@@ -198,7 +212,13 @@ where
                             let client = Client::new();
                             client.request(req?).fuse()
                         }
-                    } => Ok(nt(res?).await?)
+                    } => {
+                        let ret = SubRes {
+                            tag: tag,
+                            val:nt(res?).await?,
+                        };
+                        Ok(ret)
+                    }
                 }
             });
             (url, task)
@@ -228,6 +248,8 @@ mod test {
         let fut = gather_get_json_generic(
             hyper::Method::GET,
             vec![],
+            None,
+            vec![],
             |_res| {
                 let fut = async { Ok(()) };
                 Box::pin(fut)
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index b26b51a..a90444d 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -261,6 +261,7 @@ where
         .status(status)
         .header("Access-Control-Allow-Origin", "*")
         .header("Access-Control-Allow-Headers", "*")
+        .header("x-proxy-log-mark", proxy_mark())
 }
 
 struct BodyStreamWrap(netpod::BodyStream);
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index 094b64e..a9d9b1f 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -1,17 +1,20 @@
-use crate::api1::{channels_config_v1, channels_list_v1, gather_json_2_v1, gather_json_v1, proxy_distribute_v1};
-use crate::gather::gather_get_json_generic;
-use crate::{proxy_mark, response, Cont};
+use crate::api1::{
+    channel_search_configs_v1, channel_search_list_v1, channels_config_v1, gather_json_2_v1, gather_json_v1,
+    proxy_distribute_v1,
+};
+use crate::gather::{gather_get_json_generic, SubRes};
+use crate::{response, Cont};
 use disk::binned::query::BinnedQuery;
 use disk::events::PlainEventsJsonQuery;
 use err::Error;
-use http::{HeaderValue, StatusCode};
+use http::StatusCode;
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{Body, Request, Response, Server};
 use itertools::Itertools;
 use netpod::log::*;
 use netpod::{
     AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, FromUrl, HasBackend, HasTimeout,
-    ProxyConfig,
+    ProxyConfig, APP_JSON,
 };
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
@@ -55,9 +58,11 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
     let uri = req.uri().clone();
     let path = uri.path();
     if path == "/api/1/channels" {
-        Ok(channels_list_v1(req, proxy_config).await?)
+        Ok(channel_search_list_v1(req, proxy_config).await?)
     } else if path == "/api/1/channels/config" {
         Ok(channels_config_v1(req, proxy_config).await?)
+    } else if path == "/api/1/channelsA/config" {
+        Ok(channel_search_configs_v1(req, proxy_config).await?)
     } else if path == "/api/1/stats/version" {
         Ok(gather_json_v1(req, "/stats/version").await?)
     } else if path == "/api/1/stats/" {
@@ -118,6 +123,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
                         a.push(x);
                         a
                     })?;
+                let tags = urls.iter().map(|k| k.to_string()).collect();
                 let nt = |res| {
                     let fut = async {
                         let body = hyper::body::to_bytes(res).await?;
@@ -130,10 +136,10 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
                     };
                     Box::pin(fut) as Pin<Box<dyn Future<Output = Result<ChannelSearchResult, Error>> + Send>>
                 };
-                let ft = |all: Vec<ChannelSearchResult>| {
+                let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
                     let mut res = vec![];
                     for j in all {
-                        for k in j.channels {
+                        for k in j.val.channels {
                             res.push(k);
                         }
                     }
@@ -143,10 +149,9 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
                         .body(Body::from(serde_json::to_string(&res)?))?;
                     Ok(res)
                 };
-                let mut ret =
-                    gather_get_json_generic(http::Method::GET, urls, nt, ft, Duration::from_millis(3000)).await?;
-                ret.headers_mut()
-                    .append("x-proxy-log-mark", HeaderValue::from_str(proxy_mark())?);
+                let ret =
+                    gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000))
+                        .await?;
                 Ok(ret)
             } else {
                 Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
@@ -183,6 +188,7 @@ where
                         a.push(x);
                         a
                     })?;
+                let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
                 let nt = |res| {
                     let fut = async {
                         let body = hyper::body::to_bytes(res).await?;
@@ -193,19 +199,20 @@ where
                     };
                     Box::pin(fut) as Pin<Box<dyn Future<Output = Result<JsonValue, Error>> + Send>>
                 };
-                let ft = |all: Vec<JsonValue>| {
-                    let res = match all.first() {
-                        Some(item) => Ok(item),
-                        None => Err(Error::with_msg("no response from upstream")),
-                    }?;
-                    let res = response(StatusCode::OK)
-                        .header(http::header::CONTENT_TYPE, "application/json")
-                        .body(Body::from(serde_json::to_string(res)?))?;
-                    Ok(res)
+                let ft = |mut all: Vec<SubRes<JsonValue>>| {
+                    if all.len() > 0 {
+                        all.truncate(1);
+                        let z = all.pop().unwrap();
+                        let res = z.val;
+                        let res = response(StatusCode::OK)
+                            .header(http::header::CONTENT_TYPE, APP_JSON)
+                            .body(Body::from(serde_json::to_string(&res)?))?;
+                        return Ok(res);
+                    } else {
+                        return Err(Error::with_msg("no response from upstream"));
+                    }
                 };
-                let mut ret = gather_get_json_generic(http::Method::GET, urls, nt, ft, query.timeout()).await?;
-                ret.headers_mut()
-                    .append("x-proxy-log-mark", HeaderValue::from_str(proxy_mark())?);
+                let ret = gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, query.timeout()).await?;
                 Ok(ret)
             } else {
                 Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 38a759b..7c1a0cf 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -19,6 +19,8 @@ use url::Url;
 pub mod status;
 pub mod streamext;
 
+pub const APP_JSON: &'static str = "application/json";
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct AggQuerySingleChannel {
     pub channel_config: ChannelConfig,
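
Note on the /api/1 -> /api/4 mapping introduced above: both new handlers turn the optional regex fields of the v1 POST body into a netpod ChannelSearchQuery and append it to {search_host}/api/4/search/channel. The following sketch illustrates that mapping outside of the proxy. The host value, the example regex, and the query-parameter names are placeholders (the real parameter names come from ChannelSearchQuery::append_to_url in netpod), so treat it as an assumption-laden example rather than the project's code.

use url::Url;

// Simplified stand-in for netpod::ChannelSearchQuery; field names follow the patch,
// but the query-parameter names used below are assumptions for illustration only.
struct ChannelSearchQuery {
    name_regex: String,
    source_regex: String,
    description_regex: String,
}

impl ChannelSearchQuery {
    // Stand-in for the real append_to_url provided by netpod.
    fn append_to_url(&self, url: &mut Url) {
        let mut qp = url.query_pairs_mut();
        qp.append_pair("nameRegex", &self.name_regex);
        qp.append_pair("sourceRegex", &self.source_regex);
        qp.append_pair("descriptionRegex", &self.description_regex);
    }
}

fn main() -> Result<(), url::ParseError> {
    // Placeholder search host; in the patch these come from proxy_config.search_hosts.
    let sh = "http://search-host.example:8371";
    // Map the optional v1 fields to the v4 query, defaulting to empty strings,
    // mirroring the map_or(String::new(), |k| k) calls in the handlers.
    let query = ChannelSearchQuery {
        name_regex: "SINEG01.*".into(),
        source_regex: String::new(),
        description_regex: String::new(),
    };
    let mut url = Url::parse(&format!("{}/api/4/search/channel", sh))?;
    query.append_to_url(&mut url);
    println!("{}", url);
    Ok(())
}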
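
The ft closures in channel_search_list_v1 and channel_search_configs_v1 merge the per-host /api/4 results back into the backend-grouped shape that /api/1 clients expect, using a linear scan over the groups collected so far. A map keyed by backend expresses the same grouping; the sketch below uses simplified stand-in types (not the real netpod/httpret structs) and a BTreeMap, which sorts groups by backend name instead of keeping the first-seen order that the patch preserves.

use std::collections::BTreeMap;

// Stand-in for a single search hit as returned by /api/4 (only the fields needed here).
struct Hit {
    backend: String,
    name: String,
}

// Stand-in for one per-backend group in the /api/1 response shape.
struct BackendChannels {
    backend: String,
    channels: Vec<String>,
}

// Group flat hits by backend, the same merge the ft closures perform with a linear scan.
fn group_by_backend(hits: Vec<Hit>) -> Vec<BackendChannels> {
    let mut map: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for h in hits {
        map.entry(h.backend).or_default().push(h.name);
    }
    map.into_iter()
        .map(|(backend, channels)| BackendChannels { backend, channels })
        .collect()
}

fn main() {
    let hits = vec![
        Hit { backend: "backend-a".into(), name: "CH:ONE".into() },
        Hit { backend: "backend-b".into(), name: "CH:TWO".into() },
        Hit { backend: "backend-a".into(), name: "CH:THREE".into() },
    ];
    for g in group_by_backend(hits) {
        println!("{}: {:?}", g.backend, g.channels);
    }
}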
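
gather_get_json_generic now fans out one request per URL, pairs every sub-response with its tag (the URL string), and hands the whole collection of SubRes values to the ft aggregator. The sketch below reproduces just that tagging-and-timeout idea with tokio; fetch_stub and the host names are invented placeholders, and tokio::time::timeout stands in for the select!/sleep pattern used in gather.rs.

use std::time::Duration;
use tokio::time::timeout;

// Same shape as the SubRes added in httpret/src/gather.rs: a value plus the tag
// identifying which upstream produced it.
struct SubRes<T> {
    tag: String,
    val: T,
}

// Placeholder for the real hyper request against the tagged URL.
async fn fetch_stub(tag: String) -> SubRes<String> {
    let val = format!("response from {}", tag);
    SubRes { tag, val }
}

#[tokio::main]
async fn main() {
    let tags = vec![
        "http://host-a:8371/api/4/search/channel".to_string(),
        "http://host-b:8371/api/4/search/channel".to_string(),
    ];
    // Spawn one task per tagged URL, each with its own timeout, then collect the results.
    let handles: Vec<_> = tags
        .into_iter()
        .map(|tag| tokio::spawn(timeout(Duration::from_millis(3000), fetch_stub(tag))))
        .collect();
    for h in handles {
        match h.await {
            Ok(Ok(sub)) => println!("{} -> {}", sub.tag, sub.val),
            Ok(Err(_elapsed)) => println!("sub-request timed out"),
            Err(e) => println!("join error: {:?}", e),
        }
    }
}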