Use /api/4 on nodes for /api/1 channel search
This commit is contained in:
@@ -44,7 +44,8 @@ pub async fn search_channel(
|
||||
None => vec![],
|
||||
};
|
||||
let k = ChannelSearchSingleResult {
|
||||
backend: row.get(7),
|
||||
backend: node_config.node.backend.clone(),
|
||||
//backend: row.get(7),
|
||||
name: row.get(1),
|
||||
source: row.get(2),
|
||||
ty: row.get(3),
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
use crate::gather::{gather_get_json_generic, SubRes};
|
||||
use crate::response;
|
||||
use err::Error;
|
||||
use http::{Method, StatusCode};
|
||||
use hyper::{Body, Client, Request, Response};
|
||||
use itertools::Itertools;
|
||||
use netpod::log::*;
|
||||
use netpod::{ProxyBackend, ProxyConfig};
|
||||
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyBackend, ProxyConfig, APP_JSON};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout_at;
|
||||
use url::Url;
|
||||
|
||||
fn get_live_hosts() -> &'static [(&'static str, u16)] {
|
||||
// TODO take from config.
|
||||
@@ -87,30 +90,186 @@ impl FromErrorCode for ChannelSearchResultItemV1 {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ChannelSearchResultV1(pub Vec<ChannelSearchResultItemV1>);
|
||||
|
||||
pub async fn channels_list_v1(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
|
||||
let reqbody = req.into_body();
|
||||
let bodyslice = hyper::body::to_bytes(reqbody).await?;
|
||||
let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodyslice)?;
|
||||
let subq_maker = |backend: &str| -> JsonValue {
|
||||
serde_json::to_value(ChannelSearchQueryV1 {
|
||||
regex: query.regex.clone(),
|
||||
source_regex: query.source_regex.clone(),
|
||||
description_regex: query.description_regex.clone(),
|
||||
backends: vec![backend.into()],
|
||||
ordering: query.ordering.clone(),
|
||||
})
|
||||
.unwrap()
|
||||
};
|
||||
let back2: Vec<_> = query.backends.iter().map(|x| x.as_str()).collect();
|
||||
let spawned = subreq(&back2[..], "channels", &subq_maker, proxy_config)?;
|
||||
let mut res = vec![];
|
||||
for (backend, s) in spawned {
|
||||
res.push((backend, s.await));
|
||||
pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
|
||||
let (head, reqbody) = req.into_parts();
|
||||
let bodybytes = hyper::body::to_bytes(reqbody).await?;
|
||||
let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
|
||||
match head.headers.get("accept") {
|
||||
Some(v) => {
|
||||
if v == APP_JSON {
|
||||
let query = ChannelSearchQuery {
|
||||
name_regex: query.regex.map_or(String::new(), |k| k),
|
||||
source_regex: query.source_regex.map_or(String::new(), |k| k),
|
||||
description_regex: query.description_regex.map_or(String::new(), |k| k),
|
||||
};
|
||||
let urls = proxy_config
|
||||
.search_hosts
|
||||
.iter()
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
Ok(url)
|
||||
}
|
||||
Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))),
|
||||
})
|
||||
.fold_ok(vec![], |mut a, x| {
|
||||
a.push(x);
|
||||
a
|
||||
})?;
|
||||
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
|
||||
let nt = |res| {
|
||||
let fut = async {
|
||||
let body = hyper::body::to_bytes(res).await?;
|
||||
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
|
||||
Ok(k) => k,
|
||||
Err(_) => ChannelSearchResult { channels: vec![] },
|
||||
};
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
|
||||
};
|
||||
let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
|
||||
let mut res = ChannelSearchResultV1(vec![]);
|
||||
for j in all {
|
||||
for k in j.val.channels {
|
||||
let mut found = false;
|
||||
let mut i2 = 0;
|
||||
for i1 in 0..res.0.len() {
|
||||
if res.0[i1].backend == k.backend {
|
||||
found = true;
|
||||
i2 = i1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
let u = ChannelSearchResultItemV1 {
|
||||
backend: k.backend,
|
||||
channels: vec![],
|
||||
error: None,
|
||||
};
|
||||
res.0.push(u);
|
||||
i2 = res.0.len() - 1;
|
||||
}
|
||||
res.0[i2].channels.push(k.name);
|
||||
}
|
||||
}
|
||||
let res = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(serde_json::to_string(&res)?))?;
|
||||
Ok(res)
|
||||
};
|
||||
let ret =
|
||||
gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000))
|
||||
.await?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
|
||||
}
|
||||
}
|
||||
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn channel_search_configs_v1(
|
||||
req: Request<Body>,
|
||||
proxy_config: &ProxyConfig,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let (head, reqbody) = req.into_parts();
|
||||
let bodybytes = hyper::body::to_bytes(reqbody).await?;
|
||||
let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
|
||||
match head.headers.get("accept") {
|
||||
Some(v) => {
|
||||
if v == APP_JSON {
|
||||
// Transform the ChannelSearchQueryV1 to ChannelSearchQuery
|
||||
let query = ChannelSearchQuery {
|
||||
name_regex: query.regex.map_or(String::new(), |k| k),
|
||||
source_regex: query.source_regex.map_or(String::new(), |k| k),
|
||||
description_regex: query.description_regex.map_or(String::new(), |k| k),
|
||||
};
|
||||
let urls = proxy_config
|
||||
.search_hosts
|
||||
.iter()
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
Ok(url)
|
||||
}
|
||||
Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))),
|
||||
})
|
||||
.fold_ok(vec![], |mut a, x| {
|
||||
a.push(x);
|
||||
a
|
||||
})?;
|
||||
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
|
||||
let nt = |res| {
|
||||
let fut = async {
|
||||
let body = hyper::body::to_bytes(res).await?;
|
||||
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
|
||||
Ok(k) => k,
|
||||
Err(_) => ChannelSearchResult { channels: vec![] },
|
||||
};
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
|
||||
};
|
||||
let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
|
||||
let mut res = ChannelConfigsResponseV1(vec![]);
|
||||
for j in all {
|
||||
for k in j.val.channels {
|
||||
let mut found = false;
|
||||
let mut i2 = 0;
|
||||
for i1 in 0..res.0.len() {
|
||||
if res.0[i1].backend == k.backend {
|
||||
found = true;
|
||||
i2 = i1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
let u = ChannelBackendConfigsV1 {
|
||||
backend: k.backend.clone(),
|
||||
channels: vec![],
|
||||
error: None,
|
||||
};
|
||||
res.0.push(u);
|
||||
i2 = res.0.len() - 1;
|
||||
}
|
||||
{
|
||||
let shape = if k.shape.len() == 0 { None } else { Some(k.shape) };
|
||||
let unit = if k.unit.len() == 0 { None } else { Some(k.unit) };
|
||||
let description = if k.description.len() == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(k.description)
|
||||
};
|
||||
let t = ChannelConfigV1 {
|
||||
backend: k.backend,
|
||||
name: k.name,
|
||||
source: k.source,
|
||||
description,
|
||||
ty: k.ty,
|
||||
shape,
|
||||
unit,
|
||||
};
|
||||
res.0[i2].channels.push(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
let res = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(serde_json::to_string(&res)?))?;
|
||||
Ok(res)
|
||||
};
|
||||
let ret =
|
||||
gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000))
|
||||
.await?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
|
||||
}
|
||||
}
|
||||
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
|
||||
}
|
||||
let res2 = ChannelSearchResultV1(extr(res));
|
||||
let body = serde_json::to_string(&res2.0)?;
|
||||
let res = response(StatusCode::OK).body(body.into())?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
type TT0 = (ProxyBackend, http::response::Parts, hyper::body::Bytes);
|
||||
@@ -121,6 +280,8 @@ type TT4 = Result<TT3, tokio::time::error::Elapsed>;
|
||||
type TT7 = Pin<Box<dyn Future<Output = TT4> + Send>>;
|
||||
type TT8 = (String, TT7);
|
||||
|
||||
// TODO try to get rid of this.
|
||||
|
||||
fn subreq(
|
||||
backends_req: &[&str],
|
||||
endp: &str,
|
||||
|
||||
@@ -4,7 +4,7 @@ use futures_util::{select, FutureExt};
|
||||
use http::{Method, StatusCode};
|
||||
use hyper::{Body, Client, Request, Response};
|
||||
use hyper_tls::HttpsConnector;
|
||||
use netpod::{Node, NodeConfigCached};
|
||||
use netpod::{Node, NodeConfigCached, APP_JSON};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::future::Future;
|
||||
@@ -162,9 +162,16 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub struct SubRes<T> {
|
||||
pub tag: String,
|
||||
pub val: T,
|
||||
}
|
||||
|
||||
pub async fn gather_get_json_generic<SM, NT, FT>(
|
||||
method: http::Method,
|
||||
urls: Vec<Url>,
|
||||
bodies: Option<Vec<Body>>,
|
||||
tags: Vec<String>,
|
||||
nt: NT,
|
||||
ft: FT,
|
||||
timeout: Duration,
|
||||
@@ -172,23 +179,30 @@ pub async fn gather_get_json_generic<SM, NT, FT>(
|
||||
where
|
||||
SM: Send + 'static,
|
||||
NT: Fn(Response<Body>) -> Pin<Box<dyn Future<Output = Result<SM, Error>> + Send>> + Send + Sync + Copy + 'static,
|
||||
FT: Fn(Vec<SM>) -> Result<Response<Body>, Error>,
|
||||
FT: Fn(Vec<SubRes<SM>>) -> Result<Response<Body>, Error>,
|
||||
{
|
||||
assert!(urls.len() == tags.len());
|
||||
let bodies: Vec<_> = match bodies {
|
||||
None => (0..urls.len()).into_iter().map(|_| Body::empty()).collect(),
|
||||
Some(bodies) => bodies,
|
||||
};
|
||||
let spawned: Vec<_> = urls
|
||||
.into_iter()
|
||||
.map(move |url| {
|
||||
.zip(bodies.into_iter())
|
||||
.zip(tags.into_iter())
|
||||
.map(move |((url, body), tag)| {
|
||||
let url_str = url.as_str();
|
||||
let is_tls = if url_str.starts_with("https://") { true } else { false };
|
||||
let req = Request::builder().method(method.clone()).uri(url_str);
|
||||
//let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
|
||||
let req = req.header(http::header::ACCEPT, "application/json");
|
||||
let req = req.body(Body::empty());
|
||||
let req = req.header(http::header::ACCEPT, APP_JSON);
|
||||
let req = req.body(body);
|
||||
let task = tokio::spawn(async move {
|
||||
select! {
|
||||
_ = sleep(timeout).fuse() => {
|
||||
Err(Error::with_msg("timeout"))
|
||||
}
|
||||
res = {
|
||||
_ = sleep(timeout).fuse() => {
|
||||
Err(Error::with_msg("timeout"))
|
||||
}
|
||||
res = {
|
||||
if is_tls {
|
||||
let https = HttpsConnector::new();
|
||||
let client = Client::builder().build::<_, hyper::Body>(https);
|
||||
@@ -198,7 +212,13 @@ where
|
||||
let client = Client::new();
|
||||
client.request(req?).fuse()
|
||||
}
|
||||
} => Ok(nt(res?).await?)
|
||||
} => {
|
||||
let ret = SubRes {
|
||||
tag: tag,
|
||||
val:nt(res?).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
});
|
||||
(url, task)
|
||||
@@ -228,6 +248,8 @@ mod test {
|
||||
let fut = gather_get_json_generic(
|
||||
hyper::Method::GET,
|
||||
vec![],
|
||||
None,
|
||||
vec![],
|
||||
|_res| {
|
||||
let fut = async { Ok(()) };
|
||||
Box::pin(fut)
|
||||
|
||||
@@ -261,6 +261,7 @@ where
|
||||
.status(status)
|
||||
.header("Access-Control-Allow-Origin", "*")
|
||||
.header("Access-Control-Allow-Headers", "*")
|
||||
.header("x-proxy-log-mark", proxy_mark())
|
||||
}
|
||||
|
||||
struct BodyStreamWrap(netpod::BodyStream);
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
use crate::api1::{channels_config_v1, channels_list_v1, gather_json_2_v1, gather_json_v1, proxy_distribute_v1};
|
||||
use crate::gather::gather_get_json_generic;
|
||||
use crate::{proxy_mark, response, Cont};
|
||||
use crate::api1::{
|
||||
channel_search_configs_v1, channel_search_list_v1, channels_config_v1, gather_json_2_v1, gather_json_v1,
|
||||
proxy_distribute_v1,
|
||||
};
|
||||
use crate::gather::{gather_get_json_generic, SubRes};
|
||||
use crate::{response, Cont};
|
||||
use disk::binned::query::BinnedQuery;
|
||||
use disk::events::PlainEventsJsonQuery;
|
||||
use err::Error;
|
||||
use http::{HeaderValue, StatusCode};
|
||||
use http::StatusCode;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Request, Response, Server};
|
||||
use itertools::Itertools;
|
||||
use netpod::log::*;
|
||||
use netpod::{
|
||||
AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, FromUrl, HasBackend, HasTimeout,
|
||||
ProxyConfig,
|
||||
ProxyConfig, APP_JSON,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsonValue;
|
||||
@@ -55,9 +58,11 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
|
||||
let uri = req.uri().clone();
|
||||
let path = uri.path();
|
||||
if path == "/api/1/channels" {
|
||||
Ok(channels_list_v1(req, proxy_config).await?)
|
||||
Ok(channel_search_list_v1(req, proxy_config).await?)
|
||||
} else if path == "/api/1/channels/config" {
|
||||
Ok(channels_config_v1(req, proxy_config).await?)
|
||||
} else if path == "/api/1/channelsA/config" {
|
||||
Ok(channel_search_configs_v1(req, proxy_config).await?)
|
||||
} else if path == "/api/1/stats/version" {
|
||||
Ok(gather_json_v1(req, "/stats/version").await?)
|
||||
} else if path == "/api/1/stats/" {
|
||||
@@ -118,6 +123,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
a.push(x);
|
||||
a
|
||||
})?;
|
||||
let tags = urls.iter().map(|k| k.to_string()).collect();
|
||||
let nt = |res| {
|
||||
let fut = async {
|
||||
let body = hyper::body::to_bytes(res).await?;
|
||||
@@ -130,10 +136,10 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
};
|
||||
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
|
||||
};
|
||||
let ft = |all: Vec<ChannelSearchResult>| {
|
||||
let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
|
||||
let mut res = vec![];
|
||||
for j in all {
|
||||
for k in j.channels {
|
||||
for k in j.val.channels {
|
||||
res.push(k);
|
||||
}
|
||||
}
|
||||
@@ -143,10 +149,9 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
.body(Body::from(serde_json::to_string(&res)?))?;
|
||||
Ok(res)
|
||||
};
|
||||
let mut ret =
|
||||
gather_get_json_generic(http::Method::GET, urls, nt, ft, Duration::from_millis(3000)).await?;
|
||||
ret.headers_mut()
|
||||
.append("x-proxy-log-mark", HeaderValue::from_str(proxy_mark())?);
|
||||
let ret =
|
||||
gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000))
|
||||
.await?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
|
||||
@@ -183,6 +188,7 @@ where
|
||||
a.push(x);
|
||||
a
|
||||
})?;
|
||||
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
|
||||
let nt = |res| {
|
||||
let fut = async {
|
||||
let body = hyper::body::to_bytes(res).await?;
|
||||
@@ -193,19 +199,20 @@ where
|
||||
};
|
||||
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
|
||||
};
|
||||
let ft = |all: Vec<JsonValue>| {
|
||||
let res = match all.first() {
|
||||
Some(item) => Ok(item),
|
||||
None => Err(Error::with_msg("no response from upstream")),
|
||||
}?;
|
||||
let res = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
.body(Body::from(serde_json::to_string(res)?))?;
|
||||
Ok(res)
|
||||
let ft = |mut all: Vec<SubRes<JsonValue>>| {
|
||||
if all.len() > 0 {
|
||||
all.truncate(1);
|
||||
let z = all.pop().unwrap();
|
||||
let res = z.val;
|
||||
let res = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(serde_json::to_string(&res)?))?;
|
||||
return Ok(res);
|
||||
} else {
|
||||
return Err(Error::with_msg("no response from upstream"));
|
||||
}
|
||||
};
|
||||
let mut ret = gather_get_json_generic(http::Method::GET, urls, nt, ft, query.timeout()).await?;
|
||||
ret.headers_mut()
|
||||
.append("x-proxy-log-mark", HeaderValue::from_str(proxy_mark())?);
|
||||
let ret = gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, query.timeout()).await?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
|
||||
|
||||
@@ -19,6 +19,8 @@ use url::Url;
|
||||
pub mod status;
|
||||
pub mod streamext;
|
||||
|
||||
pub const APP_JSON: &'static str = "application/json";
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct AggQuerySingleChannel {
|
||||
pub channel_config: ChannelConfig,
|
||||
|
||||
Reference in New Issue
Block a user