Move /api/1/ related proxy functionality

This commit is contained in:
Dominik Werder
2021-06-17 18:32:05 +02:00
parent 7077d6b09a
commit 0d73251db8
16 changed files with 848 additions and 142 deletions

390
httpret/src/api1.rs Normal file
View File

@@ -0,0 +1,390 @@
use crate::response;
use err::Error;
use http::{Method, StatusCode};
use hyper::{Body, Client, Request, Response};
use netpod::log::*;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::timeout_at;
/// Static table of the known backends: (backend name, API host, port).
///
/// The "timeout" and "error500" entries are synthetic backends that
/// `subreq` redirects to dedicated test endpoints to exercise error paths.
// NOTE(review): err::todo() presumably aborts/panics so this placeholder
// table is never actually used — confirm err::todo semantics.
fn get_backends() -> [(&'static str, &'static str, u16); 6] {
// TODO take from config.
err::todo();
[
("gls-archive", "gls-data-api.psi.ch", 8371),
("hipa-archive", "hipa-data-api.psi.ch", 8082),
("sf-databuffer", "sf-daqbuf-33.psi.ch", 8371),
("sf-imagebuffer", "sf-daq-5.psi.ch", 8371),
("timeout", "sf-daqbuf-33.psi.ch", 8371),
("error500", "sf-daqbuf-33.psi.ch", 8371),
]
}
/// Static table of hosts polled by `gather_json_v1`: (hostname, port).
// NOTE(review): err::todo() presumably aborts/panics so this placeholder
// table is never actually used — confirm err::todo semantics.
fn get_live_hosts() -> &'static [(&'static str, u16)] {
// TODO take from config.
err::todo();
&[
("sf-daqbuf-21", 8371),
("sf-daqbuf-22", 8371),
("sf-daqbuf-23", 8371),
("sf-daqbuf-24", 8371),
("sf-daqbuf-25", 8371),
("sf-daqbuf-26", 8371),
("sf-daqbuf-27", 8371),
("sf-daqbuf-28", 8371),
("sf-daqbuf-29", 8371),
("sf-daqbuf-30", 8371),
("sf-daqbuf-31", 8371),
("sf-daqbuf-32", 8371),
("sf-daqbuf-33", 8371),
("sf-daq-5", 8371),
("sf-daq-6", 8371),
("hipa-data-api", 8082),
("gls-data-api", 8371),
]
}
/// A result item that knows which backend it came from; used by `extr`
/// to check an answer against the backend it was requested from.
pub trait BackendAware {
    /// Name of the backend this value belongs to.
    fn backend(&self) -> &str;
}
/// Constructs an error placeholder item for a backend whose subrequest
/// failed or timed out; used by `extr` to keep one entry per backend.
pub trait FromErrorCode {
    fn from_error_code(backend: &str, code: ErrorCode) -> Self;
}
/// Coarse failure classification attached to a per-backend result
/// (see `extr`): any request/join/parse failure vs. an elapsed timeout.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum ErrorCode {
    Error,
    Timeout,
}
/// Wrapper carried in the `error` field of per-backend result items.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ErrorDescription {
    code: ErrorCode,
}
/// Requested ordering of search results; serialized as lowercase
/// ("none"/"asc"/"desc") on the wire.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Ordering {
    #[serde(rename = "none")]
    NONE,
    #[serde(rename = "asc")]
    ASC,
    #[serde(rename = "desc")]
    DESC,
}
/// Body of a v1 `channels` search request.
///
/// Optional fields are omitted from the JSON when `None`; `backends` is
/// omitted when empty. The same shape is reused for the per-backend
/// sub-queries (with `backends` narrowed to a single entry).
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelSearchQueryV1 {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub regex: Option<String>,
    #[serde(rename = "sourceRegex", skip_serializing_if = "Option::is_none")]
    pub source_regex: Option<String>,
    #[serde(rename = "descriptionRegex", skip_serializing_if = "Option::is_none")]
    pub description_regex: Option<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub backends: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ordering: Option<Ordering>,
}
/// One backend's contribution to a v1 channel search answer: the backend
/// name, its channel names, and an optional error marker when the
/// subrequest failed (see `FromErrorCode`).
#[derive(Debug, Serialize, Deserialize)]
pub struct ChannelSearchResultItemV1 {
    pub backend: String,
    pub channels: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<ErrorDescription>,
}
impl BackendAware for ChannelSearchResultItemV1 {
    /// Name of the backend this result item belongs to.
    fn backend(&self) -> &str {
        self.backend.as_str()
    }
}
impl FromErrorCode for ChannelSearchResultItemV1 {
fn from_error_code(backend: &str, code: ErrorCode) -> Self {
Self {
backend: backend.into(),
channels: vec![],
error: Some(ErrorDescription { code }),
}
}
}
/// Aggregate search answer: one `ChannelSearchResultItemV1` per backend.
/// Note: `channels_list_v1` serializes the inner Vec, not the wrapper.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChannelSearchResultV1(pub Vec<ChannelSearchResultItemV1>);
/// Handle a v1 `channels` search by fanning the query out to every
/// requested backend (via `subreq`) and merging the answers (via `extr`).
///
/// Returns an HTTP 200 response whose body is the JSON array of
/// per-backend result items.
pub async fn channels_list_v1(req: Request<Body>) -> Result<Response<Body>, Error> {
    let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
    let query: ChannelSearchQueryV1 = serde_json::from_slice(&body_bytes)?;
    // Per-backend sub-query: same search terms, backends narrowed to one.
    let make_subquery = |backend: &str| -> JsonValue {
        serde_json::to_value(ChannelSearchQueryV1 {
            regex: query.regex.clone(),
            source_regex: query.source_regex.clone(),
            description_regex: query.description_regex.clone(),
            backends: vec![backend.into()],
            ordering: query.ordering.clone(),
        })
        .unwrap()
    };
    let requested: Vec<&str> = query.backends.iter().map(String::as_str).collect();
    let mut answers = vec![];
    for (backend, fut) in subreq(&requested[..], "channels", &make_subquery)? {
        answers.push((backend, fut.await));
    }
    let result = ChannelSearchResultV1(extr(answers));
    let json = serde_json::to_string(&result.0)?;
    Ok(response(StatusCode::OK).body(json.into())?)
}
// Result plumbing shared by `subreq` and `extr`, reading inside-out:
// TT0: successful subrequest payload — (backend tuple, response head, body bytes).
type TT0 = (
    (&'static str, &'static str, u16),
    http::response::Parts,
    hyper::body::Bytes,
);
// TT1: subrequest outcome (request/body-read errors).
type TT1 = Result<TT0, Error>;
// TT2: spawned task handle for one subrequest.
type TT2 = tokio::task::JoinHandle<TT1>;
// TT3: task outcome (adds join/panic errors).
type TT3 = Result<TT1, tokio::task::JoinError>;
// TT4: timed task outcome (adds the per-subrequest timeout).
type TT4 = Result<TT3, tokio::time::error::Elapsed>;
// TT7: boxed future awaited by the handler.
type TT7 = Pin<Box<dyn Future<Output = TT4> + Send>>;
// TT8: (backend name, pending subrequest).
type TT8 = (&'static str, TT7);
/// Per-subrequest timeout applied around each spawned task.
const SUBREQ_TIMEOUT_MS: u64 = 5000;

/// Fan out one POST subrequest per backend that is both requested and known.
///
/// For each entry of `get_backends()` whose name appears in `backends_req`,
/// serialize the sub-query produced by `subq_maker`, POST it as JSON to
/// `http://<host>:<port>/api/1/<endp>`, and return `(backend name, future)`
/// pairs. Each future resolves to the layered `TT4` result consumed by
/// `extr` (timeout, join error, request error, then head + body bytes).
///
/// The synthetic "timeout" and "error500" backends are redirected to
/// dedicated test endpoints.
fn subreq(backends_req: &[&str], endp: &str, subq_maker: &dyn Fn(&str) -> JsonValue) -> Result<Vec<TT8>, Error> {
    let backends = get_backends();
    let mut spawned = vec![];
    for back in &backends {
        if backends_req.contains(&back.0) {
            let back = back.clone();
            let q = subq_maker(back.0);
            let endp = match back.0 {
                "timeout" => "channels_timeout",
                "error500" => "channels_error500",
                _ => endp,
            };
            let uri = format!("http://{}:{}{}/{}", back.1, back.2, "/api/1", endp);
            let req = Request::builder()
                .method(Method::POST)
                .uri(uri)
                .header("content-type", "application/json")
                .body(Body::from(serde_json::to_string(&q)?))?;
            let jh: TT2 = tokio::spawn(async move {
                let res = Client::new().request(req).await?;
                let (pre, body) = res.into_parts();
                let body_all = hyper::body::to_bytes(body).await?;
                Ok::<_, Error>((back, pre, body_all))
            });
            // `Duration` is already imported at the top of this file.
            let jh = tokio::time::timeout(Duration::from_millis(SUBREQ_TIMEOUT_MS), jh);
            let bx: TT7 = Box::pin(jh);
            spawned.push((back.0, bx));
        }
    }
    Ok(spawned)
}
//fn extr<'a, T: BackendAware + FromErrorCode + Deserialize<'a>>(results: Vec<(&str, TT4)>) -> Vec<T> {
fn extr<T: BackendAware + FromErrorCode + for<'a> Deserialize<'a>>(results: Vec<(&str, TT4)>) -> Vec<T> {
let mut ret = vec![];
for (backend, r) in results {
if let Ok(r20) = r {
if let Ok(r30) = r20 {
if let Ok(r2) = r30 {
if r2.1.status == 200 {
let inp_res: Result<Vec<T>, _> = serde_json::from_slice(&r2.2);
if let Ok(inp) = inp_res {
if inp.len() > 1 {
error!("more than one result item from {:?}", r2.0);
} else {
for inp2 in inp {
if inp2.backend() == r2.0 .0 {
ret.push(inp2);
}
}
}
} else {
error!("malformed answer from {:?}", r2.0);
ret.push(T::from_error_code(backend, ErrorCode::Error));
}
} else {
error!("bad answer from {:?}", r2.0);
ret.push(T::from_error_code(backend, ErrorCode::Error));
}
} else {
error!("bad answer from {:?}", r30);
ret.push(T::from_error_code(backend, ErrorCode::Error));
}
} else {
error!("subrequest join handle error {:?}", r20);
ret.push(T::from_error_code(backend, ErrorCode::Error));
}
} else {
error!("subrequest timeout {:?}", r);
ret.push(T::from_error_code(backend, ErrorCode::Timeout));
}
}
ret
}
/// One channel's configuration as reported by a v1 backend: identity
/// (backend, name, source), data type, and optional shape/unit/description.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelConfigV1 {
    pub backend: String,
    pub name: String,
    pub source: String,
    // Serialized as "type"; `ty` avoids the Rust keyword.
    #[serde(rename = "type")]
    pub ty: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub shape: Option<Vec<u32>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub unit: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
/// Body of a v1 `channels/config` request.
///
/// Mirrors `ChannelSearchQueryV1`, but note the asymmetry: `sourceRegex`
/// and `descriptionRegex` here are serialized even when `None` (no
/// `skip_serializing_if`). NOTE(review): possibly unintended — confirm
/// whether backends accept explicit nulls for these fields.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelConfigsQueryV1 {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub regex: Option<String>,
    #[serde(rename = "sourceRegex")]
    pub source_regex: Option<String>,
    #[serde(rename = "descriptionRegex")]
    pub description_regex: Option<String>,
    // `default` lets clients omit the field entirely on input.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub backends: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ordering: Option<Ordering>,
}
/// One backend's contribution to a v1 `channels/config` answer: the
/// backend name, its channel configs, and an optional error marker when
/// the subrequest failed (see `FromErrorCode`).
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelBackendConfigsV1 {
    pub backend: String,
    pub channels: Vec<ChannelConfigV1>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<ErrorDescription>,
}
/// Aggregate config answer: one `ChannelBackendConfigsV1` per backend.
/// Note: `channels_config_v1` serializes the inner Vec, not the wrapper.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelConfigsResponseV1(pub Vec<ChannelBackendConfigsV1>);
impl BackendAware for ChannelBackendConfigsV1 {
    /// Name of the backend this config set belongs to.
    fn backend(&self) -> &str {
        self.backend.as_str()
    }
}
impl FromErrorCode for ChannelBackendConfigsV1 {
fn from_error_code(backend: &str, code: ErrorCode) -> Self {
Self {
backend: backend.into(),
channels: vec![],
error: Some(ErrorDescription { code }),
}
}
}
/// Handle a v1 `channels/config` request by fanning the query out to every
/// requested backend (via `subreq`) and merging the answers (via `extr`).
///
/// Returns an HTTP 200 response whose body is the JSON array of
/// per-backend config sets.
pub async fn channels_config_v1(req: Request<Body>) -> Result<Response<Body>, Error> {
    let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
    let query: ChannelConfigsQueryV1 = serde_json::from_slice(&body_bytes)?;
    // Per-backend sub-query: same search terms, backends narrowed to one.
    let make_subquery = |backend: &str| -> JsonValue {
        serde_json::to_value(ChannelConfigsQueryV1 {
            regex: query.regex.clone(),
            source_regex: query.source_regex.clone(),
            description_regex: query.description_regex.clone(),
            backends: vec![backend.into()],
            ordering: query.ordering.clone(),
        })
        .unwrap()
    };
    let requested: Vec<&str> = query.backends.iter().map(String::as_str).collect();
    let mut answers = vec![];
    for (backend, fut) in subreq(&requested[..], "channels/config", &make_subquery)? {
        answers.push((backend, fut.await));
    }
    let result = ChannelConfigsResponseV1(extr(answers));
    let json = serde_json::to_string(&result.0)?;
    Ok(response(StatusCode::OK).body(json.into())?)
}
pub async fn gather_json_v1(req_m: Request<Body>, path: &str) -> Result<Response<Body>, Error> {
let mut spawned = vec![];
let (req_h, _) = req_m.into_parts();
for host in get_live_hosts() {
for inst in &["00", "01", "02"] {
let req_hh = req_h.headers.clone();
let host_filter = if req_hh.contains_key("host_filter") {
Some(req_hh.get("host_filter").unwrap().to_str().unwrap())
} else {
None
};
let path = path.to_string();
let task = if host_filter.is_none() || host_filter.as_ref().unwrap() == &host.0 {
let task = (
host.clone(),
inst.to_string(),
tokio::spawn(async move {
let uri = format!("http://{}:{}{}", host.0, host.1, path);
let req = Request::builder().method(Method::GET).uri(uri);
let req = if false && req_hh.contains_key("retrieval_instance") {
req.header("retrieval_instance", req_hh.get("retrieval_instance").unwrap())
} else {
req
};
let req = req.header("retrieval_instance", *inst);
//.header("content-type", "application/json")
//.body(Body::from(serde_json::to_string(&q)?))?;
let req = req.body(Body::empty())?;
let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
let fut = async {
let res = Client::new().request(req).await?;
let (pre, body) = res.into_parts();
if pre.status != StatusCode::OK {
Err(Error::with_msg(format!("request failed, got {}", pre.status)))
} else {
// aggregate returns a hyper Buf which is not Read
let body_all = hyper::body::to_bytes(body).await?;
let val = match serde_json::from_slice(&body_all) {
Ok(k) => k,
Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?),
};
Ok(val)
}
};
let ret = timeout_at(deadline, fut).await??;
Ok::<_, Error>(ret)
}),
);
Some(task)
} else {
None
};
if let Some(task) = task {
spawned.push(task);
}
}
}
use serde_json::Map;
let mut m = Map::new();
for h in spawned {
let res = match h.2.await {
Ok(k) => match k {
Ok(k) => k,
Err(_e) => JsonValue::String(format!("ERROR")),
},
Err(_e) => JsonValue::String(format!("ERROR")),
};
m.insert(format!("{}:{}-{}", h.0 .0, h.0 .1, h.1), res);
}
let res = response(200)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&m)?.into())?;
Ok(res)
}

View File

@@ -1,10 +1,15 @@
use crate::response;
use err::Error;
use futures_util::{select, FutureExt};
use http::{Method, StatusCode};
use hyper::{Body, Client, Request, Response};
use netpod::{Node, NodeConfigCached};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::sleep;
#[derive(Clone, Serialize, Deserialize)]
struct GatherFrom {
@@ -44,7 +49,7 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
}
}
pub async fn gather_json_from_hosts(req: Request<Body>, pathpre: &str) -> Result<Response<Body>, Error> {
pub async fn unused_gather_json_from_hosts(req: Request<Body>, pathpre: &str) -> Result<Response<Body>, Error> {
let (part_head, part_body) = req.into_parts();
let bodyslice = hyper::body::to_bytes(part_body).await?;
let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?;
@@ -61,10 +66,6 @@ pub async fn gather_json_from_hosts(req: Request<Body>, pathpre: &str) -> Result
};
let req = req.header(http::header::ACCEPT, "application/json");
let req = req.body(Body::empty());
use futures_util::select;
use futures_util::FutureExt;
use std::time::Duration;
use tokio::time::sleep;
let task = tokio::spawn(async move {
select! {
_ = sleep(Duration::from_millis(1500)).fuse() => {
@@ -114,13 +115,9 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
.map(|node| {
let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf);
let req = Request::builder().method(Method::GET).uri(uri);
let req = req.header("x-node-from-name", format!("{}", node_config.node_config.name));
let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
let req = req.header(http::header::ACCEPT, "application/json");
let req = req.body(Body::empty());
use futures_util::select;
use futures_util::FutureExt;
use std::time::Duration;
use tokio::time::sleep;
let task = tokio::spawn(async move {
select! {
_ = sleep(Duration::from_millis(1500)).fuse() => {
@@ -162,3 +159,77 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
.body(serde_json::to_string(&Jres { hosts: a })?.into())?;
Ok(res)
}
/// Fan one HTTP request out to several hosts and merge the answers.
///
/// * `method`, `uri` — identical request line for every host.
/// * `schemehostports` — `scheme://host:port` prefixes to contact.
/// * `nt` — per-response transform, applied to each successful response.
/// * `ft` — final transform merging the per-host results into one response.
/// * `timeout` — per-subrequest deadline.
///
/// # Errors
/// Fails fast: the first subrequest error (including timeout or task join
/// failure) aborts the whole gather.
pub async fn gather_get_json_generic<SM, NT, FT>(
    method: http::Method,
    uri: String,
    schemehostports: Vec<String>,
    nt: NT,
    ft: FT,
    timeout: Duration,
) -> Result<Response<Body>, Error>
where
    SM: Send + 'static,
    NT: Fn(Response<Body>) -> Pin<Box<dyn Future<Output = Result<SM, Error>> + Send>> + Send + Sync + Copy + 'static,
    FT: Fn(Vec<SM>) -> Result<Response<Body>, Error>,
{
    let spawned: Vec<_> = schemehostports
        .iter()
        .map(move |schemehostport| {
            // `uri` is only formatted (borrowed), no per-host clone needed.
            let uri = format!("{}{}", schemehostport, uri);
            let req = Request::builder().method(method.clone()).uri(uri);
            let req = req.header(http::header::ACCEPT, "application/json");
            let req = req.body(Body::empty());
            let task = tokio::spawn(async move {
                select! {
                    _ = sleep(timeout).fuse() => {
                        Err(Error::with_msg("timeout"))
                    }
                    res = Client::new().request(req?).fuse() => Ok(nt(res?).await?)
                }
            });
            (schemehostport.clone(), task)
        })
        .collect();
    let mut a = vec![];
    for (_schemehostport, jh) in spawned {
        // Outer `?` converts the JoinError, inner `?` propagates the
        // subrequest error.
        let res = jh.await??;
        a.push(res);
    }
    ft(a)
}
#[cfg(test)]
mod test {
    use super::*;
    /// Type-level smoke test: builds the `gather_get_json_generic` future
    /// with concrete closure types but never polls it, so no network I/O
    /// happens and nothing is asserted at runtime.
    #[test]
    fn try_search() {
        let schemehostports = ["http://sf-daqbuf-22:8371".into()];
        let fut = gather_get_json_generic(
            hyper::Method::GET,
            format!("/api/4/search/channel"),
            schemehostports.to_vec(),
            // `nt`: per-response step, here a no-op producing ().
            |_res| {
                let fut = async { Ok(()) };
                Box::pin(fut)
            },
            // `ft`: merge step, here a canned JSON `42` response.
            |_all| {
                let res = response(StatusCode::OK)
                    .header(http::header::CONTENT_TYPE, "application/json")
                    .body(serde_json::to_string(&42)?.into())?;
                Ok(res)
            },
            Duration::from_millis(4000),
        );
        // Deliberately not awaited: futures are lazy, this only checks types.
        let _ = fut;
    }
}

View File

@@ -1,3 +1,4 @@
use crate::api1::{channels_config_v1, channels_list_v1, gather_json_v1};
use crate::gather::gather_get_json;
use bytes::Bytes;
use disk::binned::prebinned::pre_binned_bytes_for_http;
@@ -13,7 +14,7 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response};
use net::SocketAddr;
use netpod::log::*;
use netpod::{AggKind, Channel, NodeConfigCached};
use netpod::{AggKind, Channel, NodeConfigCached, ProxyConfig};
use panic::{AssertUnwindSafe, UnwindSafe};
use pin::Pin;
use serde::{Deserialize, Serialize};
@@ -22,11 +23,12 @@ use task::{Context, Poll};
use tracing::field::Empty;
use tracing::Instrument;
pub mod api1;
pub mod gather;
pub mod proxy;
pub mod search;
pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
let node_config = node_config.clone();
let rawjh = taskrun::spawn(events_service(node_config.clone()));
use std::str::FromStr;
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
@@ -52,7 +54,7 @@ async fn http_service(req: Request<Body>, node_config: NodeConfigCached) -> Resu
match http_service_try(req, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {
error!("data_api_proxy sees error: {:?}", e);
error!("daqbuffer node http_service sees error: {:?}", e);
Err(e)
}
}
@@ -141,7 +143,6 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
}
} else if path == "/api/4/search/channel" {
if req.method() == Method::GET {
// TODO multi-facility search
Ok(search::channel_search(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
@@ -237,6 +238,14 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/1/channels" {
Ok(channels_list_v1(req).await?)
} else if path == "/api/1/channels/config" {
Ok(channels_config_v1(req).await?)
} else if path == "/api/1/stats/version" {
Ok(gather_json_v1(req, "/stats/version").await?)
} else if path.starts_with("/api/1/stats/") {
Ok(gather_json_v1(req, path).await?)
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, not found: {:?} {:?} {:?}",
@@ -594,3 +603,52 @@ pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) ->
})))?;
Ok(ret)
}
/// Run the standalone proxy HTTP server.
///
/// Binds to `proxy_config.listen:port` and serves every request through
/// `proxy_http_service`; resolves only when the server future completes
/// or binding/serving fails.
pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
    use std::str::FromStr;
    let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
    let make_service = make_service_fn({
        move |_conn| {
            // One config clone per connection, another per request, so the
            // handler future can own its copy.
            let proxy_config = proxy_config.clone();
            async move {
                Ok::<_, Error>(service_fn({
                    move |req| {
                        let f = proxy_http_service(req, proxy_config.clone());
                        Cont { f: Box::pin(f) }
                    }
                }))
            }
        }
    });
    Server::bind(&addr).serve(make_service).await?;
    Ok(())
}
/// Thin wrapper around `proxy_http_service_try`: log any error before
/// handing it back to hyper unchanged.
async fn proxy_http_service(req: Request<Body>, proxy_config: ProxyConfig) -> Result<Response<Body>, Error> {
    proxy_http_service_try(req, &proxy_config).await.map_err(|e| {
        error!("data_api_proxy sees error: {:?}", e);
        e
    })
}
/// Route an incoming proxy request: only `GET /api/4/search/channel` is
/// served; a wrong method yields 405 and any other path yields 404.
async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
    let uri = req.uri().clone();
    let path = uri.path();
    if path != "/api/4/search/channel" {
        return Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
            "Sorry, not found: {:?} {:?} {:?}",
            req.method(),
            req.uri().path(),
            req.uri().query(),
        )))?);
    }
    if req.method() != Method::GET {
        return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
    }
    Ok(proxy::channel_search(req, proxy_config).await?)
}

47
httpret/src/proxy.rs Normal file
View File

@@ -0,0 +1,47 @@
use crate::response;
use err::Error;
use http::{HeaderValue, StatusCode};
use hyper::{Body, Request, Response};
use netpod::{ChannelSearchQuery, ProxyConfig};
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
/// Proxy-side handler for `GET /api/4/search/channel`.
///
/// Currently a scaffold: it fans the search out to
/// `proxy_config.search_hosts` via `gather_get_json_generic`, but the
/// per-response (`nt`) and merge (`ft`) steps are placeholders that
/// discard the real payload and answer with the literal `42`.
pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
    let (head, _body) = req.into_parts();
    // Only JSON is accepted; anything else gets 406 Not Acceptable.
    match head.headers.get("accept") {
        Some(v) => {
            if v == "application/json" {
                // TODO actually pass on the query parameters to the sub query.
                // NOTE(review): err::todo() presumably aborts here until the
                // query forwarding is implemented — confirm its semantics.
                err::todo();
                // Parsed but not yet forwarded (see TODO above).
                let query = ChannelSearchQuery::from_request(head.uri.query())?;
                let uri = format!("/api/4/search/channel");
                // Placeholder per-response step: ignores the body, yields 0.0.
                let nt = |_res| {
                    let fut = async { Ok(0f32) };
                    Box::pin(fut) as Pin<Box<dyn Future<Output = Result<f32, Error>> + Send>>
                };
                // Placeholder merge step: canned JSON `42` response.
                let ft = |_all| {
                    let res = response(StatusCode::OK)
                        .header(http::header::CONTENT_TYPE, "application/json")
                        .body(serde_json::to_string(&42)?.into())?;
                    Ok(res)
                };
                let mut ret = crate::gather::gather_get_json_generic::<f32, _, _>(
                    http::Method::GET,
                    uri,
                    proxy_config.search_hosts.clone(),
                    nt,
                    ft,
                    Duration::from_millis(3000),
                )
                .await?;
                // Mark the answer so it is visible that it went via the proxy.
                ret.headers_mut()
                    .append("x-proxy-log-mark", HeaderValue::from_str("proxied")?);
                Ok(ret)
            } else {
                Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
            }
        }
        _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
    }
}