WIP on proxy pulse map

This commit is contained in:
Dominik Werder
2021-09-20 21:17:25 +02:00
parent e1e930f453
commit b54b31fe98
6 changed files with 99 additions and 4 deletions

View File

@@ -106,6 +106,15 @@ where
impl<F> UnwindSafe for Cont<F> {}
pub fn response_err<T>(status: StatusCode, msg: T) -> Result<Response<Body>, Error>
where
T: AsRef<str>,
{
let msg = format!("Error:\n\n{}\n\nDocumentation:\nhttps://data-api.psi.ch/api/1/documentation/\nhttps://data-api.psi.ch/api/4/documentation/", msg.as_ref());
let ret = response(status).body(Body::from(msg))?;
Ok(ret)
}
macro_rules! static_http {
($path:expr, $tgt:expr, $tgtex:expr, $ctype:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) {

View File

@@ -9,6 +9,7 @@ use futures_util::pin_mut;
use http::{Method, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use hyper_tls::HttpsConnector;
use itertools::Itertools;
use netpod::log::*;
use netpod::{
@@ -70,6 +71,8 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
Err(Error::with_msg("todo"))
} else if path == "/api/1/query" {
Ok(proxy_api1_single_backend_query(req, proxy_config).await?)
} else if path.starts_with("/api/1/map/pulse/") {
Ok(proxy_api1_map_pulse(req, proxy_config).await?)
} else if path.starts_with("/api/1/gather/") {
Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
} else if path == "/api/4/backends" {
@@ -112,7 +115,7 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
}
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
"Sorry, not found: {:?} {:?} {:?}",
"Sorry, proxy can not find: {:?} {:?} {:?}",
req.method(),
req.uri().path(),
req.uri().query(),
@@ -298,6 +301,69 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
}
}
pub async fn proxy_api1_map_pulse(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
let s2 = format!("http://dummy/{}", req.uri());
info!("s2: {:?}", s2);
let url = Url::parse(&s2)?;
let mut backend = None;
for (k, v) in url.query_pairs() {
if k == "backend" {
backend = Some(v.to_string());
}
}
let backend = if let Some(backend) = backend {
backend
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Required parameter `backend` not specified.",
)?);
};
let pulseid = if let Some(k) = url.path_segments() {
if let Some(k) = k.rev().next() {
if let Ok(k) = k.to_string().parse::<u64>() {
k
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Can not parse parameter `pulseid`.",
)?);
}
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Can not parse parameter `pulseid`.",
)?);
}
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Required parameter `pulseid` not specified.",
)?);
};
if backend == "sf-databuffer" {
if proxy_config.search_hosts.len() == 1 {
let url = format!("http://sf-daqbuf-21:8380/api/1/map/pulse/{}", pulseid);
let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
let res = hyper::Client::new().request(req).await?;
let ret = response(StatusCode::OK).body(res.into_body())?;
Ok(ret)
} else {
let url = format!("https://sf-data-api.psi.ch/api/1/map/pulse/{}", pulseid);
let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
let https = HttpsConnector::new();
let res = hyper::Client::builder().build(https).request(req).await?;
let ret = response(StatusCode::OK).body(res.into_body())?;
Ok(ret)
}
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
format!("backend \"{}\" not supported for this action", backend),
)?);
}
}
pub async fn proxy_api1_single_backend_query(
_req: Request<Body>,
_proxy_config: &ProxyConfig,

View File

@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::Instant;
use std::{io::SeekFrom, path::PathBuf};
use tokio::task::JoinHandle;
use tokio::{
@@ -326,6 +327,7 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
break;
}
info!("Start update task");
let ts1 = Instant::now();
match IndexFullHttpFunction::index(&node_config).await {
Ok(_) => {}
Err(e) => {
@@ -333,7 +335,9 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
tokio::time::sleep(Duration::from_millis(5000)).await;
}
}
info!("Done update task");
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
info!("Done update task {:.0} ms", dt);
}
Ok(())
}