diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 3e7c5c1..d4704e7 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -1,19 +1,20 @@
-use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
+use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT};
 use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator};
 use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult};
 use crate::agg::AggregatableXdim1Bin;
-use crate::binned::scalar::binned_scalar_stream;
+use crate::binned::scalar::{adapter_to_stream_item, binned_stream};
 use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
-use crate::cache::BinnedQuery;
+use crate::cache::{BinnedQuery, MergedFromRemotes};
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
 use crate::frame::makeframe::make_frame;
+use crate::raw::EventsQuery;
 use bytes::Bytes;
 use chrono::{TimeZone, Utc};
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use netpod::log::*;
-use netpod::{AggKind, BinnedRange, NodeConfigCached};
+use netpod::{AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
 use num_traits::Zero;
 use serde::{Deserialize, Serialize, Serializer};
 use std::pin::Pin;
@@ -233,12 +234,12 @@ pub async fn binned_bytes_for_http(
     info!("binned_bytes_for_http found config entry {:?}", entry);
     match query.agg_kind() {
         AggKind::DimXBins1 => {
-            let res = binned_scalar_stream(node_config, query).await?;
+            let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
             let ret = BinnedBytesForHttpStream::new(res.binned_stream);
             Ok(Box::pin(ret))
         }
         AggKind::DimXBinsN(_) => {
-            let res = binned_scalar_stream(node_config, query).await?;
+            let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
             let ret = BinnedBytesForHttpStream::new(res.binned_stream);
             Ok(Box::pin(ret))
         }
@@ -387,8 +388,78 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
     // TODO create the matching stream based on AggKind and ConfigEntry.
-    let t = binned_scalar_stream(node_config, query).await?;
+    let t = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
     let collected = collect_all(t.binned_stream, t.range.count as u32).await?;
-    let ret = collected.to_json_result();
+    let ret = collected.to_json_result()?;
     Ok(serde_json::to_value(ret)?)
 }
+
+pub trait BinnedStreamKind {
+    type BinnedStreamItem: MakeBytesFrame;
+    type BinnedStreamType: Stream + Send + 'static;
+
+    fn new_binned_from_prebinned(
+        query: &BinnedQuery,
+        range: BinnedRange,
+        pre_range: PreBinnedPatchRange,
+        node_config: &NodeConfigCached,
+    ) -> Result<Self::BinnedStreamType, Error>;
+
+    fn new_binned_from_merged(
+        evq: EventsQuery,
+        perf_opts: PerfOpts,
+        range: BinnedRange,
+        node_config: &NodeConfigCached,
+    ) -> Result<Self::BinnedStreamType, Error>;
+}
+
+pub struct BinnedStreamKindScalar {}
+
+pub struct BinnedStreamKindWave {}
+
+impl BinnedStreamKindScalar {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl BinnedStreamKindWave {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl BinnedStreamKind for BinnedStreamKindScalar {
+    type BinnedStreamItem = Result<StreamItem<BinnedScalarStreamItem>, Error>;
+    type BinnedStreamType = BinnedStream<Self::BinnedStreamItem>;
+
+    fn new_binned_from_prebinned(
+        query: &BinnedQuery,
+        range: BinnedRange,
+        pre_range: PreBinnedPatchRange,
+        node_config: &NodeConfigCached,
+    ) -> Result<Self::BinnedStreamType, Error> {
+        let s = BinnedScalarStreamFromPreBinnedPatches::new(
+            PreBinnedPatchIterator::from_range(pre_range),
+            query.channel().clone(),
+            range.clone(),
+            query.agg_kind().clone(),
+            query.cache_usage().clone(),
+            node_config,
+            query.disk_stats_every().clone(),
+        )?;
+        Ok(BinnedStream::new(Box::pin(s))?)
+    }
+
+    fn new_binned_from_merged(
+        evq: EventsQuery,
+        perf_opts: PerfOpts,
+        range: BinnedRange,
+        node_config: &NodeConfigCached,
+    ) -> Result<Self::BinnedStreamType, Error> {
+        let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone())
+            .into_binned_t(range.clone())
+            .map(adapter_to_stream_item);
+        Ok(BinnedStream::new(Box::pin(s))?)
+    }
+}
diff --git a/disk/src/binned/scalar.rs b/disk/src/binned/scalar.rs
index d03b828..29335d1 100644
--- a/disk/src/binned/scalar.rs
+++ b/disk/src/binned/scalar.rs
@@ -1,14 +1,14 @@
-use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
 use crate::agg::streams::StreamItem;
-use crate::binned::{BinnedScalarStreamItem, BinnedStreamRes};
-use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
-use crate::cache::{BinnedQuery, MergedFromRemotes};
+use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind, BinnedStreamRes};
+use crate::binnedstream::BinnedStream;
+use crate::cache::BinnedQuery;
 use crate::raw::EventsQuery;
 use err::Error;
+use futures_core::Stream;
 use futures_util::StreamExt;
 use netpod::log::*;
-use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
+use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange};
 
 pub fn adapter_to_stream_item(
     k: Result<StreamItem<MinMaxAvgScalarBinBatchStreamItem>, Error>,
@@ -30,10 +30,14 @@ pub fn adapter_to_stream_item(
     }
 }
 
-pub async fn binned_scalar_stream(
+pub async fn binned_stream<BK>(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
-) -> Result<BinnedStreamRes<Result<StreamItem<BinnedScalarStreamItem>, Error>>, Error> {
+    stream_kind: BK,
+) -> Result<BinnedStreamRes<<BK::BinnedStreamType as Stream>::Item>, Error>
+where
+    BK: BinnedStreamKind,
+{
     if query.channel().backend != node_config.node.backend {
         let err = Error::with_msg(format!(
             "backend mismatch node: {} requested: {}",
@@ -57,16 +61,8 @@ pub async fn binned_scalar_stream(
             );
             return Err(Error::with_msg(msg));
         }
-        let s1 = BinnedScalarStreamFromPreBinnedPatches::new(
-            PreBinnedPatchIterator::from_range(pre_range),
-            query.channel().clone(),
-            range.clone(),
-            query.agg_kind().clone(),
-            query.cache_usage().clone(),
-            node_config,
-            query.disk_stats_every().clone(),
-        )?;
-        let s = BinnedStream::new(Box::pin(s1))?;
+        let s = BK::new_binned_from_prebinned(query, range.clone(), pre_range, node_config)?;
+        let s = BinnedStream::new(Box::pin(s))?;
         let ret = BinnedStreamRes {
             binned_stream: s,
             range,
@@ -84,9 +80,7 @@ pub async fn binned_scalar_stream(
             agg_kind: query.agg_kind().clone(),
         };
         // TODO do I need to set up more transformations or binning to deliver the requested data?
-        let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
-        let s = s.into_binned_t(range.clone());
-        let s = s.map(adapter_to_stream_item);
+        let s = BK::new_binned_from_merged(evq, perf_opts, range.clone(), node_config)?;
         let s = BinnedStream::new(Box::pin(s))?;
         let ret = BinnedStreamRes {
             binned_stream: s,
diff --git a/httpret/static/documentation/index.html b/httpret/static/documentation/index.html
index 2cd35d3..5badf48 100644
--- a/httpret/static/documentation/index.html
+++ b/httpret/static/documentation/index.html
@@ -3,11 +3,196 @@ Retrieval Documentation

Retrieval 4.0 Documentation

-Some docs to be shown here...

HTTP API documentation


This API follows the common convention that the addition of a key to a JSON object is not considered a breaking change.

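For a client this means that deserialization should tolerate keys it does not know yet. A minimal client-side sketch in Rust (illustrative only, not part of this API; it assumes serde with the derive feature plus serde_json, which skip unknown keys unless #[serde(deny_unknown_fields)] is set):

use serde::Deserialize;

// Client-side view of a binned response; keys the server adds later are simply ignored.
#[derive(Debug, Deserialize)]
struct BinnedResponse {
    counts: Vec<u64>,
    ts_bin_edges: Vec<String>,
}

fn main() -> Result<(), serde_json::Error> {
    // "future_key" is unknown to the struct and is skipped by default.
    let body = r#"{"counts": [458, 459], "ts_bin_edges": ["2021-05-21T00:00:00.000Z", "2021-05-21T00:05:00.000Z", "2021-05-21T00:10:00.000Z"], "future_key": 1}"#;
    let parsed: BinnedResponse = serde_json::from_str(body)?;
    println!("{:?}", parsed);
    Ok(())
}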

Currently available:


Query binned data


Method: GET


URL: https://data-api.psi.ch/api/4/binned


Query parameters:

  • channel_backend
  • channel_name
  • beg_date
  • end_date
  • bin_count

Request header: "Accept" must be "application/json"


Example:

http://sf-daqbuf-21:8380/api/4/binned?channel_backend=sf-databuffer&channel_name=SLAAR-LSCP4-LAS6891:CH7:1&beg_date=2021-05-21T00:00:00.000Z&end_date=2021-05-21T02:00:00.000Z&bin_count=20

Result body example:

+[
+    {
+        "backend": "sf-databuffer",
+        "channels": [
+            "SARES20-LSCP9:CH0:2",
+            "SARES20-LSCP9:CH0:1"
+        ]
+    },
+    {
+        "backend": "hipa-archive",
+        "channels": [],
+        "error": {
+            "code": "Error"  // can be: "Error" | "Timeout" (more to be added in the future)
+        }
+    }
+]

CURL example:

+curl -H 'Accept: application/json' 'http://sf-daqbuf-21:8380/api/4/binned?channel_backend=sf-databuffer&channel_name=SLAAR-LSCP4-LAS6891:CH7:1&beg_date=2021-05-21T00:00:00.000Z&end_date=2021-05-21T02:00:00.000Z&bin_count=20'

Answer:

+{
+    "counts": [
+        458,
+        459,
+        458,
+        459,
+        459,
+        458,
+        459,
+        458,
+        459,
+        459,
+        458,
+        459,
+        458,
+        459,
+        458,
+        459,
+        459,
+        458,
+        459,
+        458,
+        459,
+        458,
+        459,
+        459
+    ],
+    "ts_bin_edges": [
+        "2021-05-21T00:00:00.000Z",
+        "2021-05-21T00:05:00.000Z",
+        "2021-05-21T00:10:00.000Z",
+        "2021-05-21T00:15:00.000Z",
+        "2021-05-21T00:20:00.000Z",
+        "2021-05-21T00:25:00.000Z",
+        "2021-05-21T00:30:00.000Z",
+        "2021-05-21T00:35:00.000Z",
+        "2021-05-21T00:40:00.000Z",
+        "2021-05-21T00:45:00.000Z",
+        "2021-05-21T00:50:00.000Z",
+        "2021-05-21T00:55:00.000Z",
+        "2021-05-21T01:00:00.000Z",
+        "2021-05-21T01:05:00.000Z",
+        "2021-05-21T01:10:00.000Z",
+        "2021-05-21T01:15:00.000Z",
+        "2021-05-21T01:20:00.000Z",
+        "2021-05-21T01:25:00.000Z",
+        "2021-05-21T01:30:00.000Z",
+        "2021-05-21T01:35:00.000Z",
+        "2021-05-21T01:40:00.000Z",
+        "2021-05-21T01:45:00.000Z",
+        "2021-05-21T01:50:00.000Z",
+        "2021-05-21T01:55:00.000Z",
+        "2021-05-21T02:00:00.000Z"
+    ]
+}
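The same query can be issued from a program. A minimal Rust sketch (illustrative only; it assumes the reqwest crate with the "blocking" and "json" features plus serde_json on the client side, and reuses the host, channel and time range from the example above):

use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let resp = reqwest::blocking::Client::new()
        .get("http://sf-daqbuf-21:8380/api/4/binned")
        .header("Accept", "application/json")
        .query(&[
            ("channel_backend", "sf-databuffer"),
            ("channel_name", "SLAAR-LSCP4-LAS6891:CH7:1"),
            ("beg_date", "2021-05-21T00:00:00.000Z"),
            ("end_date", "2021-05-21T02:00:00.000Z"),
            ("bin_count", "20"),
        ])
        .send()?
        .error_for_status()?;
    // The body has the shape shown above: "counts" and "ts_bin_edges".
    let body: serde_json::Value = resp.json()?;
    println!("bins: {}", body["counts"].as_array().map_or(0, |a| a.len()));
    Ok(())
}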

Channel Search, with return of configuration information


Method: POST


URL: https://data-api.psi.ch/api/1/channels/config


Request body: JSON with search parameters


Request body outline:

+{
+  "regex":             "[Optional: Regular expression to search in channel name]",
+  "sourceRegex":       "[Optional: Search in sourcename of the channel]",
+  "descriptionRegex":  "[Optional: Search in the channel's description]",
+  "backends":          ["gls-archive", "hipa-archive", "sf-databuffer"]
+}

Result body example:


Assuming that "hipa-archive" is unavailable:

+[
+    {
+        "backend": "sf-databuffer",
+        "channels": [
+            {
+                "backend": "sf-databuffer",
+                "description": "",
+                "name": "SARES20-LSCP9:CH0:2",
+                "shape": [
+                    512
+                ],
+                "source": "tcp://SARES20-CVME-01:9999",
+                "type": "Float32",
+                "unit": ""
+            },
+            {
+                "backend": "sf-databuffer",
+                "description": "",
+                "name": "SARES20-LSCP9:CH0:1",
+                "shape": [
+                    512
+                ],
+                "source": "tcp://SARES20-CVME-01:9999",
+                "type": "Int16",
+                "unit": ""
+            }
+        ]
+    },
+    {
+        "backend": "hipa-archive",
+        "channels": [],
+        "error": {
+            "code": "Error"  // can be: "Error" | "Timeout" (more to be added in the future)
+        }
+    }
+]

Notes:


The search constraints are AND'ed together.


If some backend responds with an error, that error is indicated by the error key in the affected backend (see example above).

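Since each backend reports independently, a client can keep the results of the successful backends and inspect the error object of the failed ones. A minimal client-side sketch in Rust (illustrative only; serde_json is assumed, and the helper below is hypothetical, not part of this API):

use serde_json::Value;

// Split per-backend results into successful ones and (backend, error code) pairs.
fn split_results(results: &[Value]) -> (Vec<&Value>, Vec<(&str, &str)>) {
    let mut ok = Vec::new();
    let mut failed = Vec::new();
    for r in results {
        let backend = r["backend"].as_str().unwrap_or("?");
        match r["error"]["code"].as_str() {
            Some(code) => failed.push((backend, code)),
            None => ok.push(r),
        }
    }
    (ok, failed)
}

fn main() {
    let results: Vec<Value> = serde_json::from_str(
        r#"[{"backend":"sf-databuffer","channels":[]},{"backend":"hipa-archive","channels":[],"error":{"code":"Timeout"}}]"#,
    )
    .unwrap();
    let (ok, failed) = split_results(&results);
    println!("ok: {}, failed: {:?}", ok.len(), failed);
}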

CURL example:

+QUERY='{ "regex": "LSCP9:CH0", "backends": ["sf-databuffer"] }'
+curl -H 'Content-Type: application/json' -H 'Accept: application/json' -d "$QUERY" https://data-api.psi.ch/api/1/channels/config
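The same search can be issued from a program. A minimal Rust sketch (illustrative only; it assumes the reqwest crate with the "blocking" and "json" features plus serde_json, and mirrors the CURL example above):

use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    // Same search body as in the CURL example above.
    let query = serde_json::json!({
        "regex": "LSCP9:CH0",
        "backends": ["sf-databuffer"]
    });
    let resp = reqwest::blocking::Client::new()
        .post("https://data-api.psi.ch/api/1/channels/config")
        .header("Accept", "application/json")
        .json(&query)
        .send()?
        .error_for_status()?;
    let body: serde_json::Value = resp.json()?;
    println!("{}", serde_json::to_string_pretty(&body)?);
    Ok(())
}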

Feedback and comments


Feedback is very much appreciated:


dominik.werder@psi.ch


or please assign me a JIRA ticket.
