Merge branch 'queryrest_1' into 'master'

sf_daq/ch.psi.daq.queryrest#1

- implementing compression of result streams
- allowing for different responseFormats (JSON, CSV)

See merge request !1
This commit is contained in:
maerki_f
2015-12-22 13:22:24 +01:00
12 changed files with 533 additions and 282 deletions


@ -5,6 +5,11 @@
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.wst.common.project.facet.core.builder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
@ -20,5 +25,6 @@
<nature>org.springframework.ide.eclipse.core.springnature</nature>
<nature>org.springsource.ide.eclipse.gradle.core.nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.wst.common.project.facet.core.nature</nature>
</natures>
</projectDescription>

Readme.md (244 changed lines)

@ -48,6 +48,7 @@ The REST interface is accessible through `http://data-api.psi.ch/sf`.
<a name="query_channel_names"/>
## Query Channel Names
### Request
@ -71,14 +72,14 @@ POST http://<host>:<port>/channels
### Example
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels
curl -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels
```
<a name="query_range"/>
## Query Range
Queries are applied to a range. Following ranges are supported.
Queries are applied to a range. The following types of ranges are supported.
### By Pulse-Id
@ -131,98 +132,47 @@ Queries are applied to a range. Following ranges are supported.
## Query Data
### Request
### `compression`: compression of data can be enabled
By default, no data is compressed when transferred from the server to the client. However, compression can be enabled by setting the `compression` attribute to a value other than `none`, i.e. to `gzip` or `deflate`.
If compression is enabled, we have to tell `curl` that the data is compressed so that it is decompressed automatically. `curl` decompresses the response when the `--compressed` parameter is set:
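For example, the following command (the same one used in the compression example further below) requests gzip-compressed data and lets `curl` decompress it transparently:
```bash
curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"gzip","range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```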
### `responseFormat`: data is in JSON by default
Responses can be formatted as CSV or JSON using the `responseFormat` field; JSON is the default.
CSV export does not support `index` and `extrema` aggregations.
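For example, a minimal request for CSV output looks as follows (the full CSV example, including explicit field selection, is shown in the query examples further below):
```json
{
  "responseFormat":"csv",
  "range":{
    "startPulseId":0,
    "endPulseId":4
  },
  "channels":[
    "channel1",
    "channel2"
  ]
}
```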
### Query request endpoint
```
POST http://<host>:<port>/query
```
#### Data
#### Request body
A request is performed using JSON. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended).
A request is performed by sending a valid JSON object in the HTTP request body. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended).
There exist following fields:
The following attributes can be specified:
- **channels**: Array of channel names to be queried.
- **range**: The range of the query (see [Query Range](Readme.md#query_range)).
- **ordering**: The ordering of the data (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.common/browse/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values).
- **fields**: The requested fields (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values).
- **ordering**: The ordering of the data (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.common/blob/master/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values).
- **fields**: The requested fields (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values).
- **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be divided into.
- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using the number of pulses and the number of milliseconds makes this binning strategy consistent between channels with different update frequencies).
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response.
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response.
- **aggregationType**: Specifies the type of aggregation (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**)
- **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance)
### Example
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query
```
### Response example
The response is in JSON.
```json
[
{
"channel":"channel1",
"data":[
{
"pulseId":0,
"iocMillis":0,
"iocNanos":0,
"globalMillis":0,
"globalNanos":0,
"value":0
},
{
"pulseId":2,
"iocMillis":2,
"iocNanos":2,
"globalMillis":2,
"globalNanos":2,
"value":2
},
{
"pulseId":4,
"iocMillis":4,
"iocNanos":4,
"globalMillis":4,
"globalNanos":4,
"value":4
}
]
},
{
"channel":"channel2",
"data":[
{
"pulseId":1,
"iocMillis":1,
"iocNanos":1,
"globalMillis":1,
"globalNanos":1,
"value":1
},
{
"pulseId":3,
"iocMillis":3,
"iocNanos":3,
"globalMillis":3,
"globalNanos":3,
"value":3
}
]
}
]
```
- **compression**: Defines the compression algorithm of the response. The default is **none** (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/Compression.java) for possible values).
- **responseFormat**: Specifies the format of the response, either JSON or CSV. The default is **json** (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/ResponseType.java) for possible values).
### Example Queries
Following examples build on a waveform data (see below). They also work for scalars (consider it as a waveform of length = 1) and images (waveform of length = dimX * dimY).
The following examples build on waveform data (see below). They also work for scalars (consider it as a waveform of length = 1) and images (waveform of length = dimX * dimY).
![Data Visualization](doc/images/Data_Visualization.png)
@ -282,6 +232,96 @@ Following examples build on a waveform data (see below). They also work for scal
### Query Examples
##### Query using compression
```json
{
"compression":"gzip",
"range":{
"startPulseId":0,
"endPulseId":3
},
"channels":[
"Channel_01"
]
}
```
Alternatively, `deflate` can be used:
```json
{
"compression":"deflate",
"range":{
"startPulseId":0,
"endPulseId":3
},
"channels":[
"Channel_01"
]
}
```
###### Command (gzip)
The `curl` command has a `--compressed` option to decompress data automatically.
```bash
curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"gzip","range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
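###### Command (deflate)
The same `--compressed` flag also handles deflate-encoded responses; only the `compression` value in the request body changes:
```bash
curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"deflate","range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```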
##### Query setting CSV response format
```json
{
"responseFormat":"csv",
"range":{
"startPulseId":0,
"endPulseId":4
},
"channels":[
"channel1",
"channel2"
],
"fields":[
"channel",
"pulseId",
"iocMillis",
"iocNanos",
"globalMillis",
"globalNanos",
"shape",
"eventCount",
"value"
]
}
```
###### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
```
###### Response
The response is in CSV.
```text
channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value
testChannel1;0;0;0;0;0;[1];1;0
testChannel1;1;10;0;10;0;[1];1;1
testChannel1;2;20;0;20;0;[1];1;2
testChannel1;3;30;0;30;0;[1];1;3
testChannel1;4;40;0;40;0;[1];1;4
testChannel2;0;0;0;0;0;[1];1;0
testChannel2;1;10;0;10;0;[1];1;1
testChannel2;2;20;0;20;0;[1];1;2
testChannel2;3;30;0;30;0;[1];1;3
testChannel2;4;40;0;40;0;[1];1;4
```
##### Query by Pulse-Id Range
```json
@ -299,7 +339,7 @@ Following examples build on a waveform data (see below). They also work for scal
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -327,7 +367,7 @@ See JSON representation of the data above.
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -357,7 +397,7 @@ Supported format is ISO8601 *YYYY-MM-DDThh:mm:ss.sTZD* (e.g. *1997-07-16T19:20:3
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -389,7 +429,7 @@ Archiver Appliance supports queries by *time range* and *date range* only (as it
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -418,7 +458,7 @@ Allows for server side optimizations since not all data needs to be retrieved.
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -471,7 +511,7 @@ Use **none** in case ordering does not matter (allows for server side optimizati
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -524,7 +564,7 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -598,7 +638,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -657,7 +697,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -714,7 +754,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -783,7 +823,7 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -830,31 +870,3 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -
]
```
### CSV Export
Responses can be formatted as csv by requesting `text/csv`.
CSV export does not support `index` and `extrema` aggregations.
### Example
```bash
curl -H "Accept: text/csv" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
```
### Response example
The response is in CSV.
```text
channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value
testChannel1;0;0;0;0;0;[1];1;0
testChannel1;1;10;0;10;0;[1];1;1
testChannel1;2;20;0;20;0;[1];1;2
testChannel1;3;30;0;30;0;[1];1;3
testChannel1;4;40;0;40;0;[1];1;4
testChannel2;0;0;0;0;0;[1];1;0
testChannel2;1;10;0;10;0;[1];1;1
testChannel2;2;20;0;20;0;[1];1;2
testChannel2;3;30;0;30;0;[1];1;3
testChannel2;4;40;0;40;0;[1];1;4
```


@ -26,6 +26,7 @@ dependencies {
exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
}
compile libraries.commons_lang
compile libraries.commons_io
compile libraries.super_csv
compile libraries.super_csv_dozer


@ -0,0 +1,5 @@
Query duration (s),none / time (s),none / space (mb),gzip / time (s),gzip / space (mb),deflate / time (s),deflate / space (mb)
10,1,35.3,5.1,16.2,5.1,16.2
30,3,108,15.7,49.7,15.6,49.7
60,6,208,30.4,95.5,30.1,95.5
300,25,900,129,413,127,413


@ -60,8 +60,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
// a nested configuration
// this guarantees that the ordering of the properties file is as expected
// see:
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
// see: https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
@Configuration
@Import({QueryConfig.class})
static class InnerConfiguration {


@ -38,6 +38,7 @@ import ch.psi.daq.query.model.AggregationType;
import ch.psi.daq.query.model.DBMode;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.ResponseFormat;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.query.processor.QueryProcessor;
import ch.psi.daq.query.request.ChannelsRequest;
@ -123,6 +124,89 @@ public class QueryRestController {
}
/**
* Catch-all query method for getting data from the backend for both JSON and CSV requests.
* <p>
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
* deserialize the information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param query concrete implementation of {@link DAQQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @throws IOException thrown if writing to the output stream fails
*/
@RequestMapping(
value = QUERY,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
try {
LOGGER.debug("Executing query '{}'", query.toString());
if (ResponseFormat.JSON.equals(query.getResponseFormat())) {
// write the response back to the client using java 8 streams
jsonResponseStreamWriter.respond(executeQuery(query), query, res);
} else {
// it's a CSV request
executeQueryCsv(query, res);
}
} catch (Exception e) {
LOGGER.error("Failed to execute query '{}'.", query, e);
throw e;
}
}
/**
* Returns data in CSV format to the client.
*/
private void executeQueryCsv(DAQQuery query, HttpServletResponse res) throws Exception {
if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
// We allow only no aggregation or value aggregation as
// extrema: nested structure and not clear how to map it to one line
// index: value is an array of Statistics whose size is not clear at initialization time
String message = "CSV export does not support '" + query.getAggregationType() + "'";
LOGGER.warn(message);
throw new IllegalArgumentException(message);
}
try {
LOGGER.debug("Executing query '{}'", query.toString());
// write the response back to the client using java 8 streams
csvResponseStreamWriter.respond(executeQuery(query), query, res);
} catch (Exception e) {
LOGGER.error("Failed to execute query '{}'.", query, e);
throw e;
}
}
public Stream<Entry<String, ?>> executeQuery(DAQQuery query) {
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// all the magic happens here
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
// do post-process
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
return channelToData;
}
private QueryProcessor getQueryProcessor(DBMode dbMode) {
if (DBMode.databuffer.equals(dbMode)) {
return cassandraQueryProcessor.get();
} else if (DBMode.archiverappliance.equals(dbMode)) {
return archiverApplianceQueryProcessor.get();
} else {
LOGGER.error("Unknown DBMode '{}'!", dbMode);
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
}
}
/**
* Returns the current list of {@link Ordering}s available.
*
@ -193,95 +277,4 @@ public class QueryRestController {
return value.toString();
}).collect(Collectors.toList());
}
/**
* Catch-all query method for getting data from the backend for JSON requests.
* <p>
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
* deserialize the information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param query concrete implementation of {@link DAQQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @throws IOException thrown if writing to the output stream fails
*/
@RequestMapping(
value = QUERY,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE},
produces = {MediaType.APPLICATION_JSON_VALUE})
public void executeQueryJson(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
try {
LOGGER.debug("Executing query '{}'", query.toString());
// write the response back to the client using java 8 streams
jsonResponseStreamWriter.respond(executeQuery(query), query, res);
} catch (Exception t) {
LOGGER.error("Failed to execute query '{}'.", query, t);
throw t;
}
}
/**
* Catch-all query method for getting data from the backend for JSON requests.
* <p>
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
* deserialize the information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param query concrete implementation of {@link DAQQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @throws IOException thrown if writing to the output stream fails
*/
@RequestMapping(
value = QUERY,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE},
produces = {CSVResponseStreamWriter.APPLICATION_CSV_VALUE})
public void executeQueryCsv(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
// We allow only no aggregation or value aggregation as
// extrema: nested structure and not clear how to map it to one line
// index: value is an array of Statistics whose size is not clear at initialization time
String message = "CSV export does not support '" + query.getAggregationType() + "'";
LOGGER.warn(message);
throw new IllegalArgumentException(message);
}
try {
LOGGER.debug("Executing query '{}'", query.toString());
// write the response back to the client using java 8 streams
csvResponseStreamWriter.respond(executeQuery(query), query, res);
} catch (Exception t) {
LOGGER.error("Failed to execute query '{}'.", query, t);
throw t;
}
}
public Stream<Entry<String, ?>> executeQuery(DAQQuery query) {
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// all the magic happens here
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
// do post-process
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
return channelToData;
}
private QueryProcessor getQueryProcessor(DBMode dbMode) {
if (DBMode.databuffer.equals(dbMode)) {
return cassandraQueryProcessor.get();
} else if (DBMode.archiverappliance.equals(dbMode)) {
return archiverApplianceQueryProcessor.get();
} else {
LOGGER.error("Unknown DBMode '{}'!", dbMode);
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
}
}
}


@ -0,0 +1,58 @@
/**
*
*/
package ch.psi.daq.queryrest.response;
import java.io.OutputStream;
import javax.servlet.http.HttpServletResponse;
import org.springframework.http.MediaType;
import ch.psi.daq.query.model.ResponseFormat;
import ch.psi.daq.query.model.impl.DAQQuery;
/**
* @author zellweger_c
*
*/
public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter {
public static final String CONTENT_TYPE_CSV = "text/csv";
protected static final String CONTENT_TYPE_JSON = MediaType.APPLICATION_JSON_VALUE;
/**
* Configures the output stream and headers according to whether compression is wanted or not.
* <p>
* In order not to lose the information of the underlying type of data being transferred, the
* Content-Type header stays the same but, if compressed, the content-encoding header will be set
* accordingly.
*
* @param query the current {@link DAQQuery} defining the compression and response format
* @param response the {@link HttpServletResponse} whose headers are set
* @param contentType the Content-Type of the (possibly compressed) payload
* @return the {@link OutputStream} to write the response to, wrapped in a compressing stream if compression is enabled
* @throws Exception if the output stream cannot be obtained or wrapped
*
* @see http://tools.ietf.org/html/rfc2616#section-14.11
* @see http://tools.ietf.org/html/rfc2616#section-3.5
*/
protected OutputStream handleCompressionAndResponseHeaders(DAQQuery query, HttpServletResponse response,
String contentType) throws Exception {
OutputStream out = response.getOutputStream();
response.addHeader("Content-Type", contentType);
if (query.isCompressed()) {
String filename = "data." + query.getCompression().getFileSuffix();
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
response.addHeader("Content-Encoding", query.getCompression().toString());
out = query.getCompression().wrapStream(out);
} else {
String filename = "data." + (query.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json");
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
}
return out;
}
}
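The `Compression` enum used above (imported from the query/domain project, see the Readme links) is not part of this diff. The following is only a minimal sketch of how `wrapStream`, `getFileSuffix` and a lower-case `toString()` (used verbatim as the `Content-Encoding` value) could be implemented with the JDK's `java.util.zip` streams; the actual implementation may differ:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPOutputStream;

// Sketch only: the real enum is maintained outside this repository.
public enum Compression {
    NONE {
        @Override
        public OutputStream wrapStream(OutputStream out) {
            return out; // pass-through, no compression
        }

        @Override
        public String getFileSuffix() {
            return "none";
        }
    },
    GZIP {
        @Override
        public OutputStream wrapStream(OutputStream out) throws IOException {
            return new GZIPOutputStream(out);
        }

        @Override
        public String getFileSuffix() {
            return "gz"; // matches the "data.gz" filename expected by the tests below
        }
    },
    DEFLATE {
        @Override
        public OutputStream wrapStream(OutputStream out) {
            return new DeflaterOutputStream(out);
        }

        @Override
        public String getFileSuffix() {
            return "deflate"; // assumption, not covered by the tests in this diff
        }
    };

    public abstract OutputStream wrapStream(OutputStream out) throws IOException;

    public abstract String getFileSuffix();

    @Override
    public String toString() {
        // used as the Content-Encoding header value, hence lower case
        return name().toLowerCase();
    }
}
```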


@ -5,6 +5,7 @@ import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import ch.psi.daq.query.model.impl.DAQQuery;
@ -19,5 +20,5 @@ public interface ResponseStreamWriter {
* @param response {@link ServletResponse} instance given by the current HTTP request
* @throws IOException thrown if writing to the output stream fails
*/
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception;
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception;
}


@ -1,5 +1,8 @@
package ch.psi.daq.queryrest.response.csv;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
@ -11,6 +14,7 @@ import java.util.stream.Stream;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,22 +24,21 @@ import org.supercsv.io.dozer.CsvDozerBeanWriter;
import org.supercsv.io.dozer.ICsvDozerBeanWriter;
import org.supercsv.prefs.CsvPreference;
import com.fasterxml.jackson.core.JsonEncoding;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
import com.fasterxml.jackson.core.JsonEncoding;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
public class CSVResponseStreamWriter implements ResponseStreamWriter {
public static final String APPLICATION_CSV_VALUE = "text/csv";
public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
private static final char DELIMITER_CVS = ';';
private static final char DELIMITER_ARRAY = ',';
@ -46,59 +49,100 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
@Override
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception {
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
response.setContentType(APPLICATION_CSV_VALUE);
response.setContentType(CONTENT_TYPE_CSV);
respondInternal(stream, query, response);
}
private void respondInternal(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response)
private void respondInternal(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response)
throws Exception {
Set<QueryField> queryFields = query.getFields();
List<Aggregation> aggregations = query.getAggregations();
int aggregationsSize = (aggregations != null ? aggregations.size() : 0);
Set<String> fieldMapping =
new LinkedHashSet<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
List<String> header =
new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
Set<String> fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize);
List<String> header = new ArrayList<>(queryFields.size() + aggregationsSize);
CellProcessor[] processors = setupCellProcessors(query, fieldMapping, header);
OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_CSV);
ICsvDozerBeanWriter beanWriter = setupBeanWriter(fieldMapping, out);
writeToOutput(stream, header, processors, beanWriter);
}
/**
* Sets up the bean writer instance and configures the mapping from the bean fields to the CSV columns.
*
* @param fieldMapping ordered set of bean property names mapped to CSV columns
* @param out output stream the CSV content is written to
* @return the configured {@link ICsvDozerBeanWriter}
* @throws Exception if the writer cannot be created or configured
*/
private ICsvDozerBeanWriter setupBeanWriter(Set<String> fieldMapping, OutputStream out) throws Exception {
CsvPreference preference = new CsvPreference.Builder(
(char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
DELIMITER_CVS,
CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference);
// configure the mapping from the fields to the CSV columns
beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
return beanWriter;
}
/**
* Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer.
*
* Cell processors are an integral part of reading and writing with Super CSV - they automate the
* data type conversions, and enforce constraints. They implement the chain of responsibility
* design pattern - each processor has a single, well-defined purpose and can be chained together
* with other processors to fully automate all of the required conversions and constraint
* validation for a single CSV column.
*
* @param query The current {@link DAQQuery}
* @return Array of {@link CellProcessor} entries
*/
private CellProcessor[] setupCellProcessors(DAQQuery query, Set<String> fieldMapping, List<String> header) {
Set<QueryField> queryFields = query.getFields();
List<Aggregation> aggregations = query.getAggregations();
List<CellProcessor> processorSet =
new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
boolean isNewField;
QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query);
for (QueryField field : queryFields) {
if(!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())){
isNewField = fieldMapping.add(field.name());
if (isNewField) {
header.add(field.name());
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
}
}
}
if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
for (Aggregation aggregation : query.getAggregations()) {
isNewField = fieldMapping.add("value." + aggregation.name());
if (isNewField) {
header.add(aggregation.name());
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
}
}
}
return processorSet.toArray(new CellProcessor[processorSet.size()]);
}
CellProcessor[] processors = processorSet.toArray(new CellProcessor[processorSet.size()]);
CsvPreference preference = new CsvPreference.Builder(
(char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
DELIMITER_CVS,
CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(response.getWriter(), preference);
@SuppressWarnings("unchecked")
private void writeToOutput(Stream<Entry<String, ?>> stream, List<String> header, CellProcessor[] processors,
ICsvDozerBeanWriter beanWriter) throws IOException, Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
try {
// configure the mapping from the fields to the CSV columns
beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
beanWriter.writeHeader(header.toArray(new String[header.size()]));
@ -106,28 +150,27 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
/* ensure elements are sequentially written */
.sequential()
.forEach(
entry -> {
if (entry.getValue() instanceof Stream) {
@SuppressWarnings("unchecked")
Stream<DataEvent> eventStream = (Stream<DataEvent>) entry.getValue();
eventStream
.forEach(
event -> {
try {
beanWriter.write(event, processors);
} catch (Exception e) {
LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
event.getPulseId(), e);
exception.compareAndSet(null, e);
}
});
} else {
String message = "Type '" + entry.getValue().getClass() + "' not supported.";
LOGGER.error(message);
exception.compareAndSet(null, new RuntimeException(message));
}
});
entry -> {
if (entry.getValue() instanceof Stream) {
Stream<DataEvent> eventStream = (Stream<DataEvent>) entry.getValue();
eventStream
.forEach(
event -> {
try {
beanWriter.write(event, processors);
} catch (Exception e) {
LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
event.getPulseId(), e);
exception.compareAndSet(null, e);
}
});
} else {
String message = "Type '" + entry.getValue().getClass() + "' not supported.";
LOGGER.error(message);
exception.compareAndSet(null, new RuntimeException(message));
}
}
);
} finally {
beanWriter.close();
}


@ -1,20 +1,27 @@
package ch.psi.daq.queryrest.response.json;
import java.io.IOException;
import java.util.Map.Entry;
import java.io.OutputStream;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
@ -23,16 +30,11 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
public class JSONResponseStreamWriter implements ResponseStreamWriter {
public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
private static final String DATA_RESP_FIELD = "data";
@ -45,14 +47,15 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
private ObjectMapper mapper;
@Override
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception {
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
Set<String> includedFields = getFields(query);
ObjectWriter writer = configureWriter(includedFields);
respondInternal(stream, response, writer);
respondInternal(stream, response, writer, query);
}
protected Set<String> getFields(DAQQuery query) {
@ -99,11 +102,14 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
* @param stream Mapping from channel name to data
* @param response {@link ServletResponse} instance given by the current HTTP request
* @param writer configured writer that includes the fields the end user wants to see
* @param query the current {@link DAQQuery}, used to set the compression and response headers
* @throws IOException thrown if writing to the output stream fails
*/
private void respondInternal(Stream<Entry<String, ?>> stream, ServletResponse response, ObjectWriter writer)
private void respondInternal(Stream<Entry<String, ?>> stream, HttpServletResponse response, ObjectWriter writer, DAQQuery query)
throws Exception {
JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_JSON);
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
AtomicReference<Exception> exception = new AtomicReference<>();
try {


@ -31,11 +31,12 @@ import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.AggregationType;
import ch.psi.daq.query.model.Compression;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.ResponseFormat;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.filter.CorsFilter;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
@ -68,6 +69,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
1),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -96,7 +100,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -143,6 +146,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
1),
channelName);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -171,7 +177,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -213,6 +218,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
10),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -241,7 +249,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -289,6 +296,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
startDate,
endDate),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -317,7 +327,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -366,6 +375,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
Ordering.asc,
AggregationType.extrema,
TEST_CHANNEL_NAMES[0]);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
String content = mapper.writeValueAsString(request);
@ -373,7 +384,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print());
@ -397,6 +407,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
Ordering.asc,
AggregationType.index,
TEST_CHANNEL_NAMES[0]);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
String content = mapper.writeValueAsString(request);
@ -404,7 +416,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print());
@ -430,6 +441,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setNrOfBins(2);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
@ -467,7 +480,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -515,6 +527,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setBinSize(100);
request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
@ -552,7 +566,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -587,4 +600,50 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
mapReader.close();
}
}
@Test
public void testGzipFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
10,
11),
TEST_CHANNEL_NAMES);
request.setResponseFormat(ResponseFormat.CSV);
request.setCompression(Compression.GZIP);
String content = mapper.writeValueAsString(request);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz"));
}
@Test
public void testJsonFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
10,
11),
TEST_CHANNEL_NAMES);
request.setResponseFormat(ResponseFormat.CSV);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.csv"));
}
}


@ -16,6 +16,7 @@ import ch.psi.daq.cassandra.request.range.RequestRangeTime;
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.query.model.AggregationType;
import ch.psi.daq.query.model.Compression;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.filter.CorsFilter;
@ -173,6 +174,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
10,
11),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -180,7 +182,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
@ -209,12 +210,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
100,
110),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
this.mockMvc.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
@ -246,6 +247,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
startDate,
endDate),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -254,7 +256,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
@ -287,12 +288,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
Ordering.asc,
AggregationType.extrema,
TEST_CHANNEL_NAMES[0]);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
this.mockMvc
.perform(MockMvcRequestBuilders.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
@ -331,6 +332,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setNrOfBins(2);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -339,7 +341,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
@ -369,6 +370,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setBinSize(100);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -377,7 +379,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
@ -418,4 +419,71 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].globalMillis").value(1900))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10));
}
@Test
public void testGzipCompression() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
10,
11),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.GZIP);
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk());
}
@Test
public void testGzipFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
10,
11),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.GZIP);
String content = mapper.writeValueAsString(request);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz"));
}
@Test
public void testJsonFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
10,
11),
TEST_CHANNEL_NAMES);
request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.json"));
}
}