From 321b31d6a67484b58956e7575114506fdb874200 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 11 Jun 2021 15:13:24 +0200 Subject: [PATCH] Serve plain events depending on Accept, update docs --- daqbuffer/src/test/events.rs | 2 +- disk/src/channelexec.rs | 7 +- disk/src/events.rs | 4 +- httpret/src/lib.rs | 31 +++-- httpret/static/documentation/api4.html | 157 +++++++++++++++++-------- 5 files changed, 140 insertions(+), 61 deletions(-) diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs index b2e09d5..8309417 100644 --- a/daqbuffer/src/test/events.rs +++ b/daqbuffer/src/test/events.rs @@ -275,7 +275,7 @@ async fn get_plain_events_json( let req = hyper::Request::builder() .method(http::Method::GET) .uri(url) - .header("Accept", "application/octet-stream") + .header("Accept", "application/json") .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?; diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 4a770c8..95d41d8 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -321,7 +321,12 @@ impl ChannelExecFunction for PlainEventsJson { ); let f = collect_plain_events_json(s, self.timeout); let f = FutureExt::map(f, |item| match item { - Ok(item) => Ok(Bytes::from(serde_json::to_vec(&item)?)), + Ok(item) => { + // TODO add channel entry info here? + //let obj = item.as_object_mut().unwrap(); + //obj.insert("channelName", JsonValue::String(en)); + Ok(Bytes::from(serde_json::to_vec(&item)?)) + } Err(e) => Err(e.into()), }); let s = futures_util::stream::once(f); diff --git a/disk/src/events.rs b/disk/src/events.rs index 99fbbdc..3a6835b 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -71,7 +71,7 @@ impl PlainEventsQuery { pub fn url(&self, host: &HostPort) -> String { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; format!( - "http://{}:{}/api/4/plain_events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", + "http://{}:{}/api/4/events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", host.host, host.port, self.channel.backend, @@ -150,7 +150,7 @@ impl PlainEventsJsonQuery { pub fn url(&self, host: &HostPort) -> String { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; format!( - "http://{}:{}/api/4/alpha_plain_events_json?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", + "http://{}:{}/api/4/events?channelBackend={}&channelName={}&begDate={}&endDate={}&timeout={}", host.host, host.port, self.channel.backend, diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 4d3eb4f..e209972 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -2,7 +2,7 @@ use crate::gather::gather_get_json; use bytes::Bytes; use disk::binned::prebinned::pre_binned_bytes_for_http; use disk::binned::query::{BinnedQuery, PreBinnedQuery}; -use disk::events::PlainEventsQuery; +use disk::events::{PlainEventsJsonQuery, PlainEventsQuery}; use disk::raw::conn::events_service; use err::Error; use future::Future; @@ -170,18 +170,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/plain_events" { + } else if path == "/api/4/events" { if req.method() == Method::GET { Ok(plain_events(req, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/alpha_plain_events_json" { - if req.method() == Method::GET { - Ok(plain_events_json(req, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } } else if path.starts_with("/api/4/gather/") { if req.method() == Method::GET { Ok(gather_get_json(req, &node_config).await?) @@ -260,8 +254,8 @@ where { Response::builder() .status(status) - .header("access-control-allow-origin", "*") - .header("access-control-allow-headers", "*") + .header("Access-Control-Allow-Origin", "*") + .header("Access-Control-Allow-Headers", "*") } struct BodyStreamWrap(netpod::BodyStream); @@ -400,6 +394,21 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result } async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let accept_def = ""; + let accept = req + .headers() + .get("Accept") + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == "application/json" { + Ok(plain_events_json(req, node_config).await?) + } else if accept == "application/octet-stream" { + Ok(plain_events_binary(req, node_config).await?) + } else { + Err(Error::with_msg(format!("unexpected Accept: {:?}", accept))) + } +} + +async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); let query = PlainEventsQuery::from_request(&head)?; let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone()); @@ -411,7 +420,7 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); - let query = PlainEventsQuery::from_request(&head)?; + let query = PlainEventsJsonQuery::from_request(&head)?; let op = disk::channelexec::PlainEventsJson::new( query.channel().clone(), query.range().clone(), diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html index fe04068..96d4ee9 100644 --- a/httpret/static/documentation/api4.html +++ b/httpret/static/documentation/api4.html @@ -26,10 +26,118 @@ Currently available:

API functions

Currently available functionality:

+ + + +

Search channel

+

Method: GET

+

URL: https://data-api.psi.ch/api/4/search/channel

+

Query parameters:

+
    +
  • nameRegex (e.g. "LSCP.*6")
  • +
  • sourceRegex (e.g. "178:9999")
  • +
  • descriptionRegex (e.g. "celsius")
  • +
+

Request header: "Accept" must be "application/json"

+ +

CURL example:

+
+curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel?sourceRegex=CV.E.+37&nameRegex=120.+y2$'
+
+ +

Example response:

+
+{
+  "channels": [
+    {
+      "name": "S10MA01-DBPM120:Y2",
+      "backend": "sf-databuffer",
+      "source": "tcp://S20-CVME-DBPM2371:9000",
+      "type": "Float32",
+      "shape": [],
+      "unit": "",
+      "description": ""
+    },
+    {
+      "name": "S20SY02-DBPM120:Y2",
+      "backend": "sf-databuffer",
+      "source": "tcp://S20-CVME-DBPM2371:9000",
+      "type": "Float32",
+      "shape": [],
+      "unit": "",
+      "description": ""
+    }
+  ]
+}
+
+

The search constraints are AND'd.

+ + + + + +

Query event data

+

Method: GET

+

URL: https://data-api.psi.ch/api/4/events

+

Query parameters:

+
    +
  • channelBackend (e.g. "sf-databuffer")
  • +
  • channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")
  • +
  • begDate (e.g. "2021-05-26T07:10:00.000Z")
  • +
  • endDate (e.g. "2021-05-26T07:16:00.000Z")
  • +
+

Request header: "Accept" must be "application/json"

+ +

CURL example:

+
+curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channelBackend=sf-databuffer&channelName=SLAAR-LSCP4-LAS6891:CH7:1&begDate=2021-06-11T07:00:00.000Z&endDate=2021-06-11T07:00:01.000Z'
+
+ +

Timestamp format

+

Javascript can not represent the full 64-bit integer and the databuffer nanosecond timestamps would lose precision. +Therefore, timestamps are represented in the response by ts0 which gives an absolute anchor +in time in units of seconds, and the array tsoff with the offset of each event in microseconds.

+ +

Timeout

+

If the requested range takes too long to retrieve, then the flags timedOut: true will be set.

+ +

Example response:

+
+{
+  "finalisedRange": true,
+  "ts0": 1623394800,
+  "tsoff": [
+    68461150,
+    169461160,
+    269461170,
+    369461180,
+    479461191,
+    579461201,
+    ...
+  ],
+  "values": [
+    [378, 325, 321, 381, ... waveform of 1st event ],
+    [334, 355, 360, 345, ... waveform of 2nd event ],
+    ...
+  ]
+}
+
+ +

Complete result

+

If the result does not contain a continueAt key then the result is complete.

+ +

Finalised range

+

If the server can determine that no more data will be added to the requested time range + then it will add the flag finalisedRange: true to the response.

+ + + +

Query binned data

Method: GET

@@ -57,7 +165,8 @@ starts with the first not-yet-retrieved bin.

This information is provided by the continueAt and missingBins fields.

This enables the user agent to start the presentation to the user while updating the user interface as new bins are received.

-

Example:

+ +

Example response:

 {
   "continueAt": "2021-05-25T16:00:00.000Z",
@@ -104,50 +213,6 @@ as new bins are received.

then it will add the flag finalisedRange: true to the response.

- -

Search channel

-

Method: GET

-

URL: https://data-api.psi.ch/api/4/search/channel

-

Query parameters:

-
    -
  • nameRegex (e.g. "LSCP.*6")
  • -
  • sourceRegex (e.g. "178:9999")
  • -
  • descriptionRegex (e.g. "celsius")
  • -
-

Request header: "Accept" must be "application/json"

- -

CURL example:

-
-curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel?sourceRegex=CV.E.+37&nameRegex=120.+y2$'
-
- -

Example result

-
-{
-  "channels": [
-    {
-      "name": "S10MA01-DBPM120:Y2",
-      "backend": "sf-databuffer",
-      "source": "tcp://S20-CVME-DBPM2371:9000",
-      "type": "Float32",
-      "shape": [],
-      "unit": "",
-      "description": ""
-    },
-    {
-      "name": "S20SY02-DBPM120:Y2",
-      "backend": "sf-databuffer",
-      "source": "tcp://S20-CVME-DBPM2371:9000",
-      "type": "Float32",
-      "shape": [],
-      "unit": "",
-      "description": ""
-    }
-  ]
-}
-
-

The search constraints are AND'd.

-

Feedback and comments very much appreciated!

dominik.werder@psi.ch