From b54b31fe981a9498fb49618bebb1f043eea4a8a7 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Mon, 20 Sep 2021 21:17:25 +0200
Subject: [PATCH] WIP on proxy pulse map
---
disk/src/gen.rs | 2 +-
httpret/src/lib.rs | 9 ++++
httpret/src/proxy.rs | 68 +++++++++++++++++++++++++-
httpret/src/pulsemap.rs | 6 ++-
httpret/static/documentation/api1.html | 14 ++++++
httpret/static/documentation/api4.html | 4 +-
6 files changed, 99 insertions(+), 4 deletions(-)
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index d245b36..2a2b50c 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -322,7 +322,7 @@ async fn gen_timebin(
channel_path: &Path,
config: &ChannelConfig,
split: u32,
- node: &Node,
+ _node: &Node,
ensemble: &Ensemble,
gen_var: &GenVar,
) -> Result {
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 827d6df..0ddd204 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -106,6 +106,15 @@ where
impl UnwindSafe for Cont {}
+pub fn response_err(status: StatusCode, msg: T) -> Result, Error>
+where
+ T: AsRef,
+{
+ let msg = format!("Error:\n\n{}\n\nDocumentation:\nhttps://data-api.psi.ch/api/1/documentation/\nhttps://data-api.psi.ch/api/4/documentation/", msg.as_ref());
+ let ret = response(status).body(Body::from(msg))?;
+ Ok(ret)
+}
+
macro_rules! static_http {
($path:expr, $tgt:expr, $tgtex:expr, $ctype:expr) => {
if $path == concat!("/api/4/documentation/", $tgt) {
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index 5c03c2b..5be057e 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -9,6 +9,7 @@ use futures_util::pin_mut;
use http::{Method, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
+use hyper_tls::HttpsConnector;
use itertools::Itertools;
use netpod::log::*;
use netpod::{
@@ -70,6 +71,8 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig)
Err(Error::with_msg("todo"))
} else if path == "/api/1/query" {
Ok(proxy_api1_single_backend_query(req, proxy_config).await?)
+ } else if path.starts_with("/api/1/map/pulse/") {
+ Ok(proxy_api1_map_pulse(req, proxy_config).await?)
} else if path.starts_with("/api/1/gather/") {
Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
} else if path == "/api/4/backends" {
@@ -112,7 +115,7 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig)
}
} else {
Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
- "Sorry, not found: {:?} {:?} {:?}",
+ "Sorry, proxy can not find: {:?} {:?} {:?}",
req.method(),
req.uri().path(),
req.uri().query(),
@@ -298,6 +301,69 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R
}
}
+pub async fn proxy_api1_map_pulse(req: Request, proxy_config: &ProxyConfig) -> Result, Error> {
+ let s2 = format!("http://dummy/{}", req.uri());
+ info!("s2: {:?}", s2);
+ let url = Url::parse(&s2)?;
+ let mut backend = None;
+ for (k, v) in url.query_pairs() {
+ if k == "backend" {
+ backend = Some(v.to_string());
+ }
+ }
+ let backend = if let Some(backend) = backend {
+ backend
+ } else {
+ return Ok(super::response_err(
+ StatusCode::BAD_REQUEST,
+ "Required parameter `backend` not specified.",
+ )?);
+ };
+ let pulseid = if let Some(k) = url.path_segments() {
+ if let Some(k) = k.rev().next() {
+ if let Ok(k) = k.to_string().parse::() {
+ k
+ } else {
+ return Ok(super::response_err(
+ StatusCode::BAD_REQUEST,
+ "Can not parse parameter `pulseid`.",
+ )?);
+ }
+ } else {
+ return Ok(super::response_err(
+ StatusCode::BAD_REQUEST,
+ "Can not parse parameter `pulseid`.",
+ )?);
+ }
+ } else {
+ return Ok(super::response_err(
+ StatusCode::BAD_REQUEST,
+ "Required parameter `pulseid` not specified.",
+ )?);
+ };
+ if backend == "sf-databuffer" {
+ if proxy_config.search_hosts.len() == 1 {
+ let url = format!("http://sf-daqbuf-21:8380/api/1/map/pulse/{}", pulseid);
+ let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
+ let res = hyper::Client::new().request(req).await?;
+ let ret = response(StatusCode::OK).body(res.into_body())?;
+ Ok(ret)
+ } else {
+ let url = format!("https://sf-data-api.psi.ch/api/1/map/pulse/{}", pulseid);
+ let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
+ let https = HttpsConnector::new();
+ let res = hyper::Client::builder().build(https).request(req).await?;
+ let ret = response(StatusCode::OK).body(res.into_body())?;
+ Ok(ret)
+ }
+ } else {
+ return Ok(super::response_err(
+ StatusCode::BAD_REQUEST,
+ format!("backend \"{}\" not supported for this action", backend),
+ )?);
+ }
+}
+
pub async fn proxy_api1_single_backend_query(
_req: Request,
_proxy_config: &ProxyConfig,
diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs
index 3a06f36..a46eca1 100644
--- a/httpret/src/pulsemap.rs
+++ b/httpret/src/pulsemap.rs
@@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
+use std::time::Instant;
use std::{io::SeekFrom, path::PathBuf};
use tokio::task::JoinHandle;
use tokio::{
@@ -326,6 +327,7 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached)
break;
}
info!("Start update task");
+ let ts1 = Instant::now();
match IndexFullHttpFunction::index(&node_config).await {
Ok(_) => {}
Err(e) => {
@@ -333,7 +335,9 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached)
tokio::time::sleep(Duration::from_millis(5000)).await;
}
}
- info!("Done update task");
+ let ts2 = Instant::now();
+ let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
+ info!("Done update task {:.0} ms", dt);
}
Ok(())
}
diff --git a/httpret/static/documentation/api1.html b/httpret/static/documentation/api1.html
index 4e492fd..e475f4b 100644
--- a/httpret/static/documentation/api1.html
+++ b/httpret/static/documentation/api1.html
@@ -27,6 +27,7 @@ Currently available backends:
@@ -148,6 +149,19 @@ QUERY='{ "regex": "LSCP9:CH0", "backends": ["sf-databuffer"] }'
curl -H 'Content-Type: application/json' -H 'Accept: application/json' -d "$QUERY" https://data-api.psi.ch/api/1/channels/config
+
+
+Map pulse-id to timestamp (SwissFEL)
+Method: GET
+URL: https://data-api.psi.ch/api/1/map/pulse/PULSEID
+Result body example:
+1677392847564
+CURL example:
+
+curl -H 'Content-Type: application/json' -H 'Accept: application/json' https://data-api.psi.ch/api/1/map/pulse/7461843
+
+
+
Feedback and comments
Feedback is very much appreciated:
dominik.werder@psi.ch
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index ba0e271..1788085 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -13,7 +13,9 @@
Databuffer API 4 Documentation
Documented here is the databuffer http api 4. The "original" unversioned api is documented at
-this location.
+ this location.
+API version 1:
+https://data-api.psi.ch/api/1/documentation/
In order to keep the api surface as small as possible in comparison to api 0, we add functionality on demand,
so please feel free to create some Jira ticket!