From 353db96a76e8bba127d254ad9641247641677f6c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 18 Jan 2022 10:46:38 +0100 Subject: [PATCH] Add api 4 pulse mapping and refactor some bits --- daqbuffer/Cargo.toml | 4 +- httpret/src/channelarchiver.rs | 16 +-- httpret/src/httpret.rs | 38 +++---- httpret/src/proxy.rs | 19 +++- httpret/src/pulsemap.rs | 135 ++++++++++++++++++++++--- httpret/static/documentation/api4.html | 15 ++- 6 files changed, 180 insertions(+), 47 deletions(-) diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index f7adbd4..a5d8856 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "daqbuffer" -version = "4.0.0-a.dev.12" +version = "4.1.0" authors = ["Dominik Werder "] -edition = "2018" +edition = "2021" [dependencies] tokio = { version = "=1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index dfb867c..2ed7a65 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -36,7 +36,7 @@ impl ListIndexFilesHttpFunction { "ListIndexFilesHttpFunction" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { @@ -91,7 +91,7 @@ impl ScanIndexFiles { "ScanIndexFiles" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { @@ -131,7 +131,7 @@ impl ScanChannels { "ScanChannels" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { @@ -171,7 +171,7 @@ impl ChannelNames { "ChannelNames" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { @@ -212,7 +212,7 @@ impl ScanConfigs { "ScanConfigs" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { @@ -254,7 +254,7 @@ impl BlockRefStream { "BlockRefStream" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { @@ -324,7 +324,7 @@ impl BlockStream { "BlockStream" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { @@ -396,7 +396,7 @@ impl ListChannelsHttpFunction { "ListChannelsHttpFunction" } - pub fn should_handle(path: &str) -> Option { + pub fn handler(path: &str) -> Option { if path.starts_with(Self::prefix()) { Some(Self {}) } else { diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 052fd82..3ee38a0 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -289,31 +289,33 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if pulsemap::IndexFullHttpFunction::path_matches(path) { - pulsemap::IndexFullHttpFunction::handle(req, &node_config).await - } else if pulsemap::MarkClosedHttpFunction::path_matches(path) { - pulsemap::MarkClosedHttpFunction::handle(req, &node_config).await - } else if pulsemap::MapPulseLocalHttpFunction::path_matches(path) { - pulsemap::MapPulseLocalHttpFunction::handle(req, &node_config).await - } else if pulsemap::MapPulseHistoHttpFunction::path_matches(path) { - pulsemap::MapPulseHistoHttpFunction::handle(req, &node_config).await - } else if pulsemap::MapPulseHttpFunction::path_matches(path) { - pulsemap::MapPulseHttpFunction::handle(req, &node_config).await - } else if let Some(h) = channelarchiver::ListIndexFilesHttpFunction::should_handle(path) { + } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelarchiver::ListChannelsHttpFunction::should_handle(path) { + } else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelarchiver::ScanIndexFiles::should_handle(path) { + } else if let Some(h) = pulsemap::MapPulseLocalHttpFunction::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelarchiver::ScanChannels::should_handle(path) { + } else if let Some(h) = pulsemap::MapPulseHistoHttpFunction::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelarchiver::ScanConfigs::should_handle(path) { + } else if let Some(h) = pulsemap::MapPulseHttpFunction::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelarchiver::ChannelNames::should_handle(path) { + } else if let Some(h) = pulsemap::Api4MapPulseHttpFunction::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelarchiver::BlockRefStream::should_handle(path) { + } else if let Some(h) = channelarchiver::ListIndexFilesHttpFunction::handler(path) { h.handle(req, &node_config).await - } else if let Some(h) = channelarchiver::BlockStream::should_handle(path) { + } else if let Some(h) = channelarchiver::ListChannelsHttpFunction::handler(path) { + h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::ScanIndexFiles::handler(path) { + h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::ScanChannels::handler(path) { + h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::ScanConfigs::handler(path) { + h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::ChannelNames::handler(path) { + h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::BlockRefStream::handler(path) { + h.handle(req, &node_config).await + } else if let Some(h) = channelarchiver::BlockStream::handler(path) { h.handle(req, &node_config).await } else if path.starts_with("/api/1/requestStatus/") { Ok(response(StatusCode::OK).body(Body::from("{}"))?) diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 0ba67bb..2d29d61 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -3,6 +3,7 @@ pub mod api4; use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1, proxy_distribute_v1}; use crate::err::Error; use crate::gather::{gather_get_json_generic, SubRes}; +use crate::pulsemap::MapPulseQuery; use crate::{api_1_docs, api_4_docs, response, Cont}; use disk::events::PlainEventsJsonQuery; use futures_core::Stream; @@ -93,10 +94,11 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else if path == "/api/4/backends" { Ok(backends(req, proxy_config).await?) } else if path == "/api/4/search/channel" { - //Ok(channel_search(req, proxy_config).await?) Ok(api4::channel_search(req, proxy_config).await?) } else if path == "/api/4/events" { Ok(proxy_single_backend_query::(req, proxy_config).await?) + } else if path.starts_with("/api/4/map/pulse/") { + Ok(proxy_single_backend_query::(req, proxy_config).await?) } else if path == "/api/4/binned" { Ok(proxy_single_backend_query::(req, proxy_config).await?) } else if path == "/api/4/channel/config" { @@ -392,7 +394,7 @@ pub async fn proxy_single_backend_query( proxy_config: &ProxyConfig, ) -> Result, Error> where - QT: FromUrl + HasBackend + AppendToUrl + HasTimeout, + QT: FromUrl + AppendToUrl + HasBackend + HasTimeout, { let (head, _body) = req.into_parts(); match head.headers.get(http::header::ACCEPT) { @@ -400,7 +402,11 @@ where if v == APP_JSON { let url = Url::parse(&format!("dummy:{}", head.uri))?; let query = QT::from_url(&url)?; - let sh = get_query_host_for_backend(&query.backend(), proxy_config)?; + let sh = if url.as_str().contains("/map/pulse/") { + get_query_host_for_backend_2(&query.backend(), proxy_config)? + } else { + get_query_host_for_backend(&query.backend(), proxy_config)? + }; let urls = [sh] .iter() .map(|sh| match Url::parse(&format!("{}{}", sh, head.uri.path())) { @@ -458,3 +464,10 @@ fn get_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Resu } return Err(Error::with_msg(format!("host not found for backend {:?}", backend))); } + +fn get_query_host_for_backend_2(backend: &str, _proxy_config: &ProxyConfig) -> Result { + if backend == "sf-databuffer" { + return Ok("https://sf-data-api.psi.ch".into()); + } + return Err(Error::with_msg(format!("host not found for backend {:?}", backend))); +} diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 62344a1..34e6639 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -6,6 +6,10 @@ use futures_util::FutureExt; use http::{Method, StatusCode, Uri}; use hyper::{Body, Request, Response}; use netpod::log::*; +use netpod::AppendToUrl; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::HasTimeout; use netpod::NodeConfigCached; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -20,6 +24,7 @@ use std::{io::SeekFrom, path::PathBuf}; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::task::JoinHandle; +use url::Url; pub struct MapPulseHisto { _pulse: u64, @@ -33,6 +38,7 @@ const MAP_PULSE_HISTO_URL_PREFIX: &'static str = "/api/1/map/pulse/histo/"; const MAP_PULSE_URL_PREFIX: &'static str = "/api/1/map/pulse/"; const MAP_PULSE_LOCAL_URL_PREFIX: &'static str = "/api/1/map/pulse/local/"; const MAP_PULSE_MARK_CLOSED_URL_PREFIX: &'static str = "/api/1/map/pulse/mark/closed/"; +const API_4_MAP_PULSE_URL_PREFIX: &'static str = "/api/4/map/pulse/"; async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> { let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; @@ -206,11 +212,15 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: u64) -> Result<(Chun pub struct IndexFullHttpFunction {} impl IndexFullHttpFunction { - pub fn path_matches(path: &str) -> bool { - path.starts_with(MAP_INDEX_FULL_URL_PREFIX) + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(MAP_INDEX_FULL_URL_PREFIX) { + Some(Self {}) + } else { + None + } } - pub async fn handle(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } @@ -423,6 +433,46 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result, Error> { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MapPulseQuery { + backend: String, + pulse: u64, +} + +impl HasBackend for MapPulseQuery { + fn backend(&self) -> &str { + &self.backend + } +} + +impl HasTimeout for MapPulseQuery { + fn timeout(&self) -> Duration { + Duration::from_millis(2000) + } +} + +impl FromUrl for MapPulseQuery { + fn from_url(url: &url::Url) -> Result { + let mut pit = url + .path_segments() + .ok_or(Error::with_msg_no_trace("no path in url"))? + .rev(); + let pulsestr = pit.next().ok_or(Error::with_msg_no_trace("no pulse in url path"))?; + let backend = pit + .next() + .ok_or(Error::with_msg_no_trace("no backend in url path"))? + .into(); + let pulse: u64 = pulsestr.parse()?; + let ret = Self { backend, pulse }; + info!("GOT {:?}", ret); + Ok(ret) + } +} + +impl AppendToUrl for MapPulseQuery { + fn append_to_url(&self, _url: &mut Url) {} +} + #[derive(Debug, Clone, Serialize, Deserialize)] struct LocalMap { pulse: u64, @@ -433,11 +483,15 @@ struct LocalMap { pub struct MapPulseLocalHttpFunction {} impl MapPulseLocalHttpFunction { - pub fn path_matches(path: &str) -> bool { - path.starts_with(MAP_PULSE_LOCAL_URL_PREFIX) + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(MAP_PULSE_LOCAL_URL_PREFIX) { + Some(Self {}) + } else { + None + } } - pub async fn handle(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } @@ -487,11 +541,15 @@ pub struct TsHisto { pub struct MapPulseHistoHttpFunction {} impl MapPulseHistoHttpFunction { - pub fn path_matches(path: &str) -> bool { - path.starts_with(MAP_PULSE_HISTO_URL_PREFIX) + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(MAP_PULSE_HISTO_URL_PREFIX) { + Some(Self {}) + } else { + None + } } - pub async fn handle(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } @@ -538,11 +596,15 @@ impl MapPulseHistoHttpFunction { pub struct MapPulseHttpFunction {} impl MapPulseHttpFunction { - pub fn path_matches(path: &str) -> bool { - path.starts_with(MAP_PULSE_URL_PREFIX) + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(MAP_PULSE_URL_PREFIX) { + Some(Self {}) + } else { + None + } } - pub async fn handle(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } @@ -566,14 +628,57 @@ impl MapPulseHttpFunction { } } +pub struct Api4MapPulseHttpFunction {} + +impl Api4MapPulseHttpFunction { + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(API_4_MAP_PULSE_URL_PREFIX) { + Some(Self {}) + } else { + None + } + } + + pub fn path_matches(path: &str) -> bool { + path.starts_with(API_4_MAP_PULSE_URL_PREFIX) + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = MapPulseQuery::from_url(&url)?; + let histo = MapPulseHistoHttpFunction::histo(q.pulse, node_config).await?; + let mut i1 = 0; + let mut max = 0; + for i2 in 0..histo.tss.len() { + if histo.counts[i2] > max { + max = histo.counts[i2]; + i1 = i2; + } + } + if max > 0 { + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?) + } else { + Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?) + } + } +} + pub struct MarkClosedHttpFunction {} impl MarkClosedHttpFunction { - pub fn path_matches(path: &str) -> bool { - path.starts_with(MAP_PULSE_MARK_CLOSED_URL_PREFIX) + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(MAP_PULSE_MARK_CLOSED_URL_PREFIX) { + Some(Self {}) + } else { + None + } } - pub async fn handle(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html index 03622a6..460460b 100644 --- a/httpret/static/documentation/api4.html +++ b/httpret/static/documentation/api4.html @@ -32,6 +32,7 @@
  • List available backends.
  • More version information about the running service.
  • Search channel.
  • +
  • Map pulse.
  • Query unbinned event data.
  • Query binned data.
  • @@ -100,7 +101,6 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/version'

    Search channel

    Method: GET

    URL: https://data-api.psi.ch/api/4/search/channel

    -

    Request header: "Accept" should be "application/json" for forward compatibility.

    Query parameters: (all optional)

    • backend (e.g. "sf-databuffer", "sls-archive", ... any from list-backends API)
    • @@ -158,6 +158,19 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel + +

      Map pulse

      +

      Method: GET

      +

      URL: https://data-api.psi.ch/api/4/map/pulse/BACKEND/PULSE

      +

      Request header: "Accept" should be "application/json" for forward-compatibility but can be + omitted for e.g. a quick manual search using CURL.

      + +

      CURL example:

      +
      +curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/map/pulse/sf-databuffer/424242'
      +
      + +

      Query event data