@ -1,5 +1,5 @@
|
||||
#
|
||||
#Wed May 04 13:27:42 CEST 2016
|
||||
#Wed Jun 08 12:47:13 CEST 2016
|
||||
org.eclipse.jdt.core.compiler.debug.localVariable=generate
|
||||
org.eclipse.jdt.core.compiler.compliance=1.8
|
||||
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
|
||||
|
273
Readme.md
273
Readme.md
@ -1,4 +1,3 @@
|
||||
|
||||
# Overview
|
||||
|
||||
This project provides a REST interface to execute queries on the databuffer.
|
||||
@ -61,7 +60,14 @@ POST http://<host>:<port>/channels
|
||||
#### Data
|
||||
|
||||
```json
|
||||
{"regex": "TRFCA|TRFCB","backends": ["sf-databuffer"],"ordering":"asc","reload":true}
|
||||
{
|
||||
"regex":"TRFCA|TRFCB",
|
||||
"backends":[
|
||||
"sf-databuffer"
|
||||
],
|
||||
"ordering":"asc",
|
||||
"reload":true
|
||||
}
|
||||
```
|
||||
|
||||
##### Explanation
|
||||
@ -116,18 +122,47 @@ GET http://<host>:<port>/query
|
||||
|
||||
A request is performed by sending a valid JSON object in the HTTP request body. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended).
|
||||
|
||||
The following attributes can be specified:
|
||||
#### Data
|
||||
|
||||
```json
|
||||
{
|
||||
"channels":[
|
||||
"Channel_01"
|
||||
],
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
"endPulseId":3
|
||||
},
|
||||
"ordering":"asc",
|
||||
"fields":[
|
||||
"pulseId",
|
||||
"globalDate",
|
||||
"value"
|
||||
],
|
||||
"aggregation":{
|
||||
"aggregationType":"value",
|
||||
"aggregations":[
|
||||
"min",
|
||||
"mean",
|
||||
"max"
|
||||
],
|
||||
"nrOfBins":2
|
||||
},
|
||||
"response":{
|
||||
"format":"json",
|
||||
"compression":"none"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
##### Explanation
|
||||
|
||||
- **channels**: Array of channels to be queried (see [here](Readme.md#query_channel_names) and [here](Readme.md#define_channel_names)).
|
||||
- **range**: The range of the query (see [here](Readme.md#query_range)).
|
||||
- **ordering**: The ordering of the data (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.common/blob/master/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values).
|
||||
- **fields**: The requested fields (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/QueryField.java) for possible values).
|
||||
- **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be divided into.
|
||||
- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using number of pulses and number of milliseconds makes this binning strategy consistent between channel with different update frequencies).
|
||||
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Aggregation.java) for possible values). These values will be added to the *data* array response.
|
||||
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
|
||||
- **responseFormat**: Specifies the format the response of the requested data (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/ResponseFormat.java) for possible values).
|
||||
- **compression**: Defines how the response should be compressed (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Compression.java) for possible values).
|
||||
- **ordering**: The ordering of the data (see [here](Readme.md#data_ordering)).
|
||||
- **fields**: Array of requested fields (see [here](Readme.md#requested_fields)).
|
||||
- **aggregation**: Setting this attribute activates data aggregation (see [here](Readme.md#data_aggregation) for its specification).
|
||||
- **response**: Specifies the format of the response of the requested data (see [here](Readme.md#response_format)). If this value is not set it defaults to JSON.
|
||||
|
||||
<a name="define_channel_names"/>
|
||||
|
||||
@ -187,7 +222,7 @@ Queries are applied to a range. The following types of ranges are supported.
|
||||
}
|
||||
```
|
||||
|
||||
- **startDate**: The start date of the time range in the ISO8601 format (such as 1997-07-16T19:20:30.123+02:00 or 1997-07-16T19:20:30.123456789+02:00 (omitting +02:00 falls back to the local time zone)).
|
||||
- **startDate**: The start date of the time range in the ISO8601 format (such as 1997-07-16T19:20:30.123+02:00 or 1997-07-16T19:20:30.123456789+02:00 (omitting +02:00 falls back to the server's time zone)).
|
||||
- **endDate**: The end date of the time range.
|
||||
|
||||
|
||||
@ -200,20 +235,79 @@ Queries are applied to a range. The following types of ranges are supported.
|
||||
}
|
||||
```
|
||||
|
||||
- **startSeconds**: The start time of the range in seconds since January 1, 1970 (the UNIX epoch) as a decimal value including fractional seconds.
|
||||
- **startSeconds**: The start time of the range in seconds since midnight, January 1, 1970 UTC (the UNIX epoch) as a decimal value including fractional seconds.
|
||||
- **endSeconds**: The end time of the range in seconds.
|
||||
|
||||
|
||||
<a name="data_ordering"/>
|
||||
|
||||
### Data Ordering
|
||||
|
||||
```json
|
||||
"ordering":"asc"
|
||||
```
|
||||
|
||||
- **ordering**: Defines the ordering of the requested data (values: **asc**|desc|none). Use *none* in case ordering does not matter (allows for server side optimizations).
|
||||
|
||||
|
||||
<a name="requested_fields"/>
|
||||
|
||||
### Requested Fields
|
||||
|
||||
```json
|
||||
"fields":[
|
||||
"pulseId",
|
||||
"globalDate",
|
||||
"value"
|
||||
]
|
||||
```
|
||||
|
||||
- **fields**: Array of requested fields (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/QueryField.java) for possible values).
|
||||
|
||||
It is possible to request the time in seconds (since midnight, January 1, 1970 UTC (the UNIX epoch) as a decimal value including fractional seconds - using fields *globalSeconds* and *iocSeconds*), in milliseconds (since midnight, January 1, 1970 UTC (the JAVA epoch) - using fields *globalMillis* and *iocMillis*) or as a ISO8601 formatted String - using fields *globalDate* and *iocDate* (such as 1997-07-16T19:20:30.123456789+02:00).
|
||||
|
||||
|
||||
<a name="data_aggregation"/>
|
||||
|
||||
### Data Aggregation
|
||||
|
||||
It is possible (and recommended) to aggregate queried data.
|
||||
|
||||
```json
|
||||
"aggregation":{
|
||||
"aggregationType":"value",
|
||||
"aggregations":[
|
||||
"min",
|
||||
"mean",
|
||||
"max"
|
||||
],
|
||||
"nrOfBins":2
|
||||
}
|
||||
```
|
||||
|
||||
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
|
||||
- **aggregations**: Array of requested aggregations (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Aggregation.java) for possible values). These values will be added to the *data* array response.
|
||||
- **extrema**: Array of requested extrema (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Extrema.java) for possible values). These values will be added to the *data* array response.
|
||||
- **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be divided into.
|
||||
- **durationPerBin**: Activates data binning. Specifies the duration per bin for time-range queries (using duration makes this binning strategy consistent between channel with different update frequencies). The duration is defined as a [ISO-8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) duration (e.g., `PT1H` for 1 hour, `PT2S` for 2 seconds, `PT0.05S` for 50 milliseconds etc.). The resolution is in milliseconds and thus the minimal duration is 1 millisecond.
|
||||
- **pulsesPerBin**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries (using number of pulses makes this binning strategy consistent between channel with different update frequencies).
|
||||
|
||||
|
||||
<a name="response_format"/>
|
||||
|
||||
### Response Format
|
||||
|
||||
The format of the response can be defined through the field `responseFormat` (values: **json**|csv). Please note that CSV does not support `index` and `extrema` aggregations.
|
||||
It is possible to specify the response format the queried data should have.
|
||||
|
||||
```json
|
||||
"response":{
|
||||
"format":"json",
|
||||
"compression":"none"
|
||||
}
|
||||
```
|
||||
|
||||
### Response Compression
|
||||
|
||||
Responses can be compressed when transferred from the server by setting the field `compression` (values: **none**|gzip|deflate).
|
||||
|
||||
If compression is enabled, you have to tell `curl` that the data is compressed by defining the attribute `--compressed` so that it decompresses the data automatically.
|
||||
- **format**: The format of the response (values: **json**|csv). Please note that `csv` does not support `index` and `extrema` aggregations.
|
||||
- **compression**: Responses can be compressed when transferred from the server (values: **none**|gzip). If compression is enabled, you have to tell `curl` that the data is compressed by defining the attribute `--compressed` so that it decompresses the data automatically.
|
||||
|
||||
|
||||
### Example Queries
|
||||
@ -336,7 +430,7 @@ See JSON representation of the data above.
|
||||
}
|
||||
```
|
||||
|
||||
The supported date format is ISO8601 (such as 1997-07-16T19:20:30.123+02:00 or 1997-07-16T19:20:30.123456789+02:00 (omitting +02:00 falls back to the local time zone)).
|
||||
The supported date format is ISO8601 (such as 1997-07-16T19:20:30.123+02:00 or 1997-07-16T19:20:30.123456789+02:00 (omitting +02:00 falls back to the server's time zone)).
|
||||
|
||||
##### Command
|
||||
|
||||
@ -389,22 +483,9 @@ See JSON representation of the data above.
|
||||
|
||||
```json
|
||||
{
|
||||
"compression":"gzip",
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
"endPulseId":3
|
||||
},
|
||||
"channels":[
|
||||
"Channel_01"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
or `deflate` can be used too:
|
||||
|
||||
```json
|
||||
{
|
||||
"compression":"deflate",
|
||||
"response":{
|
||||
"compression":"gzip"
|
||||
},
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
"endPulseId":3
|
||||
@ -420,7 +501,7 @@ or `deflate` can be used too:
|
||||
The `curl` command has a `--compressed` option to decompress data automatically.
|
||||
|
||||
```bash
|
||||
curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"gzip","range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
curl --compressed -H "Content-Type: application/json" -X POST -d '{"response":{"compression":"gzip"},"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
```
|
||||
|
||||
#### Querying for Specific Fields
|
||||
@ -478,7 +559,9 @@ curl -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","valu
|
||||
|
||||
```json
|
||||
{
|
||||
"responseFormat":"csv",
|
||||
"response":{
|
||||
"format":"csv"
|
||||
},
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
"endPulseId":4
|
||||
@ -499,12 +582,10 @@ curl -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","valu
|
||||
}
|
||||
```
|
||||
|
||||
It is possible to request the time in seconds (since January 1, 1970 (the UNIX epoch) as a decimal value including fractional seconds - using fields *globalSeconds* and *iocSeconds*), in milliseconds (since January 1, 1970 (the JAVA epoch) - using fields *globalMillis* and *iocMillis*) or as a ISO8601 formatted String - using fields *globalDate* and *iocDate* (such as 1997-07-16T19:20:30.123456789+02:00).
|
||||
|
||||
##### Command
|
||||
|
||||
```bash
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"response":{"format":"csv"},"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
|
||||
```
|
||||
|
||||
##### Response
|
||||
@ -584,8 +665,10 @@ curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields
|
||||
|
||||
```json
|
||||
{
|
||||
"aggregationType":"value",
|
||||
"aggregations":["min","max","mean"],
|
||||
"aggregation":{
|
||||
"aggregationType":"value",
|
||||
"aggregations":["min","mean","max"]
|
||||
},
|
||||
"fields":["pulseId","value"],
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
@ -600,7 +683,7 @@ curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields
|
||||
##### Command
|
||||
|
||||
```bash
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"aggregation":{"aggregationType":"value","aggregations":["min","mean","max"]},"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
```
|
||||
|
||||
##### Response
|
||||
@ -657,9 +740,11 @@ Illustration of array value aggregation:
|
||||
|
||||
```json
|
||||
{
|
||||
"nrOfBins":2,
|
||||
"aggregationType":"value",
|
||||
"aggregations":["min","max","mean"],
|
||||
"aggregation":{
|
||||
"nrOfBins":2,
|
||||
"aggregationType":"value",
|
||||
"aggregations":["min","mean","max"]
|
||||
},
|
||||
"fields":["pulseId","value"],
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
@ -674,7 +759,7 @@ Illustration of array value aggregation:
|
||||
##### Command
|
||||
|
||||
```bash
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"aggregation":{"nrOfBins":2,"aggregationType":"value","aggregations":["min","mean","max"]},"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
```
|
||||
|
||||
##### Response
|
||||
@ -710,15 +795,19 @@ Illustration of array value aggregation with additional binning:
|
||||
|
||||

|
||||
|
||||
#### Value Aggregation with Binning (binSize)
|
||||
#### Value Aggregation with Binning (durationPerBin/pulsesPerBin)
|
||||
|
||||
**binSize** specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using number of pulses and number of milliseconds makes this binning strategy consistent between channel with different update frequencies).
|
||||
**durationPerBin** specifies the duration per bin for time-range queries (using duration makes this binning strategy consistent between channel with different update frequencies). The duration is defined as a [ISO-8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) duration (e.g., `PT1H` for 1 hour, `PT2S` for 2 seconds, `PT0.05S` for 50 milliseconds etc.). The resolution is in milliseconds and thus the minimal duration is 1 millisecond.
|
||||
|
||||
**pulsesPerBin** specifies the number of pulses per bin for pulse-range queries (using number of pulses makes this binning strategy consistent between channel with different update frequencies).
|
||||
|
||||
```json
|
||||
{
|
||||
"binSize":10,
|
||||
"aggregationType":"value",
|
||||
"aggregations":["min","max","mean"],
|
||||
"aggregation":{
|
||||
"pulsesPerBin":2,
|
||||
"aggregationType":"value",
|
||||
"aggregations":["min","mean","max"]
|
||||
},
|
||||
"fields":["globalMillis","value"],
|
||||
"range":{
|
||||
"startSeconds":"0.0",
|
||||
@ -733,7 +822,7 @@ Illustration of array value aggregation with additional binning:
|
||||
##### Command
|
||||
|
||||
```bash
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"startSeconds":"0.0","endSeconds":"0.030000000"},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"aggregation":{"pulsesPerBin":2,"aggregationType":"value","aggregations":["min","mean","max"]},"fields":["globalMillis","value"],"range":{"startSeconds":"0.0","endSeconds":"0.030000000"},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
```
|
||||
|
||||
##### Response
|
||||
@ -773,9 +862,11 @@ Illustration of array value aggregation with additional binning:
|
||||
|
||||
```json
|
||||
{
|
||||
"nrOfBins":1,
|
||||
"aggregationType":"index",
|
||||
"aggregations":["min","max","mean","sum"],
|
||||
"aggregation":{
|
||||
"pulsesPerBin":1,
|
||||
"aggregationType":"index",
|
||||
"aggregations":["min","mean","max","sum"]
|
||||
},
|
||||
"fields":["pulseId","value"],
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
@ -790,7 +881,7 @@ Illustration of array value aggregation with additional binning:
|
||||
##### Command
|
||||
|
||||
```bash
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"aggregation":{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"]},"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
```
|
||||
|
||||
##### Response
|
||||
@ -838,72 +929,6 @@ Illustration of array index aggregation with additional with binning (several nr
|
||||
|
||||

|
||||
|
||||
|
||||
#### Extrema Search
|
||||
|
||||
```json
|
||||
{
|
||||
"aggregationType":"extrema",
|
||||
"aggregations":["min","max","sum"],
|
||||
"fields":["pulseId","value"],
|
||||
"range":{
|
||||
"startPulseId":0,
|
||||
"endPulseId":3
|
||||
},
|
||||
"channels":[
|
||||
"Channel_01"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
##### Command
|
||||
|
||||
```bash
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
|
||||
```
|
||||
|
||||
##### Response
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"channel":"Channel_01",
|
||||
"data":{
|
||||
"minima":{
|
||||
"min":{
|
||||
"value":1.0,
|
||||
"event":{
|
||||
"pulseId":0,
|
||||
"value":[1,2,3,4]
|
||||
}
|
||||
},
|
||||
"sum":{
|
||||
"value":10.0,
|
||||
"event":{
|
||||
"pulseId":0,
|
||||
"value":[1,2,3,4]
|
||||
}
|
||||
}
|
||||
},
|
||||
"maxima":{
|
||||
"max":{
|
||||
"value":7.0,
|
||||
"event":{
|
||||
"pulseId":3,
|
||||
"value":[4,5,6,7]
|
||||
}
|
||||
},
|
||||
"sum":{
|
||||
"value":22.0,
|
||||
"event":{
|
||||
"pulseId":3,
|
||||
"value":[4,5,6,7]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
<a name="query_channel_status"/>
|
||||
|
@ -24,23 +24,28 @@ import org.springframework.util.StringUtils;
|
||||
import org.springframework.validation.Validator;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
|
||||
|
||||
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import ch.psi.daq.common.statistic.Statistics;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.query.operation.Response;
|
||||
import ch.psi.daq.domain.query.operation.aggregation.extrema.AbstractExtremaMeta;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzerImpl;
|
||||
import ch.psi.daq.query.config.QueryConfig;
|
||||
import ch.psi.daq.query.model.Query;
|
||||
import ch.psi.daq.queryrest.controller.validator.QueryValidator;
|
||||
import ch.psi.daq.queryrest.model.PropertyFilterMixin;
|
||||
import ch.psi.daq.queryrest.query.QueryManager;
|
||||
import ch.psi.daq.queryrest.query.QueryManagerImpl;
|
||||
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
@Configuration
|
||||
@Import(value=QueryRestConfigCORS.class)
|
||||
@PropertySource(value = {"classpath:queryrest.properties"})
|
||||
@ -84,8 +89,11 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
|
||||
// Mixin which is used dynamically to filter out which properties get serialised and which
|
||||
// won't. This way, the user can specify which columns are to be received.
|
||||
objectMapper.addMixIn(DataEvent.class, PropertyFilterMixin.class);
|
||||
objectMapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
|
||||
objectMapper.addMixIn(Statistics.class, PropertyFilterMixin.class);
|
||||
objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class);
|
||||
objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);
|
||||
|
||||
objectMapper.addMixIn(Response.class, PolymorphicResponseMixIn.class);
|
||||
}
|
||||
|
||||
|
||||
@ -123,6 +131,11 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
|
||||
public CSVResponseStreamWriter csvResponseStreamWriter() {
|
||||
return new CSVResponseStreamWriter();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public QueryManager queryManager(){
|
||||
return new QueryManagerImpl();
|
||||
}
|
||||
|
||||
@Bean(name = BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
|
||||
public Set<QueryField> defaultResponseFields() {
|
||||
|
@ -1,17 +1,13 @@
|
||||
package ch.psi.daq.queryrest.controller;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
@ -19,9 +15,6 @@ import javax.annotation.Resource;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.validation.Valid;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
@ -38,21 +31,18 @@ import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import ch.psi.daq.cassandra.config.CassandraConfig;
|
||||
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.FieldNames;
|
||||
import ch.psi.daq.domain.config.DomainConfig;
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.json.status.channel.ChannelStatus;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQuery;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsRequest;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsResponse;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.operation.AggregationType;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.query.operation.Response;
|
||||
import ch.psi.daq.domain.query.operation.ResponseFormat;
|
||||
import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery;
|
||||
import ch.psi.daq.domain.reader.Backend;
|
||||
@ -61,12 +51,13 @@ import ch.psi.daq.domain.status.StatusReader;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.config.QueryConfig;
|
||||
import ch.psi.daq.query.model.Query;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.query.processor.ChannelNameCache;
|
||||
import ch.psi.daq.query.processor.QueryProcessor;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.query.QueryManager;
|
||||
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
|
||||
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -81,22 +72,21 @@ public class QueryRestController {
|
||||
public static final String PATH_QUERIES = DomainConfig.PATH_QUERIES;
|
||||
public static final String PATH_STATUS_CHANNELS = DomainConfig.PATH_STATUS_CHANNELS;
|
||||
|
||||
@Resource
|
||||
private ApplicationContext appContext;
|
||||
|
||||
@Resource
|
||||
private Validator queryValidator;
|
||||
private Validator requestProviderValidator = new RequestProviderValidator();
|
||||
|
||||
@Resource
|
||||
private JSONResponseStreamWriter jsonResponseStreamWriter;
|
||||
|
||||
@Resource
|
||||
private CSVResponseStreamWriter csvResponseStreamWriter;
|
||||
|
||||
@Resource
|
||||
private ApplicationContext appContext;
|
||||
private QueryManager queryManager;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
private Response defaultResponse = new JSONHTTPResponse();
|
||||
|
||||
@Resource
|
||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||
|
||||
@ -199,12 +189,7 @@ public class QueryRestController {
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<ChannelsResponse> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
||||
throws Throwable {
|
||||
// in case not specified use defaults (e.g. GET)
|
||||
if (request == null) {
|
||||
request = new ChannelsRequest();
|
||||
}
|
||||
|
||||
return channelNameCache.getChannels(request);
|
||||
return queryManager.getChannels(request);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -330,102 +315,24 @@ public class QueryRestController {
|
||||
try {
|
||||
LOGGER.debug("Executing queries '{}'", queries);
|
||||
|
||||
if (ResponseFormat.JSON.equals(queries.getResponseFormat())) {
|
||||
// write the response back to the client using java 8 streams
|
||||
jsonResponseStreamWriter.respond(executeQueries(queries), queries, res);
|
||||
} else if (ResponseFormat.CSV.equals(queries.getResponseFormat())) {
|
||||
// it's a CSV request
|
||||
executeQueriesCsv(queries, res);
|
||||
Response response = queries.getResponseOrDefault(defaultResponse);
|
||||
if (response instanceof AbstractHTTPResponse) {
|
||||
((AbstractHTTPResponse) response).respond(appContext, queries, res);
|
||||
} else {
|
||||
String message = String.format("Unsupported response format '%s'", queries.getResponseFormat().name());
|
||||
String message =
|
||||
String.format(
|
||||
"Expecting Response of type '%s' but received '%s'. Check JSON deserialization defined in '%s'",
|
||||
AbstractHTTPResponse.class.getName(), response.getClass().getName(),
|
||||
PolymorphicResponseMixIn.class.getName());
|
||||
LOGGER.error(message);
|
||||
throw new RuntimeException(message);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", queries, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns data in CSV format to the client.
|
||||
*/
|
||||
private void executeQueriesCsv(DAQQueries queries, HttpServletResponse res) throws Exception {
|
||||
|
||||
for (DAQQueryElement query : queries) {
|
||||
if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
|
||||
// We allow only no aggregation or value aggregation as
|
||||
// extrema: nested structure and not clear how to map it to one line
|
||||
// index: value is an array of Statistics whose size is not clear at initialization time
|
||||
String message = "CSV export does not support '" + query.getAggregationType() + "'";
|
||||
LOGGER.warn(message);
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
|
||||
|
||||
if (!ArrayUtils.contains(query.getColumns(), FieldNames.FIELD_GLOBAL_TIME)) {
|
||||
query.addField(QueryField.globalMillis);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
LOGGER.debug("Executing query '{}'", queries);
|
||||
// write the response back to the client using java 8 streams
|
||||
csvResponseStreamWriter.respond(executeQueries(queries), queries, res);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", queries, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) {
|
||||
// set backends if not defined yet
|
||||
channelNameCache.setBackends(queries);
|
||||
|
||||
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results =
|
||||
new ArrayList<>(queries.getQueries().size());
|
||||
|
||||
for (DAQQueryElement queryElement : queries) {
|
||||
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
|
||||
BackendQuery
|
||||
.getBackendQueries(queryElement)
|
||||
.stream()
|
||||
.filter(query -> {
|
||||
QueryProcessor processor = queryProcessors.get(query.getBackend());
|
||||
if (processor != null) {
|
||||
return true;
|
||||
} else {
|
||||
LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend());
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.flatMap(query -> {
|
||||
QueryProcessor processor = queryProcessors.get(query.getBackend());
|
||||
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
|
||||
// all the magic happens here
|
||||
Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
processor.process(queryAnalizer);
|
||||
// do post-process
|
||||
Stream<Entry<ChannelName, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||
|
||||
return channelToData.map(entry -> {
|
||||
return Triple.of(query, entry.getKey(), entry.getValue());
|
||||
});
|
||||
});
|
||||
|
||||
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
|
||||
// By materializing the outer Stream the elements of all BackendQuery are loaded async
|
||||
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
|
||||
// all elements into memory at once)
|
||||
resultStreams = resultStreams.collect(Collectors.toList()).stream();
|
||||
|
||||
results.add(Pair.of(queryElement, resultStreams));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -14,6 +14,7 @@ import ch.psi.daq.domain.query.DAQQuery;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.request.Request;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
|
||||
public class QueryValidator implements Validator {
|
||||
@ -38,23 +39,46 @@ public class QueryValidator implements Validator {
|
||||
@Override
|
||||
public void validate(Object target, Errors errors) {
|
||||
if (target instanceof DAQQuery) {
|
||||
this.checkElement((DAQQuery) target);
|
||||
}else if(target instanceof DAQQueries){
|
||||
this.checkElement((DAQQuery) target, errors);
|
||||
} else if (target instanceof DAQQueries) {
|
||||
DAQQueries queries = (DAQQueries) target;
|
||||
for (DAQQueryElement daqQueryElement : queries) {
|
||||
this.checkElement(daqQueryElement);
|
||||
this.checkElement(daqQueryElement, errors);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkElement(DAQQueryElement query) {
|
||||
private void checkElement(DAQQueryElement query, Errors errors) {
|
||||
// set default values (if not set)
|
||||
if (query.getFields() == null || query.getFields().isEmpty()) {
|
||||
query.setFields(new LinkedHashSet<>(defaultResponseFields));
|
||||
}
|
||||
|
||||
if (query.getAggregations() == null || query.getAggregations().isEmpty()) {
|
||||
query.setAggregations(new ArrayList<>(defaultResponseAggregations));
|
||||
if (query.getAggregation() != null) {
|
||||
// check if only one binning element is defined
|
||||
long durationPerBin = query.getAggregation().getDurationPerBin();
|
||||
long pulsesPerBin = query.getAggregation().getPulsesPerBin();
|
||||
int nrOfBins = query.getAggregation().getNrOfBins();
|
||||
if ((durationPerBin != Request.NOT_SET && (pulsesPerBin != Request.NOT_SET || nrOfBins != Request.NOT_SET))
|
||||
|| (pulsesPerBin != Request.NOT_SET && (durationPerBin != Request.NOT_SET || nrOfBins != Request.NOT_SET))
|
||||
|| (nrOfBins != Request.NOT_SET && (durationPerBin != Request.NOT_SET || pulsesPerBin != Request.NOT_SET))) {
|
||||
errors.reject("durationPerBin", "Only one binning element must be defined.");
|
||||
errors.reject("pulsesPerBin", "Only one binning element must be defined.");
|
||||
errors.reject("nrOfBins", "Only one binning element must be defined.");
|
||||
}
|
||||
|
||||
if (query.getRange().isPulseIdRangeDefined() && durationPerBin != Request.NOT_SET) {
|
||||
errors.reject("durationPerBin", "Pulse range queries only support pulse based binning.");
|
||||
}
|
||||
if (query.getRange().isTimeRangeDefined() && pulsesPerBin != Request.NOT_SET) {
|
||||
errors.reject("pulsesPerBin", "Time range queries only support time based binning.");
|
||||
}
|
||||
|
||||
|
||||
// set default values (if not set)
|
||||
if (query.getAggregation().getAggregations() == null || query.getAggregation().getAggregations().isEmpty()) {
|
||||
query.getAggregation().setAggregations(new ArrayList<>(defaultResponseAggregations));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
22
src/main/java/ch/psi/daq/queryrest/query/QueryManager.java
Normal file
22
src/main/java/ch/psi/daq/queryrest/query/QueryManager.java
Normal file
@ -0,0 +1,22 @@
|
||||
package ch.psi.daq.queryrest.query;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsRequest;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsResponse;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
|
||||
public interface QueryManager {
|
||||
|
||||
List<ChannelsResponse> getChannels(ChannelsRequest request) throws Exception;
|
||||
|
||||
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries)
|
||||
throws Exception;
|
||||
}
|
157
src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java
Normal file
157
src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java
Normal file
@ -0,0 +1,157 @@
|
||||
package ch.psi.daq.queryrest.query;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import ch.psi.daq.cassandra.config.CassandraConfig;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsRequest;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsResponse;
|
||||
import ch.psi.daq.domain.reader.Backend;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.config.QueryConfig;
|
||||
import ch.psi.daq.query.model.Query;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.query.processor.ChannelNameCache;
|
||||
import ch.psi.daq.query.processor.QueryProcessor;
|
||||
|
||||
public class QueryManagerImpl implements QueryManager {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(QueryManagerImpl.class);
|
||||
|
||||
@Resource
|
||||
private ApplicationContext appContext;
|
||||
|
||||
@Resource
|
||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||
|
||||
private Map<Backend, QueryProcessor> queryProcessors = new LinkedHashMap<>();
|
||||
private ChannelNameCache channelNameCache;
|
||||
|
||||
@PostConstruct
|
||||
public void afterPropertiesSet() {
|
||||
List<Exception> exceptions = new ArrayList<>();
|
||||
|
||||
try {
|
||||
QueryProcessor queryProcessor =
|
||||
appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class);
|
||||
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
|
||||
} catch (Exception e) {
|
||||
exceptions.add(e);
|
||||
LOGGER.warn("");
|
||||
LOGGER.warn("##########");
|
||||
LOGGER.warn("Could not load query processor for cassandra.");
|
||||
LOGGER.warn("##########");
|
||||
LOGGER.warn("");
|
||||
}
|
||||
|
||||
try {
|
||||
QueryProcessor queryProcessor =
|
||||
appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class);
|
||||
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
|
||||
} catch (Exception e) {
|
||||
exceptions.add(e);
|
||||
LOGGER.warn("");
|
||||
LOGGER.warn("##########");
|
||||
LOGGER.warn("Could not load query processor for archiverappliance.");
|
||||
LOGGER.warn("##########");
|
||||
LOGGER.warn("");
|
||||
}
|
||||
|
||||
if (queryProcessors.isEmpty()) {
|
||||
LOGGER.error("No query processor could be loaded! Exceptions were: ");
|
||||
for (Exception exception : exceptions) {
|
||||
LOGGER.error("", exception);
|
||||
}
|
||||
|
||||
throw new RuntimeException("No Backends available!");
|
||||
}
|
||||
|
||||
channelNameCache =
|
||||
new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT,
|
||||
Integer.class).longValue());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
channelNameCache.destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ChannelsResponse> getChannels(ChannelsRequest request) {
|
||||
// in case not specified use defaults (e.g. GET)
|
||||
if (request == null) {
|
||||
request = new ChannelsRequest();
|
||||
}
|
||||
|
||||
return channelNameCache.getChannels(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) {
|
||||
// set backends if not defined yet
|
||||
channelNameCache.setBackends(queries);
|
||||
|
||||
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results =
|
||||
new ArrayList<>(queries.getQueries().size());
|
||||
|
||||
for (DAQQueryElement queryElement : queries) {
|
||||
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
|
||||
BackendQuery
|
||||
.getBackendQueries(queryElement)
|
||||
.stream()
|
||||
.filter(query -> {
|
||||
QueryProcessor processor = queryProcessors.get(query.getBackend());
|
||||
if (processor != null) {
|
||||
return true;
|
||||
} else {
|
||||
LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend());
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.flatMap(query -> {
|
||||
QueryProcessor processor = queryProcessors.get(query.getBackend());
|
||||
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
|
||||
// all the magic happens here
|
||||
Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
processor.process(queryAnalizer);
|
||||
// do post-process
|
||||
Stream<Entry<ChannelName, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||
|
||||
return channelToData.map(entry -> {
|
||||
return Triple.of(query, entry.getKey(), entry.getValue());
|
||||
});
|
||||
});
|
||||
|
||||
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
|
||||
// By materializing the outer Stream the elements of all BackendQuery are loaded async
|
||||
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
|
||||
// all elements into memory at once)
|
||||
resultStreams = resultStreams.collect(Collectors.toList()).stream();
|
||||
|
||||
results.add(Pair.of(queryElement, resultStreams));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
package ch.psi.daq.queryrest.response;
|
||||
|
||||
import java.io.OutputStream;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.core.JsonEncoding;
|
||||
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.operation.ResponseFormat;
|
||||
import ch.psi.daq.domain.query.operation.ResponseImpl;
|
||||
|
||||
public abstract class AbstractHTTPResponse extends ResponseImpl {
|
||||
|
||||
public AbstractHTTPResponse(ResponseFormat format) {
|
||||
super(format);
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public abstract void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse httpResponse) throws Exception;
|
||||
|
||||
/**
|
||||
* Configures the output stream and headers according to whether compression is wanted or not.
|
||||
* <p>
|
||||
* In order not to lose the information of the underlying type of data being transferred, the
|
||||
* Content-Type header stays the same but, if compressed, the content-encoding header will be set
|
||||
* accordingly.
|
||||
*
|
||||
* see http://tools.ietf.org/html/rfc2616#section-14.11 and see
|
||||
* http://tools.ietf.org/html/rfc2616#section-3.5
|
||||
*
|
||||
* @param httpResponse The HttpServletResponse
|
||||
* @param contentType The content type
|
||||
* @return OutputStream The OutputStream
|
||||
* @throws Exception Something goes wrong
|
||||
*/
|
||||
@JsonIgnore
|
||||
protected OutputStream handleCompressionAndResponseHeaders(HttpServletResponse httpResponse,
|
||||
String contentType) throws Exception {
|
||||
OutputStream out = httpResponse.getOutputStream();
|
||||
|
||||
httpResponse.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
|
||||
httpResponse.setContentType(contentType);
|
||||
|
||||
httpResponse.addHeader("Content-Type", contentType);
|
||||
String filename = "data." + this.getFileSuffix();
|
||||
httpResponse.addHeader("Content-Disposition", "attachment; filename=" + filename);
|
||||
|
||||
if (this.isCompressed()) {
|
||||
httpResponse.addHeader("Content-Encoding", this.getCompression().toString());
|
||||
out = this.getCompression().wrapStream(out);
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
/**
|
||||
*
|
||||
*/
|
||||
package ch.psi.daq.queryrest.response;
|
||||
|
||||
import java.io.OutputStream;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import ch.psi.daq.domain.query.operation.ResponseFormat;
|
||||
import ch.psi.daq.domain.query.operation.ResponseOptions;
|
||||
|
||||
public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter {
|
||||
|
||||
public static final String CONTENT_TYPE_CSV = "text/csv";
|
||||
protected static final String CONTENT_TYPE_JSON = MediaType.APPLICATION_JSON_VALUE;
|
||||
|
||||
/**
|
||||
* Configures the output stream and headers according to whether compression is wanted or not.
|
||||
* <p>
|
||||
* In order not to lose the information of the underlying type of data being transferred, the
|
||||
* Content-Type header stays the same but, if compressed, the content-encoding header will be set
|
||||
* accordingly.
|
||||
*
|
||||
* see http://tools.ietf.org/html/rfc2616#section-14.11 and
|
||||
* see http://tools.ietf.org/html/rfc2616#section-3.5
|
||||
*
|
||||
* @param options The options for the response
|
||||
* @param response The HttpServletResponse
|
||||
* @param contentType The content type
|
||||
* @return OutputStream The OutputStream
|
||||
* @throws Exception Something goes wrong
|
||||
*/
|
||||
protected OutputStream handleCompressionAndResponseHeaders(ResponseOptions options, HttpServletResponse response,
|
||||
String contentType) throws Exception {
|
||||
OutputStream out = response.getOutputStream();
|
||||
|
||||
response.addHeader("Content-Type", contentType);
|
||||
if (options.isCompressed()) {
|
||||
String filename = "data." + options.getCompression().getFileSuffix();
|
||||
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
|
||||
response.addHeader("Content-Encoding", options.getCompression().toString());
|
||||
out = options.getCompression().wrapStream(out);
|
||||
} else {
|
||||
String filename = "data." + (options.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json");
|
||||
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package ch.psi.daq.queryrest.response;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
|
||||
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
|
||||
|
||||
@JsonTypeInfo(
|
||||
use = JsonTypeInfo.Id.NAME,
|
||||
include = JsonTypeInfo.As.EXISTING_PROPERTY,
|
||||
property = "format")
|
||||
@JsonSubTypes({
|
||||
@Type(value = JSONHTTPResponse.class, name = JSONHTTPResponse.FORMAT),
|
||||
@Type(value = CSVHTTPResponse.class, name = CSVHTTPResponse.FORMAT)
|
||||
})
|
||||
// see: http://stackoverflow.com/questions/24631923/alternative-to-jackson-jsonsubtypes
|
||||
public abstract class PolymorphicResponseMixIn {
|
||||
}
|
@ -1,17 +1,16 @@
|
||||
package ch.psi.daq.queryrest.response;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.operation.ResponseOptions;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
|
||||
public interface ResponseStreamWriter {
|
||||
@ -21,10 +20,8 @@ public interface ResponseStreamWriter {
|
||||
* {@link ServletResponse}.
|
||||
*
|
||||
* @param results The results results
|
||||
* @param options The options for the response
|
||||
* @param response {@link ServletResponse} instance given by the current HTTP request
|
||||
* @param out The OutputStream
|
||||
* @throws Exception thrown if writing to the output stream fails
|
||||
*/
|
||||
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, ResponseOptions options,
|
||||
HttpServletResponse response) throws Exception;
|
||||
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, OutputStream out) throws Exception;
|
||||
}
|
||||
|
@ -0,0 +1,85 @@
|
||||
package ch.psi.daq.queryrest.response.csv;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import com.hazelcast.util.collection.ArrayUtils;
|
||||
|
||||
import ch.psi.daq.domain.FieldNames;
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.operation.AggregationType;
|
||||
import ch.psi.daq.domain.query.operation.Compression;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.query.operation.ResponseFormat;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.queryrest.query.QueryManager;
|
||||
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
|
||||
|
||||
public class CSVHTTPResponse extends AbstractHTTPResponse {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CSVHTTPResponse.class);
|
||||
|
||||
public static final String FORMAT = "csv";
|
||||
public static final String CONTENT_TYPE = "text/csv";
|
||||
|
||||
public CSVHTTPResponse() {
|
||||
super(ResponseFormat.CSV);
|
||||
}
|
||||
|
||||
public CSVHTTPResponse(Compression compression) {
|
||||
this();
|
||||
setCompression(compression);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse httpResponse) throws Exception {
|
||||
OutputStream out = handleCompressionAndResponseHeaders(httpResponse, CONTENT_TYPE);
|
||||
|
||||
// do csv specific validations
|
||||
validateQueries(queries);
|
||||
|
||||
try {
|
||||
LOGGER.debug("Executing query '{}'", queries);
|
||||
|
||||
QueryManager queryManager = context.getBean(QueryManager.class);
|
||||
CSVResponseStreamWriter streamWriter = context.getBean(CSVResponseStreamWriter.class);
|
||||
|
||||
// execute query
|
||||
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
|
||||
queryManager.executeQueries(queries);
|
||||
// write the response back to the client using java 8 streams
|
||||
streamWriter.respond(result, out);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", queries, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected void validateQueries(DAQQueries queries) {
|
||||
for (DAQQueryElement query : queries) {
|
||||
if (!(query.getAggregation() == null || AggregationType.value.equals(query.getAggregation().getAggregationType()))) {
|
||||
// We allow only no aggregation or value aggregation as
|
||||
// extrema: nested structure and not clear how to map it to one line
|
||||
// index: value is an array of Statistics whose size is not clear at initialization time
|
||||
String message = "CSV export does not support '" + query.getAggregation().getAggregationType() + "'";
|
||||
LOGGER.warn(message);
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
|
||||
|
||||
if (!ArrayUtils.contains(query.getColumns(), FieldNames.FIELD_GLOBAL_TIME)) {
|
||||
query.addField(QueryField.globalMillis);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -21,7 +21,6 @@ import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.csv.CSVPrinter;
|
||||
@ -30,58 +29,45 @@ import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonEncoding;
|
||||
|
||||
import ch.psi.daq.common.stream.StreamIterable;
|
||||
import ch.psi.daq.common.stream.StreamMatcher;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.operation.Extrema;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.query.operation.ResponseOptions;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.model.Query;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||
|
||||
/**
|
||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||
* of the current request.
|
||||
*/
|
||||
public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
public class CSVResponseStreamWriter implements ResponseStreamWriter {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);
|
||||
|
||||
public static final char DELIMITER_CVS = ';';
|
||||
public static final String DELIMITER_ARRAY = ",";
|
||||
public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
|
||||
public static final String EMPTY_VALUE = "";
|
||||
public static final String FIELDNAME_EXTREMA = "extrema";
|
||||
private static final Function<Pair<ChannelName, DataEvent>, ChannelName> KEY_PROVIDER = (pair) -> pair.getKey();
|
||||
// try to match sync data (bsread) with non sync data (epics) based on the time usin 10 millis
|
||||
// buckets.
|
||||
private static final ToLongFunction<Pair<ChannelName, DataEvent>> MATCHER_PROVIDER = (pair) -> pair.getValue()
|
||||
.getGlobalMillis() / 10L;
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);
|
||||
|
||||
@Resource
|
||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||
|
||||
@Override
|
||||
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options,
|
||||
HttpServletResponse response) throws Exception {
|
||||
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
|
||||
response.setContentType(CONTENT_TYPE_CSV);
|
||||
|
||||
respondInternal(results, options, response);
|
||||
}
|
||||
|
||||
private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options, HttpServletResponse response) throws Exception {
|
||||
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
final OutputStream out) throws Exception {
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
||||
final OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV);
|
||||
|
||||
final Map<ChannelName, Stream<Pair<ChannelName, DataEvent>>> streams = new LinkedHashMap<>(results.size());
|
||||
final List<String> header = new ArrayList<>();
|
||||
final Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors = new ArrayList<>();
|
||||
@ -168,25 +154,60 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
private void setupChannelColumns(DAQQueryElement daqQuery, BackendQuery backendQuery, ChannelName channelName,
|
||||
Collection<String> header, Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors) {
|
||||
Set<QueryField> queryFields = daqQuery.getFields();
|
||||
List<Aggregation> aggregations = daqQuery.getAggregations();
|
||||
List<Aggregation> aggregations =
|
||||
daqQuery.getAggregation() != null ? daqQuery.getAggregation().getAggregations() : null;
|
||||
List<Extrema> extrema = daqQuery.getAggregation() != null ? daqQuery.getAggregation().getExtrema() : null;
|
||||
|
||||
QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
|
||||
|
||||
for (QueryField field : queryFields) {
|
||||
if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) {
|
||||
header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + field.name());
|
||||
StringBuilder buf = new StringBuilder(3)
|
||||
.append(channelName.getName())
|
||||
.append(DELIMITER_CHANNELNAME_FIELDNAME)
|
||||
.append(field.name());
|
||||
|
||||
header.add(buf.toString());
|
||||
accessors.add(Pair.of(channelName, new QueryFieldStringifyer(field.getAccessor(), EMPTY_VALUE,
|
||||
DELIMITER_ARRAY)));
|
||||
}
|
||||
}
|
||||
|
||||
if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
|
||||
for (Aggregation aggregation : daqQuery.getAggregations()) {
|
||||
header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value.name()
|
||||
+ DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name());
|
||||
for (Aggregation aggregation : aggregations) {
|
||||
StringBuilder buf = new StringBuilder(5)
|
||||
.append(channelName.getName())
|
||||
.append(DELIMITER_CHANNELNAME_FIELDNAME)
|
||||
.append(QueryField.value.name())
|
||||
.append(DELIMITER_CHANNELNAME_FIELDNAME)
|
||||
.append(aggregation.name());
|
||||
|
||||
header.add(buf.toString());
|
||||
accessors.add(Pair.of(channelName, new AggregationStringifyer(aggregation.getAccessor(), EMPTY_VALUE)));
|
||||
}
|
||||
}
|
||||
|
||||
if (extrema != null && queryAnalyzer.isAggregationEnabled()) {
|
||||
for (Extrema extremum : extrema) {
|
||||
for (QueryField field : queryFields) {
|
||||
Function<DataEvent, Object> accessor = extremum.getAccessor(field);
|
||||
if (accessor != null) {
|
||||
StringBuilder buf = new StringBuilder(7)
|
||||
.append(channelName.getName())
|
||||
.append(DELIMITER_CHANNELNAME_FIELDNAME)
|
||||
.append(FIELDNAME_EXTREMA)
|
||||
.append(DELIMITER_CHANNELNAME_FIELDNAME)
|
||||
.append(extremum.name())
|
||||
.append(DELIMITER_CHANNELNAME_FIELDNAME)
|
||||
.append(field.name());
|
||||
|
||||
header.add(buf.toString());
|
||||
accessors
|
||||
.add(Pair.of(channelName, new QueryFieldStringifyer(accessor, EMPTY_VALUE,
|
||||
DELIMITER_ARRAY)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,60 @@
|
||||
package ch.psi.daq.queryrest.response.json;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.operation.Compression;
|
||||
import ch.psi.daq.domain.query.operation.ResponseFormat;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.queryrest.query.QueryManager;
|
||||
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
|
||||
|
||||
public class JSONHTTPResponse extends AbstractHTTPResponse {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JSONHTTPResponse.class);
|
||||
|
||||
public static final String FORMAT = "json";
|
||||
public static final String CONTENT_TYPE = MediaType.APPLICATION_JSON_VALUE;
|
||||
|
||||
public JSONHTTPResponse() {
|
||||
super(ResponseFormat.JSON);
|
||||
}
|
||||
|
||||
public JSONHTTPResponse(Compression compression) {
|
||||
this();
|
||||
setCompression(compression);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse response) throws Exception {
|
||||
OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
|
||||
|
||||
try {
|
||||
LOGGER.debug("Executing query '{}'", queries);
|
||||
|
||||
QueryManager queryManager = context.getBean(QueryManager.class);
|
||||
JSONResponseStreamWriter streamWriter = context.getBean(JSONResponseStreamWriter.class);
|
||||
|
||||
// execute query
|
||||
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result = queryManager.executeQueries(queries);
|
||||
// write the response back to the client using java 8 streams
|
||||
streamWriter.respond(result, out);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", queries, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -10,12 +10,10 @@ import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonEncoding;
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
@ -28,16 +26,16 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.operation.Extrema;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.query.operation.ResponseOptions;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||
|
||||
/**
|
||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||
* of the current request.
|
||||
*/
|
||||
public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
public class JSONResponseStreamWriter implements ResponseStreamWriter {
|
||||
|
||||
private static final String DATA_RESP_FIELD = "data";
|
||||
|
||||
@ -49,20 +47,10 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
@Resource
|
||||
private ObjectMapper mapper;
|
||||
|
||||
|
||||
@Override
|
||||
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options, HttpServletResponse response) throws Exception {
|
||||
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
|
||||
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
|
||||
|
||||
respondInternal(results, options, response);
|
||||
}
|
||||
|
||||
private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options, HttpServletResponse response) throws Exception {
|
||||
OutputStream out) throws Exception {
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_JSON);
|
||||
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
|
||||
|
||||
try {
|
||||
@ -92,7 +80,8 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
writer.writeValue(generator, triple.getRight());
|
||||
generator.writeEndObject();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(), e);
|
||||
LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(),
|
||||
e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
});
|
||||
@ -119,19 +108,26 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
|
||||
protected Set<String> getFields(DAQQueryElement query) {
|
||||
Set<QueryField> queryFields = query.getFields();
|
||||
List<Aggregation> aggregations = query.getAggregations();
|
||||
List<Aggregation> aggregations = query.getAggregation() != null ? query.getAggregation().getAggregations() : null;
|
||||
List<Extrema> extrema = query.getAggregation() != null ? query.getAggregation().getExtrema() : null;
|
||||
|
||||
Set<String> includedFields =
|
||||
new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
|
||||
new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)
|
||||
+ (extrema != null ? extrema.size() : 0));
|
||||
|
||||
for (QueryField field : queryFields) {
|
||||
includedFields.add(field.name());
|
||||
}
|
||||
if (aggregations != null) {
|
||||
for (Aggregation aggregation : query.getAggregations()) {
|
||||
for (Aggregation aggregation : aggregations) {
|
||||
includedFields.add(aggregation.name());
|
||||
}
|
||||
}
|
||||
if (extrema != null) {
|
||||
// field of ExtremaCalculator (extrema in BinnedValueCombinedDataEvent and
|
||||
// BinnedIndexCombinedDataEvent)
|
||||
includedFields.add("extrema");
|
||||
}
|
||||
|
||||
// do not write channel since it is already provided as key in mapping
|
||||
includedFields.remove(QueryField.channel.name());
|
||||
|
@ -29,15 +29,17 @@ import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQuery;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
|
||||
import ch.psi.daq.domain.query.operation.AggregationType;
|
||||
import ch.psi.daq.domain.query.operation.Compression;
|
||||
import ch.psi.daq.domain.query.operation.Extrema;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.query.operation.ResponseFormat;
|
||||
import ch.psi.daq.domain.request.range.RequestRangeDate;
|
||||
import ch.psi.daq.domain.request.range.RequestRangePulseId;
|
||||
import ch.psi.daq.domain.request.range.RequestRangeTime;
|
||||
import ch.psi.daq.domain.test.TestTimeUtils;
|
||||
import ch.psi.daq.queryrest.controller.QueryRestController;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
|
||||
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||
|
||||
@ -62,7 +64,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
1),
|
||||
channels);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -155,7 +157,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
-1,
|
||||
-1),
|
||||
channels);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -245,7 +247,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
-1,
|
||||
-1),
|
||||
channels);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -332,7 +334,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
1),
|
||||
testChannel3));
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02, testChannel3);
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
@ -418,7 +420,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
1),
|
||||
channels);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -481,7 +483,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals("[2048]", record.get(column++));
|
||||
assertEquals("[8]", record.get(column++));
|
||||
assertEquals("1", record.get(column++));
|
||||
assertTrue(record.get(column).startsWith("["));
|
||||
assertTrue(record.get(column++).endsWith("]"));
|
||||
@ -502,7 +504,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
TimeUtils.getTimeFromMillis(0, 0),
|
||||
TimeUtils.getTimeFromMillis(10, 0)),
|
||||
channels);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -587,7 +589,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
startDate,
|
||||
endDate),
|
||||
channels);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -673,9 +675,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
1),
|
||||
Ordering.asc,
|
||||
AggregationType.extrema,
|
||||
new AggregationDescriptor(AggregationType.extrema),
|
||||
TEST_CHANNEL_NAMES[0]);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
@ -703,9 +705,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
1),
|
||||
Ordering.asc,
|
||||
AggregationType.index,
|
||||
new AggregationDescriptor(AggregationType.index),
|
||||
TEST_CHANNEL_NAMES[0]);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
@ -733,13 +735,18 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
long endTime = 99;
|
||||
String startDate = TimeUtils.format(startTime);
|
||||
String endDate = TimeUtils.format(endTime);
|
||||
List<Aggregation> aggregations = new ArrayList<>();
|
||||
aggregations.add(Aggregation.min);
|
||||
aggregations.add(Aggregation.mean);
|
||||
aggregations.add(Aggregation.max);
|
||||
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
channels);
|
||||
request.setNrOfBins(2);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setAggregation(new AggregationDescriptor().setNrOfBins(2).setAggregations(aggregations));
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -752,12 +759,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
queryFields.add(QueryField.eventCount);
|
||||
request.setFields(queryFields);
|
||||
|
||||
List<Aggregation> aggregations = new ArrayList<>();
|
||||
aggregations.add(Aggregation.min);
|
||||
aggregations.add(Aggregation.mean);
|
||||
aggregations.add(Aggregation.max);
|
||||
request.setAggregations(aggregations);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
@ -826,6 +827,146 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDateRangeQueryNrOfBinsAggregateExtrema() throws Exception {
|
||||
List<String> channels = Arrays.asList(TEST_CHANNEL_01);
|
||||
long startTime = 0;
|
||||
long endTime = 99;
|
||||
String startDate = TimeUtils.format(startTime);
|
||||
String endDate = TimeUtils.format(endTime);
|
||||
List<Aggregation> aggregations = new ArrayList<>();
|
||||
aggregations.add(Aggregation.min);
|
||||
aggregations.add(Aggregation.mean);
|
||||
aggregations.add(Aggregation.max);
|
||||
List<Extrema> extrema = new ArrayList<>();
|
||||
extrema.add(Extrema.minValue);
|
||||
extrema.add(Extrema.maxValue);
|
||||
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
channels);
|
||||
request.setAggregation(new AggregationDescriptor().setNrOfBins(2).setAggregations(aggregations)
|
||||
.setExtrema(extrema));
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
queryFields.add(QueryField.pulseId);
|
||||
queryFields.add(QueryField.iocSeconds);
|
||||
queryFields.add(QueryField.iocMillis);
|
||||
queryFields.add(QueryField.globalSeconds);
|
||||
queryFields.add(QueryField.globalMillis);
|
||||
queryFields.add(QueryField.shape);
|
||||
queryFields.add(QueryField.eventCount);
|
||||
queryFields.add(QueryField.value);
|
||||
request.setFields(queryFields);
|
||||
|
||||
Set<QueryField> extremaFields = new LinkedHashSet<>();
|
||||
for (Extrema extremum : extrema) {
|
||||
for (QueryField queryField : queryFields) {
|
||||
if (extremum.getAccessor(queryField) != null) {
|
||||
extremaFields.add(queryField);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
MvcResult result = this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.PATH_QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andReturn();
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
|
||||
StringReader reader = new StringReader(response);
|
||||
CSVParser csvParser = new CSVParser(reader, csvFormat);
|
||||
|
||||
// will not be included as it is an aggregation
|
||||
queryFields.remove(QueryField.value);
|
||||
try {
|
||||
long pulse = 0;
|
||||
int totalRows = 2;
|
||||
|
||||
List<CSVRecord> records = csvParser.getRecords();
|
||||
assertEquals(totalRows + 1, records.size());
|
||||
// remove header
|
||||
CSVRecord record = records.remove(0);
|
||||
assertEquals(
|
||||
(queryFields.size() + aggregations.size() + (extremaFields.size() * extrema.size())) * channels.size(),
|
||||
record.size());
|
||||
int column = 0;
|
||||
for (String channel : channels) {
|
||||
for (QueryField queryField : queryFields) {
|
||||
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(),
|
||||
record.get(column++));
|
||||
}
|
||||
for (Aggregation aggregation : aggregations) {
|
||||
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value
|
||||
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name(),
|
||||
record.get(column++));
|
||||
}
|
||||
for (Extrema extremum : extrema) {
|
||||
for (QueryField queryField : extremaFields) {
|
||||
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME
|
||||
+ CSVResponseStreamWriter.FIELDNAME_EXTREMA
|
||||
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + extremum.name()
|
||||
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(),
|
||||
record.get(column++));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int row = 0; row < totalRows; ++row) {
|
||||
record = records.get(row);
|
||||
|
||||
assertEquals((queryFields.size() + aggregations.size() + (extremaFields.size() * extrema.size()))
|
||||
* channels.size(), record.size());
|
||||
|
||||
column = 0;
|
||||
for (String channel : channels) {
|
||||
assertEquals(channel, record.get(column++));
|
||||
assertEquals("" + pulse, record.get(column++));
|
||||
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals("[1]", record.get(column++));
|
||||
assertEquals("5", record.get(column++));
|
||||
assertEquals("" + pulse + ".0", record.get(column++));
|
||||
assertEquals("" + (pulse + 2) + ".0", record.get(column++));
|
||||
assertEquals("" + (pulse + 4) + ".0", record.get(column++));
|
||||
|
||||
assertEquals("" + pulse, record.get(column++));
|
||||
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
|
||||
assertEquals("1", record.get(column++));
|
||||
assertEquals("" + pulse + ".0", record.get(column++));
|
||||
|
||||
assertEquals("" + (pulse + 4), record.get(column++));
|
||||
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId((pulse + 4))), record.get(column++));
|
||||
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId((pulse + 4))),
|
||||
record.get(column++));
|
||||
assertEquals("1", record.get(column++));
|
||||
assertEquals("" + (pulse + 4) + ".0", record.get(column++));
|
||||
}
|
||||
pulse += 5;
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
csvParser.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDateRangeQueryBinSizeAggregate() throws Exception {
|
||||
List<String> channels = Arrays.asList(TEST_CHANNEL_01);
|
||||
@ -833,13 +974,18 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
long endTime = 999;
|
||||
String startDate = TimeUtils.format(startTime);
|
||||
String endDate = TimeUtils.format(endTime);
|
||||
List<Aggregation> aggregations = new ArrayList<>();
|
||||
aggregations.add(Aggregation.min);
|
||||
aggregations.add(Aggregation.mean);
|
||||
aggregations.add(Aggregation.max);
|
||||
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
channels);
|
||||
request.setBinSize(100);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setAggregation(new AggregationDescriptor().setDurationPerBin(100).setAggregations(aggregations));
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -852,12 +998,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
queryFields.add(QueryField.eventCount);
|
||||
request.setFields(queryFields);
|
||||
|
||||
List<Aggregation> aggregations = new ArrayList<>();
|
||||
aggregations.add(Aggregation.min);
|
||||
aggregations.add(Aggregation.mean);
|
||||
aggregations.add(Aggregation.max);
|
||||
request.setAggregations(aggregations);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
@ -934,7 +1074,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
1),
|
||||
channels);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -995,8 +1135,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setCompression(Compression.GZIP);
|
||||
request.setResponse(new CSVHTTPResponse(Compression.GZIP));
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
@ -1018,7 +1157,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setResponse(new CSVHTTPResponse());
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
package ch.psi.daq.test.queryrest.controller;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.springframework.http.MediaType;
|
||||
@ -15,8 +17,11 @@ import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQuery;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsRequest;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
|
||||
import ch.psi.daq.domain.query.operation.AggregationType;
|
||||
import ch.psi.daq.domain.query.operation.Compression;
|
||||
import ch.psi.daq.domain.query.operation.Extrema;
|
||||
import ch.psi.daq.domain.query.operation.QueryField;
|
||||
import ch.psi.daq.domain.reader.Backend;
|
||||
import ch.psi.daq.domain.request.range.RequestRangeDate;
|
||||
@ -24,6 +29,7 @@ import ch.psi.daq.domain.request.range.RequestRangePulseId;
|
||||
import ch.psi.daq.domain.request.range.RequestRangeTime;
|
||||
import ch.psi.daq.domain.test.TestTimeUtils;
|
||||
import ch.psi.daq.queryrest.controller.QueryRestController;
|
||||
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
|
||||
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||
|
||||
/**
|
||||
@ -33,6 +39,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
|
||||
public static final String TEST_CHANNEL_01 = "testChannel1";
|
||||
public static final String TEST_CHANNEL_02 = "testChannel2";
|
||||
public static final String TEST_CHANNEL_WAVEFORM_01 = "testChannelWaveform1";
|
||||
public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02};
|
||||
|
||||
@After
|
||||
@ -196,9 +203,10 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
@Test
|
||||
public void testCorsFilterIncludesHeaders() throws Exception {
|
||||
// all headers are set
|
||||
this.mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
|
||||
|
||||
// curl -H "Origin: *" -H "Access-Control-Request-Method: POST" -X OPTIONS -v http://localhost:8080/channels
|
||||
this.mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
|
||||
|
||||
// curl -H "Origin: *" -H "Access-Control-Request-Method: POST" -X OPTIONS -v
|
||||
// http://localhost:8080/channels
|
||||
this.mockMvc.perform(
|
||||
MockMvcRequestBuilders
|
||||
.options(QueryRestController.PATH_CHANNELS)
|
||||
@ -209,7 +217,8 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.header().string("Access-Control-Allow-Origin", "*"));
|
||||
|
||||
// curl -H "Origin: http://localhost:8080" -H "Access-Control-Request-Method: POST" -X OPTIONS -v http://localhost:8080/channels
|
||||
// curl -H "Origin: http://localhost:8080" -H "Access-Control-Request-Method: POST" -X OPTIONS
|
||||
// -v http://localhost:8080/channels
|
||||
this.mockMvc.perform(
|
||||
MockMvcRequestBuilders
|
||||
.options(QueryRestController.PATH_CHANNELS)
|
||||
@ -280,17 +289,17 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
TestTimeUtils.getTimeStr(1, 10000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].iocMillis").value(1010));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPulseRangeQuery_Fields() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
100,
|
||||
199),
|
||||
new AggregationDescriptor().setNrOfBins(2),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.addField(QueryField.pulseId);
|
||||
request.addField(QueryField.eventCount);
|
||||
request.setNrOfBins(2);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -510,7 +519,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
100,
|
||||
101),
|
||||
Ordering.asc,
|
||||
AggregationType.extrema,
|
||||
new AggregationDescriptor(AggregationType.extrema),
|
||||
TEST_CHANNEL_NAMES[0]);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
@ -549,12 +558,14 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
long endTime = 1099;
|
||||
String startDate = TimeUtils.format(startTime);
|
||||
String endDate = TimeUtils.format(endTime);
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
TEST_CHANNEL_01);
|
||||
request.setNrOfBins(2);
|
||||
DAQQuery request =
|
||||
new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
new AggregationDescriptor().setNrOfBins(2).setAggregations(
|
||||
Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max)),
|
||||
TEST_CHANNEL_01);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -578,10 +589,89 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.min").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.mean").value(102.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.max").value(104.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(105))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 50000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5));
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.min").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.mean").value(107.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.max").value(109.0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDateRangeQueryNrOfBinsAggregateExtrema() throws Exception {
|
||||
long startTime = 1000;
|
||||
long endTime = 1099;
|
||||
String startDate = TimeUtils.format(startTime);
|
||||
String endDate = TimeUtils.format(endTime);
|
||||
DAQQuery request =
|
||||
new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
new AggregationDescriptor()
|
||||
.setNrOfBins(2)
|
||||
.setAggregations(Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max))
|
||||
.setExtrema(Arrays.asList(Extrema.minValue, Extrema.maxValue)),
|
||||
TEST_CHANNEL_01);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(
|
||||
MockMvcRequestBuilders
|
||||
.post(QueryRestController.PATH_QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content)
|
||||
)
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.min").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.mean").value(102.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.max").value(104.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.value").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.pulseId").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.value").value(104.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.pulseId").value(104))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 40000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.eventCount").value(1))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(105))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 50000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.min").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.mean").value(107.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.max").value(109.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.value").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.pulseId").value(105))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 50000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.value").value(109.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.pulseId").value(109))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 90000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.eventCount").value(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -594,8 +684,8 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
new AggregationDescriptor().setDurationPerBin(100),
|
||||
TEST_CHANNEL_01);
|
||||
request.setBinSize(100);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -619,42 +709,244 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.min").value(1000.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.mean").value(1004.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.max").value(1009.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(1010))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 100000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.min").value(1010.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.mean").value(1014.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.max").value(1019.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].pulseId").value(1020))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 200000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].value.min").value(1020.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].value.mean").value(1024.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].value.max").value(1029.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].pulseId").value(1030))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 300000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].value.min").value(1030.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].value.mean").value(1034.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].value.max").value(1039.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].pulseId").value(1040))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 400000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].value.min").value(1040.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].value.mean").value(1044.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].value.max").value(1049.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].pulseId").value(1050))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 500000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].value.min").value(1050.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].value.mean").value(1054.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].value.max").value(1059.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].pulseId").value(1060))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 600000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].value.min").value(1060.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].value.mean").value(1064.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].value.max").value(1069.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].pulseId").value(1070))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 700000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].value.min").value(1070.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].value.mean").value(1074.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].value.max").value(1079.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].pulseId").value(1080))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 800000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].value.min").value(1080.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].value.mean").value(1084.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].value.max").value(1089.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].pulseId").value(1090))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(10, 900000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10));
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].value.min").value(1090.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].value.mean").value(1094.5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].value.max").value(1099.0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDateRangeQueryIndexAggregate() throws Exception {
|
||||
long startTime = 1000;
|
||||
long endTime = 1099;
|
||||
String startDate = TimeUtils.format(startTime);
|
||||
String endDate = TimeUtils.format(endTime);
|
||||
DAQQuery request =
|
||||
new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
new AggregationDescriptor()
|
||||
.setAggregationType(AggregationType.index)
|
||||
.setNrOfBins(2)
|
||||
.setAggregations(Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max)),
|
||||
TEST_CHANNEL_WAVEFORM_01);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(
|
||||
MockMvcRequestBuilders
|
||||
.post(QueryRestController.PATH_QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content)
|
||||
)
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].min").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].mean").value(102.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].max").value(104.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].min").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].mean").value(102.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].max").value(104.0))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(105))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 50000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].min").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].mean").value(107.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].max").value(109.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].min").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].mean").value(107.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].max").value(109.0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDateRangeQueryIndexAggregateExtrema() throws Exception {
|
||||
long startTime = 1000;
|
||||
long endTime = 1099;
|
||||
String startDate = TimeUtils.format(startTime);
|
||||
String endDate = TimeUtils.format(endTime);
|
||||
DAQQuery request =
|
||||
new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
new AggregationDescriptor()
|
||||
.setAggregationType(AggregationType.index)
|
||||
.setNrOfBins(2)
|
||||
.setAggregations(Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max))
|
||||
.setExtrema(Arrays.asList(Extrema.minValue, Extrema.maxValue)),
|
||||
TEST_CHANNEL_WAVEFORM_01);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(
|
||||
MockMvcRequestBuilders
|
||||
.post(QueryRestController.PATH_QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content)
|
||||
)
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].min").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].mean").value(102.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].max").value(104.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].min").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].mean").value(102.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].max").value(104.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.value").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.pulseId").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.value").value(104.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.pulseId").value(104))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 40000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.value").value(100.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.pulseId").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 0)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.value").value(104.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.pulseId").value(104))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 40000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.eventCount").value(1))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(105))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 50000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].min").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].mean").value(107.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].max").value(109.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].min").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].mean").value(107.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].max").value(109.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.value").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.pulseId").value(105))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 50000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.value").value(109.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.pulseId").value(109))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 90000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.value").value(105.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.pulseId").value(105))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 50000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.eventCount").value(1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.value").value(109.0))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.pulseId").value(109))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.globalSeconds").value(
|
||||
TestTimeUtils.getTimeStr(1, 90000000)))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.eventCount").value(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -664,7 +956,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.GZIP);
|
||||
request.setResponse(new JSONHTTPResponse(Compression.GZIP));
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -700,4 +992,70 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.json"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadAggregation_01() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
11),
|
||||
new AggregationDescriptor().setDurationPerBin(1000),
|
||||
TEST_CHANNEL_NAMES);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.PATH_QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isBadRequest());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadAggregation_02() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangeTime(
|
||||
TimeUtils.getTimeFromMillis(0, 0),
|
||||
TimeUtils.getTimeFromMillis(10, 0)),
|
||||
new AggregationDescriptor().setPulsesPerBin(100),
|
||||
TEST_CHANNEL_NAMES);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.PATH_QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isBadRequest());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadAggregation_03() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
11),
|
||||
new AggregationDescriptor().setDurationPerBin(1000).setNrOfBins(100),
|
||||
TEST_CHANNEL_NAMES);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.PATH_QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isBadRequest());
|
||||
}
|
||||
}
|
||||
|
@ -168,8 +168,9 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
FieldNames.FIELD_PULSE_ID)) ? i : PropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
|
||||
|
||||
if (channelLower.contains("waveform")) {
|
||||
long[] value = random.longs(2048).toArray();
|
||||
long[] value = random.longs(8).toArray();
|
||||
value[0] = i;
|
||||
value[1] = i;
|
||||
return new ChannelEventImpl(
|
||||
channel,
|
||||
iocTime,
|
||||
@ -180,11 +181,12 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
);
|
||||
|
||||
} else if (channelLower.contains("image")) {
|
||||
int x = 640;
|
||||
int y = 480;
|
||||
int x = 4;
|
||||
int y = 8;
|
||||
int[] shape = new int[] {x, y};
|
||||
long[] value = random.longs(x * y).toArray();
|
||||
value[0] = i;
|
||||
value[1] = i;
|
||||
return new ChannelEventImpl(
|
||||
channel,
|
||||
iocTime,
|
||||
|
@ -0,0 +1,106 @@
|
||||
package ch.psi.daq.test.queryrest.response;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import ch.psi.daq.domain.query.DAQQuery;
|
||||
import ch.psi.daq.domain.query.operation.Compression;
|
||||
import ch.psi.daq.domain.query.operation.Response;
|
||||
import ch.psi.daq.domain.request.range.RequestRangePulseId;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
|
||||
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||
|
||||
public class ResponseQueryTest extends AbstractDaqRestTest{
|
||||
|
||||
@Resource
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@Test
|
||||
public void test_JSON_01() throws JsonParseException, JsonMappingException, IOException {
|
||||
Response respose = new CSVHTTPResponse();
|
||||
|
||||
String value = mapper.writeValueAsString(respose);
|
||||
|
||||
Response deserial = mapper.readValue(value, Response.class);
|
||||
|
||||
assertEquals(respose.getClass(), deserial.getClass());
|
||||
assertEquals(respose.getFormat(), deserial.getFormat());
|
||||
assertEquals(respose.getCompression(), deserial.getCompression());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_JSON_02() throws JsonParseException, JsonMappingException, IOException {
|
||||
DAQQuery query = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
100),
|
||||
"TestChannel_01");
|
||||
query.setResponse(new CSVHTTPResponse(Compression.GZIP));
|
||||
|
||||
String value = mapper.writeValueAsString(query);
|
||||
|
||||
DAQQuery deserial = mapper.readValue(value, DAQQuery.class);
|
||||
|
||||
assertNotNull(deserial.getResponse());
|
||||
assertEquals(query.getResponse().getClass(), deserial.getResponse().getClass());
|
||||
assertEquals(query.getResponse().getFormat(), deserial.getResponse().getFormat());
|
||||
assertEquals(query.getResponse().getCompression(), deserial.getResponse().getCompression());
|
||||
|
||||
assertEquals(query.getResponse().getCompression().getFileSuffix(), deserial.getResponse().getFileSuffix());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_JSON_03() throws JsonParseException, JsonMappingException, IOException {
|
||||
DAQQuery query = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
100),
|
||||
"TestChannel_01");
|
||||
query.setResponse(new JSONHTTPResponse(Compression.NONE));
|
||||
|
||||
String value = mapper.writeValueAsString(query);
|
||||
|
||||
int index = value.indexOf("format");
|
||||
assertTrue(index >= 0);
|
||||
index = value.indexOf("format", index + 1);
|
||||
// ensure string contains identifier only once
|
||||
assertEquals(-1, index);
|
||||
|
||||
DAQQuery deserial = mapper.readValue(value, DAQQuery.class);
|
||||
|
||||
assertNotNull(deserial.getResponse());
|
||||
assertEquals(query.getResponse().getClass(), deserial.getResponse().getClass());
|
||||
assertEquals(query.getResponse().getFormat(), deserial.getResponse().getFormat());
|
||||
assertEquals(query.getResponse().getCompression(), deserial.getResponse().getCompression());
|
||||
|
||||
assertEquals(query.getResponse().getFormat().getFileSuffix(), deserial.getResponse().getFileSuffix());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_JSON_04() throws JsonParseException, JsonMappingException, IOException {
|
||||
DAQQuery query = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
100),
|
||||
"TestChannel_01");
|
||||
|
||||
String value = mapper.writeValueAsString(query);
|
||||
|
||||
DAQQuery deserial = mapper.readValue(value, DAQQuery.class);
|
||||
|
||||
assertNull(deserial.getResponse());
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user