Simplify and fix channel search query parsing

This commit is contained in:
Dominik Werder
2021-06-22 16:57:51 +02:00
parent 5219b56488
commit d0318a17c2
6 changed files with 33 additions and 254 deletions

View File

@@ -4,22 +4,14 @@ use err::Error;
use http::{Method, StatusCode};
use hyper::{Body, Client, Request, Response};
use itertools::Itertools;
use netpod::log::*;
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyBackend, ProxyConfig, APP_JSON};
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::timeout_at;
use url::Url;
/// Returns the static list of live `(host, port)` pairs to fan requests out to.
///
/// NOTE(review): currently an unimplemented stub — `err::todo()` is expected to
/// abort before the empty slice is ever returned, so every caller is effectively
/// disabled until the host list is sourced from configuration.
fn get_live_hosts() -> &'static [(&'static str, u16)] {
    // TODO take from config.
    err::todo();
    &[]
}
/// Implemented by response items that carry the name of the backend which
/// produced them, so gathered answers can be matched against the backend
/// that was actually queried (see `extr`).
pub trait BackendAware {
    /// Name of the backend this item originates from.
    fn backend(&self) -> &str;
}
@@ -272,101 +264,6 @@ pub async fn channel_search_configs_v1(
}
}
// Shorthands for the layered outcome of one proxied subrequest, from the
// innermost payload outwards: HTTP answer -> request result -> task join
// result -> timeout wrapper -> boxed future, paired with the backend name.
type TT0 = (ProxyBackend, http::response::Parts, hyper::body::Bytes); // successful answer
type TT1 = Result<TT0, Error>; // request outcome
type TT2 = tokio::task::JoinHandle<TT1>; // spawned task handle
type TT3 = Result<TT1, tokio::task::JoinError>; // join outcome
type TT4 = Result<TT3, tokio::time::error::Elapsed>; // timeout-wrapped outcome
type TT7 = Pin<Box<dyn Future<Output = TT4> + Send>>; // boxed awaitable of the above
type TT8 = (String, TT7); // backend name + its pending subrequest
// TODO try to get rid of this.
/// Spawns one HTTP POST subrequest per requested backend and returns, for each,
/// the backend name together with a boxed, timeout-wrapped join future.
///
/// * `backends_req` — names of the backends to query; backends not listed are skipped.
/// * `endp` — endpoint path appended after the `/api/1` prefix.
/// * `subq_maker` — builds the per-backend JSON request body.
/// * `proxy_config` — supplies host/port for every known backend.
///
/// Errors if a request body cannot be serialized or a request cannot be built.
fn subreq(
    backends_req: &[&str],
    endp: &str,
    subq_maker: &dyn Fn(&str) -> JsonValue,
    proxy_config: &ProxyConfig,
) -> Result<Vec<TT8>, Error> {
    let mut spawned = vec![];
    // Iterate by reference: no need to clone the whole backend list up front.
    for back in &proxy_config.backends {
        if !backends_req.contains(&back.name.as_str()) {
            continue;
        }
        let q = subq_maker(&back.name);
        // Failure-injection hooks: these magic backend names redirect to
        // endpoints that simulate a timeout or a 500 answer.
        let endp = match back.name.as_str() {
            "timeout" => "channels_timeout",
            "error500" => "channels_error500",
            _ => endp,
        };
        let uri = format!("http://{}:{}{}/{}", back.host, back.port, "/api/1", endp);
        let req = Request::builder()
            .method(Method::POST)
            .uri(uri)
            .header("content-type", "application/json")
            .body(Body::from(serde_json::to_string(&q)?))?;
        let jh: TT2 = tokio::spawn({
            // Single clone: only the spawned task needs an owned backend.
            let back = back.clone();
            async move {
                let res = Client::new().request(req).await?;
                let (pre, body) = res.into_parts();
                let body_all = hyper::body::to_bytes(body).await?;
                Ok::<_, Error>((back, pre, body_all))
            }
        });
        // Cap each subrequest at 5 s so one slow backend cannot stall the gather.
        let jh = tokio::time::timeout(Duration::from_millis(5000), jh);
        let bx: TT7 = Box::pin(jh);
        spawned.push((back.name.clone(), bx));
    }
    Ok(spawned)
}
/// Collapses the layered subrequest results (timeout -> join -> request) into a
/// flat list of `T` items, substituting a placeholder item built via
/// `T::from_error_code` for every backend that did not answer properly.
fn extr<T: BackendAware + FromErrorCode + for<'a> Deserialize<'a>>(results: Vec<(String, TT4)>) -> Vec<T> {
    let mut ret = vec![];
    for (backend, r) in results {
        // Layer 1: outer timeout wrapper.
        if let Ok(r20) = r {
            // Layer 2: task join outcome.
            if let Ok(r30) = r20 {
                // Layer 3: the HTTP request itself.
                if let Ok(r2) = r30 {
                    if r2.1.status == 200 {
                        let inp_res: Result<Vec<T>, _> = serde_json::from_slice(&r2.2);
                        if let Ok(inp) = inp_res {
                            // Each backend is expected to answer with at most one item.
                            // NOTE(review): the >1 case only logs — no placeholder is
                            // pushed, so such a backend silently yields nothing.
                            if inp.len() > 1 {
                                error!("more than one result item from {:?}", r2.0);
                            } else {
                                for inp2 in inp {
                                    // Keep the item only if it names the backend we asked.
                                    if inp2.backend() == r2.0.name {
                                        ret.push(inp2);
                                    }
                                }
                            }
                        } else {
                            error!("malformed answer from {:?}", r2.0);
                            ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error));
                        }
                    } else {
                        // Any non-200 status counts as an error answer.
                        error!("bad answer from {:?}", r2.0);
                        ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error));
                    }
                } else {
                    error!("bad answer from {:?}", r30);
                    ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error));
                }
            } else {
                error!("subrequest join handle error {:?}", r20);
                ret.push(T::from_error_code(backend.as_str(), ErrorCode::Error));
            }
        } else {
            // Timeout is the only failure reported with its own error code.
            error!("subrequest timeout {:?}", r);
            ret.push(T::from_error_code(backend.as_str(), ErrorCode::Timeout));
        }
    }
    ret
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelConfigV1 {
pub backend: String,
@@ -423,109 +320,7 @@ impl FromErrorCode for ChannelBackendConfigsV1 {
}
}
/// HTTP handler for the v1 channel-config search: fans the query out to the
/// requested backends, awaits their answers, and returns the merged result
/// list as a JSON body with status 200.
pub async fn channels_config_v1(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
    let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
    let query: ChannelConfigsQueryV1 = serde_json::from_slice(&body_bytes)?;
    // Per-backend sub-query: identical filters, but restricted to one backend.
    let subq_maker = |backend: &str| -> JsonValue {
        serde_json::to_value(ChannelConfigsQueryV1 {
            regex: query.regex.clone(),
            source_regex: query.source_regex.clone(),
            description_regex: query.description_regex.clone(),
            backends: vec![backend.into()],
            ordering: query.ordering.clone(),
        })
        .unwrap()
    };
    let backend_names: Vec<&str> = query.backends.iter().map(|b| b.as_str()).collect();
    let spawned = subreq(&backend_names, "channels/config", &subq_maker, proxy_config)?;
    let mut gathered = Vec::with_capacity(spawned.len());
    for (backend, fut) in spawned {
        gathered.push((backend, fut.await));
    }
    let result = ChannelConfigsResponseV1(extr(gathered));
    let body = serde_json::to_string(&result.0)?;
    Ok(response(StatusCode::OK).body(body.into())?)
}
/// Fans a GET request out to every live host / retrieval-instance pair and
/// collects the per-host JSON answers into one JSON object keyed by
/// `"host:port-instance"`.
///
/// NOTE(review): `err::todo()` at the top is expected to abort immediately —
/// this handler is effectively disabled and a candidate for removal.
pub async fn gather_json_v1(req_m: Request<Body>, path: &str) -> Result<Response<Body>, Error> {
    // TODO can this be removed?
    err::todo();
    let mut spawned = vec![];
    let (req_h, _) = req_m.into_parts();
    for host in get_live_hosts() {
        // Query the three known retrieval instances on each host.
        for inst in &["00", "01", "02"] {
            let req_hh = req_h.headers.clone();
            // Optional "host_filter" header restricts the fan-out to one host.
            let host_filter = if req_hh.contains_key("host_filter") {
                Some(req_hh.get("host_filter").unwrap().to_str().unwrap())
            } else {
                None
            };
            let path = path.to_string();
            let task = if host_filter.is_none() || host_filter.as_ref().unwrap() == &host.0 {
                let task = (
                    host.clone(),
                    inst.to_string(),
                    tokio::spawn(async move {
                        let uri = format!("http://{}:{}{}", host.0, host.1, path);
                        let req = Request::builder().method(Method::GET).uri(uri);
                        // Dead branch (deliberately disabled via `false &&`): would
                        // forward an incoming "retrieval_instance" header unchanged.
                        let req = if false && req_hh.contains_key("retrieval_instance") {
                            req.header("retrieval_instance", req_hh.get("retrieval_instance").unwrap())
                        } else {
                            req
                        };
                        // The instance id being queried is always set explicitly.
                        let req = req.header("retrieval_instance", *inst);
                        //.header("content-type", "application/json")
                        //.body(Body::from(serde_json::to_string(&q)?))?;
                        let req = req.body(Body::empty())?;
                        // Give each host at most one second to answer.
                        let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
                        let fut = async {
                            let res = Client::new().request(req).await?;
                            let (pre, body) = res.into_parts();
                            if pre.status != StatusCode::OK {
                                Err(Error::with_msg(format!("request failed, got {}", pre.status)))
                            } else {
                                // aggregate returns a hyper Buf which is not Read
                                let body_all = hyper::body::to_bytes(body).await?;
                                // Fall back to wrapping the raw body as a JSON string
                                // when the answer does not parse as JSON.
                                let val = match serde_json::from_slice(&body_all) {
                                    Ok(k) => k,
                                    Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?),
                                };
                                Ok(val)
                            }
                        };
                        let ret = timeout_at(deadline, fut).await??;
                        Ok::<_, Error>(ret)
                    }),
                );
                Some(task)
            } else {
                None
            };
            if let Some(task) = task {
                spawned.push(task);
            }
        }
    }
    use serde_json::Map;
    let mut m = Map::new();
    // Key each answer by "host:port-instance"; any failure becomes the
    // literal string "ERROR" in the output object.
    for h in spawned {
        let res = match h.2.await {
            Ok(k) => match k {
                Ok(k) => k,
                Err(_e) => JsonValue::String(format!("ERROR")),
            },
            Err(_e) => JsonValue::String(format!("ERROR")),
        };
        m.insert(format!("{}:{}-{}", h.0 .0, h.0 .1, h.1), res);
    }
    let res = response(200)
        .header("Content-Type", "application/json")
        .body(serde_json::to_string(&m)?.into())?;
    Ok(res)
}
// TODO replace usage of this by gather-generic
pub async fn gather_json_2_v1(
req: Request<Body>,
pathpre: &str,
@@ -546,7 +341,7 @@ pub async fn gather_json_2_v1(
} else {
req
};
let req = req.header(http::header::ACCEPT, "application/json");
let req = req.header(http::header::ACCEPT, APP_JSON);
//.body(Body::from(serde_json::to_string(&q)?))?;
let req = req.body(Body::empty());
let task = tokio::spawn(async move {
@@ -581,7 +376,7 @@ pub async fn gather_json_2_v1(
a.push(Hres { gh: tr.0, res });
}
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(serde_json::to_string(&Jres { hosts: a })?.into())?;
Ok(res)
}