- implementing compression of result streams
- allowing for different responseFormats (JSON, CSV)
This commit is contained in:
Zellweger Christof Ralf
2015-12-21 10:09:26 +01:00
parent 04583789cb
commit 01a74f9e6b
10 changed files with 364 additions and 212 deletions

View File

@ -5,6 +5,11 @@
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.wst.common.project.facet.core.builder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
@ -20,5 +25,6 @@
<nature>org.springframework.ide.eclipse.core.springnature</nature>
<nature>org.springsource.ide.eclipse.gradle.core.nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.wst.common.project.facet.core.nature</nature>
</natures>
</projectDescription>

129
Readme.md
View File

@ -48,6 +48,7 @@ The REST interface is accessible through `http://data-api.psi.ch/sf`.
<a name="query_channel_names"/>
## Query Channel Names
### Request
@ -71,14 +72,14 @@ POST http://<host>:<port>/channels
### Example
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels
curl -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels
```
<a name="query_range"/>
## Query Range
Queries are applied to a range. Following ranges are supported.
Queries are applied to a range. The following types of ranges are supported.
### By Pulse-Id
@ -131,17 +132,68 @@ Queries are applied to a range. Following ranges are supported.
## Query Data
### Request
### `compressed`: data is compressed by default
To save bandwidth, all data transferred from the server to the client is compressed (gzipped) by default. If compressing the data is too processor-intensive, it can be disabled by specifying `compressed=false` in the body of the request.
Because of this, `curl` has to be told that the response is compressed so that it decompresses it automatically; this is done with the `--compressed` parameter:
#### Example
```bash
curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
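If the HTTP client cannot handle gzip itself, the compressed stream can also be piped through `gunzip` manually (a minimal sketch equivalent to the command above):
```bash
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | gunzip
```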
To get the raw, uncompressed data from the server, specify `compressed=false` in the request body:
```bash
curl -H "Content-Type: application/json" -X POST -d '{"compressed":false,"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
### `responseFormat`: data is in JSON by default
Responses can be formatted as CSV or JSON using the `responseFormat` field; JSON is the default.
CSV export does not support `index` and `extrema` aggregations.
#### Example
```bash
curl --compressed -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
```
#### Response example
The response is in CSV.
```text
channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value
testChannel1;0;0;0;0;0;[1];1;0
testChannel1;1;10;0;10;0;[1];1;1
testChannel1;2;20;0;20;0;[1];1;2
testChannel1;3;30;0;30;0;[1];1;3
testChannel1;4;40;0;40;0;[1];1;4
testChannel2;0;0;0;0;0;[1];1;0
testChannel2;1;10;0;10;0;[1];1;1
testChannel2;2;20;0;20;0;[1];1;2
testChannel2;3;30;0;30;0;[1];1;3
testChannel2;4;40;0;40;0;[1];1;4
```
### Query request endpoint
```
POST http://<host>:<port>/query
```
#### Data
#### Request body
A request is performed using JSON. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended).
A request is performed by sending a valid JSON object in the HTTP request body. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended).
There exist following fields:
The following attributes can be specified:
- **channels**: Array of channel names to be queried.
- **range**: The range of the query (see [Query Range](Readme.md#query_range)).
@ -153,12 +205,21 @@ There exist following fields:
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
- **aggregateChannels**: Specifies whether the data of the requested channels should be combined using the defined aggregation (values: true|**false**)
- **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance)
- **compressed**: Defines whether the response should be compressed or not (values: **true**|false)
- **responseFormat**: Specifies the format of the response, either JSON or CSV (values: **json**|csv)
### Example
Data compressed in transit and decompressed by `curl`:
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query
```
Raw, uncompressed data from the server:
```bash
curl -H "Content-Type: application/json" -X POST -d '{"compressed": false,"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query
```
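For illustration, a single request can combine several of the attributes listed above (a sketch; `Channel_01` is a placeholder channel name):
```bash
curl --compressed -H "Content-Type: application/json" -X POST -d '{"ordering":"asc","responseFormat":"json","fields":["pulseId","value"],"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```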
### Response example
@ -222,7 +283,7 @@ The response is in JSON.
### Example Queries
Following examples build on a waveform data (see below). They also work for scalars (consider it as a waveform of length = 1) and images (waveform of length = dimX * dimY).
The following examples build on waveform data (see below). They also work for scalars (consider it as a waveform of length = 1) and images (waveform of length = dimX * dimY).
![Data Visualization](doc/images/Data_Visualization.png)
@ -299,7 +360,7 @@ Following examples build on a waveform data (see below). They also work for scal
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -327,7 +388,7 @@ See JSON representation of the data above.
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -357,7 +418,7 @@ Supported format is ISO8601 *YYYY-MM-DDThh:mm:ss.sTZD* (e.g. *1997-07-16T19:20:3
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -389,7 +450,7 @@ Archiver Appliance supports queries by *time range* and *date range* only (as it
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -418,7 +479,7 @@ Allows for server side optimizations since not all data needs to be retrieved.
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -471,7 +532,7 @@ Use **none** in case ordering does not matter (allows for server side optimizati
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -524,7 +585,7 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -598,7 +659,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -657,7 +718,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -714,7 +775,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -783,7 +844,7 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -
###### Command
```bash
curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
curl --compressed -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
@ -830,31 +891,3 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -
]
```
### CSV Export
Responses can be formatted as csv by requesting `text/csv`.
CSV export does not support `index` and `extrema` aggregations.
### Example
```bash
curl -H "Accept: text/csv" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
```
### Response example
The response is in CSV.
```text
channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value
testChannel1;0;0;0;0;0;[1];1;0
testChannel1;1;10;0;10;0;[1];1;1
testChannel1;2;20;0;20;0;[1];1;2
testChannel1;3;30;0;30;0;[1];1;3
testChannel1;4;40;0;40;0;[1];1;4
testChannel2;0;0;0;0;0;[1];1;0
testChannel2;1;10;0;10;0;[1];1;1
testChannel2;2;20;0;20;0;[1];1;2
testChannel2;3;30;0;30;0;[1];1;3
testChannel2;4;40;0;40;0;[1];1;4
```

View File

@ -26,6 +26,7 @@ dependencies {
exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
}
compile libraries.commons_lang
compile libraries.commons_io
compile libraries.super_csv
compile libraries.super_csv_dozer

View File

@ -31,6 +31,7 @@ import ch.psi.daq.common.concurrent.singleton.Deferred;
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.ResponseFormat;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Aggregation;
@ -123,6 +124,89 @@ public class QueryRestController {
}
/**
* Catch-all query method for getting data from the backend for both JSON and CSV requests.
* <p>
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
* deserialize the information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param query concrete implementation of {@link DAQQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @throws IOException thrown if writing to the output stream fails
*/
@RequestMapping(
value = QUERY,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
try {
LOGGER.debug("Executing query '{}'", query.toString());
if (ResponseFormat.JSON.equals(query.getResponseFormat())) {
// write the response back to the client using java 8 streams
jsonResponseStreamWriter.respond(executeQuery(query), query, res);
} else {
// it's a CSV request
executeQueryCsv(query, res);
}
} catch (Exception e) {
LOGGER.error("Failed to execute query '{}'.", query, e);
throw e;
}
}
/**
* Returns data in CSV format to the client.
*/
private void executeQueryCsv(DAQQuery query, HttpServletResponse res) throws Exception {
if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
// We allow only no aggregation or value aggregation as
// extrema: nested structure and not clear how to map it to one line
// index: value is an array of Statistics whose size is not clear at initialization time
String message = "CSV export does not support '" + query.getAggregationType() + "'";
LOGGER.warn(message);
throw new IllegalArgumentException(message);
}
try {
LOGGER.debug("Executing query '{}'", query.toString());
// write the response back to the client using java 8 streams
csvResponseStreamWriter.respond(executeQuery(query), query, res);
} catch (Exception e) {
LOGGER.error("Failed to execute query '{}'.", query, e);
throw e;
}
}
public Stream<Entry<String, ?>> executeQuery(DAQQuery query) {
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// all the magic happens here
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
// do post-process
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
return channelToData;
}
private QueryProcessor getQueryProcessor(DBMode dbMode) {
if (DBMode.databuffer.equals(dbMode)) {
return cassandraQueryProcessor.get();
} else if (DBMode.archiverappliance.equals(dbMode)) {
return archiverApplianceQueryProcessor.get();
} else {
LOGGER.error("Unknown DBMode '{}'!", dbMode);
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
}
}
/**
* Returns the current list of {@link Ordering}s available.
*
@ -193,95 +277,4 @@ public class QueryRestController {
return value.toString();
}).collect(Collectors.toList());
}
/**
* Catch-all query method for getting data from the backend for JSON requests.
* <p>
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
* deserialize the information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param query concrete implementation of {@link DAQQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @throws IOException thrown if writing to the output stream fails
*/
@RequestMapping(
value = QUERY,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE},
produces = {MediaType.APPLICATION_JSON_VALUE})
public void executeQueryJson(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
try {
LOGGER.debug("Executing query '{}'", query.toString());
// write the response back to the client using java 8 streams
jsonResponseStreamWriter.respond(executeQuery(query), query, res);
} catch (Exception t) {
LOGGER.error("Failed to execute query '{}'.", query, t);
throw t;
}
}
/**
* Catch-all query method for getting data from the backend for JSON requests.
* <p>
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
* deserialize the information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param query concrete implementation of {@link DAQQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @throws IOException thrown if writing to the output stream fails
*/
@RequestMapping(
value = QUERY,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE},
produces = {CSVResponseStreamWriter.APPLICATION_CSV_VALUE})
public void executeQueryCsv(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
// We allow only no aggregation or value aggregation as
// extrema: nested structure and not clear how to map it to one line
// index: value is an array of Statistics whose size is not clear at initialization time
String message = "CSV export does not support '" + query.getAggregationType() + "'";
LOGGER.warn(message);
throw new IllegalArgumentException(message);
}
try {
LOGGER.debug("Executing query '{}'", query.toString());
// write the response back to the client using java 8 streams
csvResponseStreamWriter.respond(executeQuery(query), query, res);
} catch (Exception t) {
LOGGER.error("Failed to execute query '{}'.", query, t);
throw t;
}
}
public Stream<Entry<String, ?>> executeQuery(DAQQuery query) {
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// all the magic happens here
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
// do post-process
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
return channelToData;
}
private QueryProcessor getQueryProcessor(DBMode dbMode) {
if (DBMode.databuffer.equals(dbMode)) {
return cassandraQueryProcessor.get();
} else if (DBMode.archiverappliance.equals(dbMode)) {
return archiverApplianceQueryProcessor.get();
} else {
LOGGER.error("Unknown DBMode '{}'!", dbMode);
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
}
}
}

View File

@ -0,0 +1,56 @@
/**
*
*/
package ch.psi.daq.queryrest.response;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.springframework.http.MediaType;
import ch.psi.daq.query.model.impl.DAQQuery;
/**
* @author zellweger_c
*
*/
public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter {
public static final String CONTENT_TYPE_CSV = "text/csv";
protected static final String CONTENT_TYPE_JSON = MediaType.APPLICATION_JSON_VALUE;
/**
* Configures the output stream and headers according to whether compression is wanted or not.
* <p>
* In order not to lose the information of the underlying type of data being transferred, the
* Content-Type header stays the same but, if compressed, the content-encoding header will be set
* accordingly.
*
* @param query the executed {@link DAQQuery}
* @param response the {@link HttpServletResponse} whose headers are set
* @param contentType the content type of the payload being written
* @return the {@link OutputStream} to write the payload to, wrapped in a {@link GZIPOutputStream} if compression is requested
* @throws Exception thrown if the response's output stream cannot be obtained
*
* @see http://tools.ietf.org/html/rfc2616#section-14.11
* @see http://tools.ietf.org/html/rfc2616#section-3.5
*/
protected OutputStream handleCompressionAndResponseHeaders(DAQQuery query, HttpServletResponse response,
String contentType) throws Exception {
OutputStream out = response.getOutputStream();
response.addHeader("Content-Type", contentType);
if (query.isCompressed()) {
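// advertise gzip to the client and wrap the stream so the payload is compressed on the fly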
response.addHeader("Content-Disposition", "attachment; filename=data.gz");
response.addHeader("Content-Encoding", "gzip");
out = new GZIPOutputStream(out);
} else {
response.addHeader("Content-Disposition", "attachment; filename=data.csv");
}
return out;
}
}

View File

@ -5,6 +5,7 @@ import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import ch.psi.daq.query.model.impl.DAQQuery;
@ -19,5 +20,5 @@ public interface ResponseStreamWriter {
* @param response {@link ServletResponse} instance given by the current HTTP request
* @throws IOException thrown if writing to the output stream fails
*/
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception;
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception;
}

View File

@ -1,5 +1,8 @@
package ch.psi.daq.queryrest.response.csv;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
@ -11,6 +14,7 @@ import java.util.stream.Stream;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,22 +24,21 @@ import org.supercsv.io.dozer.CsvDozerBeanWriter;
import org.supercsv.io.dozer.ICsvDozerBeanWriter;
import org.supercsv.prefs.CsvPreference;
import com.fasterxml.jackson.core.JsonEncoding;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
import com.fasterxml.jackson.core.JsonEncoding;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
public class CSVResponseStreamWriter implements ResponseStreamWriter {
public static final String APPLICATION_CSV_VALUE = "text/csv";
public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
private static final char DELIMITER_CVS = ';';
private static final char DELIMITER_ARRAY = ',';
@ -46,59 +49,100 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
@Override
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception {
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
response.setContentType(APPLICATION_CSV_VALUE);
response.setContentType(CONTENT_TYPE_CSV);
respondInternal(stream, query, response);
}
private void respondInternal(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response)
private void respondInternal(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response)
throws Exception {
Set<QueryField> queryFields = query.getFields();
List<Aggregation> aggregations = query.getAggregations();
int aggregationsSize = (aggregations != null ? aggregations.size() : 0);
Set<String> fieldMapping =
new LinkedHashSet<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
List<String> header =
new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
Set<String> fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize);
List<String> header = new ArrayList<>(queryFields.size() + aggregationsSize);
CellProcessor[] processors = setupCellProcessors(query, fieldMapping, header);
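// set the response headers and obtain the output stream (gzip-wrapped if the query requests compression)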
OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_CSV);
ICsvDozerBeanWriter beanWriter = setupBeanWriter(fieldMapping, out);
writeToOutput(stream, header, processors, beanWriter);
}
/**
* Sets up the bean writer instance.
*
* @param fieldMapping the bean fields to map to CSV columns
* @param out the output stream to write the CSV data to
* @return the configured {@link ICsvDozerBeanWriter}
* @throws Exception thrown if the bean mapping cannot be configured
*/
private ICsvDozerBeanWriter setupBeanWriter(Set<String> fieldMapping, OutputStream out) throws Exception {
CsvPreference preference = new CsvPreference.Builder(
(char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
DELIMITER_CVS,
CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference);
// configure the mapping from the fields to the CSV columns
beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
return beanWriter;
}
/**
* Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer.
*
* Cell processors are an integral part of reading and writing with Super CSV - they automate the
* data type conversions, and enforce constraints. They implement the chain of responsibility
* design pattern - each processor has a single, well-defined purpose and can be chained together
* with other processors to fully automate all of the required conversions and constraint
* validation for a single CSV column.
*
* @param query The current {@link DAQQuery}
* @param fieldMapping ordered set that gets populated with the bean field names to be written
* @param header list that gets populated with the CSV header columns
* @return Array of {@link CellProcessor} entries
*/
private CellProcessor[] setupCellProcessors(DAQQuery query, Set<String> fieldMapping, List<String> header) {
Set<QueryField> queryFields = query.getFields();
List<Aggregation> aggregations = query.getAggregations();
List<CellProcessor> processorSet =
new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
boolean isNewField;
QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query);
for (QueryField field : queryFields) {
if(!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())){
isNewField = fieldMapping.add(field.name());
if (isNewField) {
header.add(field.name());
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
}
}
}
if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
for (Aggregation aggregation : query.getAggregations()) {
isNewField = fieldMapping.add("value." + aggregation.name());
if (isNewField) {
header.add(aggregation.name());
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
}
}
}
return processorSet.toArray(new CellProcessor[processorSet.size()]);
}
CellProcessor[] processors = processorSet.toArray(new CellProcessor[processorSet.size()]);
CsvPreference preference = new CsvPreference.Builder(
(char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
DELIMITER_CVS,
CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(response.getWriter(), preference);
@SuppressWarnings("unchecked")
private void writeToOutput(Stream<Entry<String, ?>> stream, List<String> header, CellProcessor[] processors,
ICsvDozerBeanWriter beanWriter) throws IOException, Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
try {
// configure the mapping from the fields to the CSV columns
beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
beanWriter.writeHeader(header.toArray(new String[header.size()]));
@ -106,28 +150,27 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
/* ensure elements are sequentially written */
.sequential()
.forEach(
entry -> {
if (entry.getValue() instanceof Stream) {
@SuppressWarnings("unchecked")
Stream<DataEvent> eventStream = (Stream<DataEvent>) entry.getValue();
eventStream
.forEach(
event -> {
try {
beanWriter.write(event, processors);
} catch (Exception e) {
LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
event.getPulseId(), e);
exception.compareAndSet(null, e);
}
});
} else {
String message = "Type '" + entry.getValue().getClass() + "' not supported.";
LOGGER.error(message);
exception.compareAndSet(null, new RuntimeException(message));
}
});
entry -> {
if (entry.getValue() instanceof Stream) {
Stream<DataEvent> eventStream = (Stream<DataEvent>) entry.getValue();
eventStream
.forEach(
event -> {
try {
beanWriter.write(event, processors);
} catch (Exception e) {
LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
event.getPulseId(), e);
exception.compareAndSet(null, e);
}
});
} else {
String message = "Type '" + entry.getValue().getClass() + "' not supported.";
LOGGER.error(message);
exception.compareAndSet(null, new RuntimeException(message));
}
}
);
} finally {
beanWriter.close();
}

View File

@ -1,20 +1,27 @@
package ch.psi.daq.queryrest.response.json;
import java.io.IOException;
import java.util.Map.Entry;
import java.io.OutputStream;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
@ -23,16 +30,11 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
public class JSONResponseStreamWriter implements ResponseStreamWriter {
public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
private static final String DATA_RESP_FIELD = "data";
@ -45,14 +47,15 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
private ObjectMapper mapper;
@Override
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception {
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
Set<String> includedFields = getFields(query);
ObjectWriter writer = configureWriter(includedFields);
respondInternal(stream, response, writer);
respondInternal(stream, response, writer, query);
}
protected Set<String> getFields(DAQQuery query) {
@ -99,11 +102,14 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
* @param stream Mapping from channel name to data
* @param response {@link ServletResponse} instance given by the current HTTP request
* @param writer configured writer that includes the fields the end user wants to see
* @param query the executed query, used to determine whether the response is compressed
* @throws IOException thrown if writing to the output stream fails
*/
private void respondInternal(Stream<Entry<String, ?>> stream, ServletResponse response, ObjectWriter writer)
private void respondInternal(Stream<Entry<String, ?>> stream, HttpServletResponse response, ObjectWriter writer, DAQQuery query)
throws Exception {
JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_JSON);
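// 'out' is a GZIPOutputStream when the query requests compression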
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
AtomicReference<Exception> exception = new AtomicReference<>();
try {

View File

@ -29,13 +29,14 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId;
import ch.psi.daq.cassandra.request.range.RequestRangeTime;
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.domain.ResponseFormat;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.AggregationType;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.impl.DAQQuery;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.filter.CorsFilter;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
@ -68,6 +69,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
1),
TEST_CHANNEL_NAMES);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -96,7 +100,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -143,6 +146,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
1),
channelName);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -171,7 +177,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -213,6 +218,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
10),
TEST_CHANNEL_NAMES);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -241,7 +249,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -289,6 +296,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
startDate,
endDate),
TEST_CHANNEL_NAMES);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@ -317,7 +327,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -366,6 +375,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
Ordering.asc,
AggregationType.extrema,
TEST_CHANNEL_NAMES[0]);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
String content = mapper.writeValueAsString(request);
@ -373,7 +384,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print());
@ -397,6 +407,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
Ordering.asc,
AggregationType.index,
TEST_CHANNEL_NAMES[0]);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
String content = mapper.writeValueAsString(request);
@ -404,7 +416,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print());
@ -430,6 +441,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setNrOfBins(2);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
@ -467,7 +480,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
@ -515,6 +527,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setBinSize(100);
request.setCompressed(false);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
@ -552,7 +566,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())

View File

@ -173,6 +173,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
10,
11),
TEST_CHANNEL_NAMES);
request.setCompressed(false);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -180,7 +181,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
@ -209,12 +209,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
100,
110),
TEST_CHANNEL_NAMES);
request.setCompressed(false);
String content = mapper.writeValueAsString(request);
this.mockMvc.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
@ -246,6 +246,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
startDate,
endDate),
TEST_CHANNEL_NAMES);
request.setCompressed(false);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -254,7 +255,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
@ -287,12 +287,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
Ordering.asc,
AggregationType.extrema,
TEST_CHANNEL_NAMES[0]);
request.setCompressed(false);
String content = mapper.writeValueAsString(request);
this.mockMvc
.perform(MockMvcRequestBuilders.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
@ -331,6 +331,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setNrOfBins(2);
request.setCompressed(false);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -339,7 +340,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
@ -369,6 +369,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setBinSize(100);
request.setCompressed(false);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -377,7 +378,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.accept(MediaType.APPLICATION_JSON)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)