Add api 4 pulse mapping and refactor some bits

This commit is contained in:
Dominik Werder
2022-01-18 10:46:38 +01:00
parent 5d935414ea
commit 353db96a76
6 changed files with 180 additions and 47 deletions

View File

@@ -1,8 +1,8 @@
[package]
name = "daqbuffer"
version = "4.0.0-a.dev.12"
version = "4.1.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
edition = "2021"
[dependencies]
tokio = { version = "=1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }

View File

@@ -36,7 +36,7 @@ impl ListIndexFilesHttpFunction {
"ListIndexFilesHttpFunction"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -91,7 +91,7 @@ impl ScanIndexFiles {
"ScanIndexFiles"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -131,7 +131,7 @@ impl ScanChannels {
"ScanChannels"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -171,7 +171,7 @@ impl ChannelNames {
"ChannelNames"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -212,7 +212,7 @@ impl ScanConfigs {
"ScanConfigs"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -254,7 +254,7 @@ impl BlockRefStream {
"BlockRefStream"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -324,7 +324,7 @@ impl BlockStream {
"BlockStream"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -396,7 +396,7 @@ impl ListChannelsHttpFunction {
"ListChannelsHttpFunction"
}
pub fn should_handle(path: &str) -> Option<Self> {
pub fn handler(path: &str) -> Option<Self> {
if path.starts_with(Self::prefix()) {
Some(Self {})
} else {

View File

@@ -289,31 +289,33 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if pulsemap::IndexFullHttpFunction::path_matches(path) {
pulsemap::IndexFullHttpFunction::handle(req, &node_config).await
} else if pulsemap::MarkClosedHttpFunction::path_matches(path) {
pulsemap::MarkClosedHttpFunction::handle(req, &node_config).await
} else if pulsemap::MapPulseLocalHttpFunction::path_matches(path) {
pulsemap::MapPulseLocalHttpFunction::handle(req, &node_config).await
} else if pulsemap::MapPulseHistoHttpFunction::path_matches(path) {
pulsemap::MapPulseHistoHttpFunction::handle(req, &node_config).await
} else if pulsemap::MapPulseHttpFunction::path_matches(path) {
pulsemap::MapPulseHttpFunction::handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ListIndexFilesHttpFunction::should_handle(path) {
} else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ListChannelsHttpFunction::should_handle(path) {
} else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ScanIndexFiles::should_handle(path) {
} else if let Some(h) = pulsemap::MapPulseLocalHttpFunction::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ScanChannels::should_handle(path) {
} else if let Some(h) = pulsemap::MapPulseHistoHttpFunction::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ScanConfigs::should_handle(path) {
} else if let Some(h) = pulsemap::MapPulseHttpFunction::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ChannelNames::should_handle(path) {
} else if let Some(h) = pulsemap::Api4MapPulseHttpFunction::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::BlockRefStream::should_handle(path) {
} else if let Some(h) = channelarchiver::ListIndexFilesHttpFunction::handler(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::BlockStream::should_handle(path) {
} else if let Some(h) = channelarchiver::ListChannelsHttpFunction::handler(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ScanIndexFiles::handler(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ScanChannels::handler(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ScanConfigs::handler(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::ChannelNames::handler(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::BlockRefStream::handler(path) {
h.handle(req, &node_config).await
} else if let Some(h) = channelarchiver::BlockStream::handler(path) {
h.handle(req, &node_config).await
} else if path.starts_with("/api/1/requestStatus/") {
Ok(response(StatusCode::OK).body(Body::from("{}"))?)

View File

@@ -3,6 +3,7 @@ pub mod api4;
use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1, proxy_distribute_v1};
use crate::err::Error;
use crate::gather::{gather_get_json_generic, SubRes};
use crate::pulsemap::MapPulseQuery;
use crate::{api_1_docs, api_4_docs, response, Cont};
use disk::events::PlainEventsJsonQuery;
use futures_core::Stream;
@@ -93,10 +94,11 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
} else if path == "/api/4/backends" {
Ok(backends(req, proxy_config).await?)
} else if path == "/api/4/search/channel" {
//Ok(channel_search(req, proxy_config).await?)
Ok(api4::channel_search(req, proxy_config).await?)
} else if path == "/api/4/events" {
Ok(proxy_single_backend_query::<PlainEventsJsonQuery>(req, proxy_config).await?)
} else if path.starts_with("/api/4/map/pulse/") {
Ok(proxy_single_backend_query::<MapPulseQuery>(req, proxy_config).await?)
} else if path == "/api/4/binned" {
Ok(proxy_single_backend_query::<BinnedQuery>(req, proxy_config).await?)
} else if path == "/api/4/channel/config" {
@@ -392,7 +394,7 @@ pub async fn proxy_single_backend_query<QT>(
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error>
where
QT: FromUrl + HasBackend + AppendToUrl + HasTimeout,
QT: FromUrl + AppendToUrl + HasBackend + HasTimeout,
{
let (head, _body) = req.into_parts();
match head.headers.get(http::header::ACCEPT) {
@@ -400,7 +402,11 @@ where
if v == APP_JSON {
let url = Url::parse(&format!("dummy:{}", head.uri))?;
let query = QT::from_url(&url)?;
let sh = get_query_host_for_backend(&query.backend(), proxy_config)?;
let sh = if url.as_str().contains("/map/pulse/") {
get_query_host_for_backend_2(&query.backend(), proxy_config)?
} else {
get_query_host_for_backend(&query.backend(), proxy_config)?
};
let urls = [sh]
.iter()
.map(|sh| match Url::parse(&format!("{}{}", sh, head.uri.path())) {
@@ -458,3 +464,10 @@ fn get_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Resu
}
return Err(Error::with_msg(format!("host not found for backend {:?}", backend)));
}
fn get_query_host_for_backend_2(backend: &str, _proxy_config: &ProxyConfig) -> Result<String, Error> {
if backend == "sf-databuffer" {
return Ok("https://sf-data-api.psi.ch".into());
}
return Err(Error::with_msg(format!("host not found for backend {:?}", backend)));
}

View File

@@ -6,6 +6,10 @@ use futures_util::FutureExt;
use http::{Method, StatusCode, Uri};
use hyper::{Body, Request, Response};
use netpod::log::*;
use netpod::AppendToUrl;
use netpod::FromUrl;
use netpod::HasBackend;
use netpod::HasTimeout;
use netpod::NodeConfigCached;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@@ -20,6 +24,7 @@ use std::{io::SeekFrom, path::PathBuf};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::task::JoinHandle;
use url::Url;
pub struct MapPulseHisto {
_pulse: u64,
@@ -33,6 +38,7 @@ const MAP_PULSE_HISTO_URL_PREFIX: &'static str = "/api/1/map/pulse/histo/";
const MAP_PULSE_URL_PREFIX: &'static str = "/api/1/map/pulse/";
const MAP_PULSE_LOCAL_URL_PREFIX: &'static str = "/api/1/map/pulse/local/";
const MAP_PULSE_MARK_CLOSED_URL_PREFIX: &'static str = "/api/1/map/pulse/mark/closed/";
const API_4_MAP_PULSE_URL_PREFIX: &'static str = "/api/4/map/pulse/";
async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> {
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
@@ -206,11 +212,15 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: u64) -> Result<(Chun
pub struct IndexFullHttpFunction {}
impl IndexFullHttpFunction {
pub fn path_matches(path: &str) -> bool {
path.starts_with(MAP_INDEX_FULL_URL_PREFIX)
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path().starts_with(MAP_INDEX_FULL_URL_PREFIX) {
Some(Self {})
} else {
None
}
}
pub async fn handle(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
@@ -423,6 +433,46 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MapPulseQuery {
backend: String,
pulse: u64,
}
impl HasBackend for MapPulseQuery {
fn backend(&self) -> &str {
&self.backend
}
}
impl HasTimeout for MapPulseQuery {
fn timeout(&self) -> Duration {
Duration::from_millis(2000)
}
}
impl FromUrl for MapPulseQuery {
fn from_url(url: &url::Url) -> Result<Self, err::Error> {
let mut pit = url
.path_segments()
.ok_or(Error::with_msg_no_trace("no path in url"))?
.rev();
let pulsestr = pit.next().ok_or(Error::with_msg_no_trace("no pulse in url path"))?;
let backend = pit
.next()
.ok_or(Error::with_msg_no_trace("no backend in url path"))?
.into();
let pulse: u64 = pulsestr.parse()?;
let ret = Self { backend, pulse };
info!("GOT {:?}", ret);
Ok(ret)
}
}
impl AppendToUrl for MapPulseQuery {
fn append_to_url(&self, _url: &mut Url) {}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LocalMap {
pulse: u64,
@@ -433,11 +483,15 @@ struct LocalMap {
pub struct MapPulseLocalHttpFunction {}
impl MapPulseLocalHttpFunction {
pub fn path_matches(path: &str) -> bool {
path.starts_with(MAP_PULSE_LOCAL_URL_PREFIX)
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_LOCAL_URL_PREFIX) {
Some(Self {})
} else {
None
}
}
pub async fn handle(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
@@ -487,11 +541,15 @@ pub struct TsHisto {
pub struct MapPulseHistoHttpFunction {}
impl MapPulseHistoHttpFunction {
pub fn path_matches(path: &str) -> bool {
path.starts_with(MAP_PULSE_HISTO_URL_PREFIX)
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_HISTO_URL_PREFIX) {
Some(Self {})
} else {
None
}
}
pub async fn handle(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
@@ -538,11 +596,15 @@ impl MapPulseHistoHttpFunction {
pub struct MapPulseHttpFunction {}
impl MapPulseHttpFunction {
pub fn path_matches(path: &str) -> bool {
path.starts_with(MAP_PULSE_URL_PREFIX)
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_URL_PREFIX) {
Some(Self {})
} else {
None
}
}
pub async fn handle(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
@@ -566,14 +628,57 @@ impl MapPulseHttpFunction {
}
}
pub struct Api4MapPulseHttpFunction {}
impl Api4MapPulseHttpFunction {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path().starts_with(API_4_MAP_PULSE_URL_PREFIX) {
Some(Self {})
} else {
None
}
}
pub fn path_matches(path: &str) -> bool {
path.starts_with(API_4_MAP_PULSE_URL_PREFIX)
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri());
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = MapPulseQuery::from_url(&url)?;
let histo = MapPulseHistoHttpFunction::histo(q.pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
}
}
if max > 0 {
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?)
}
}
}
pub struct MarkClosedHttpFunction {}
impl MarkClosedHttpFunction {
pub fn path_matches(path: &str) -> bool {
path.starts_with(MAP_PULSE_MARK_CLOSED_URL_PREFIX)
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_MARK_CLOSED_URL_PREFIX) {
Some(Self {})
} else {
None
}
}
pub async fn handle(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}

View File

@@ -32,6 +32,7 @@
<li><a href="#list-backends">List available backends.</a></li>
<li><a href="#api-version">More version information about the running service.</a></li>
<li><a href="#search-channel">Search channel.</a></li>
<li><a href="#map-pulse">Map pulse.</a></li>
<li><a href="#query-events">Query unbinned event data.</a></li>
<li><a href="#query-binned">Query binned data.</a></li>
</ul>
@@ -100,7 +101,6 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/version'
<h2>Search channel</h2>
<p><strong>Method:</strong> GET</p>
<p><strong>URL:</strong> https://data-api.psi.ch/api/4/search/channel</p>
<p><strong>Request header:</strong> "Accept" should be "application/json" for forward compatibility.</p>
<p><strong>Query parameters:</strong> (all optional)</p>
<ul>
<li>backend (e.g. "sf-databuffer", "sls-archive", ... any from list-backends API)</li>
@@ -158,6 +158,19 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
<a id="map-pulse"></a>
<h2>Map pulse</h2>
<p><strong>Method:</strong> GET</p>
<p><strong>URL:</strong> https://data-api.psi.ch/api/4/map/pulse/BACKEND/PULSE</p>
<p><strong>Request header:</strong> "Accept" should be "application/json" for forward-compatibility but can be
omitted for e.g. a quick manual search using CURL.</p>
<h4>CURL example:</h4>
<pre>
curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/map/pulse/sf-databuffer/424242'
</pre>
<a id="query-events"></a>
<h2>Query event data</h2>