From 01a74f9e6b7bfd15b280a55c7d1bcd4413074675 Mon Sep 17 00:00:00 2001 From: Zellweger Christof Ralf Date: Mon, 21 Dec 2015 10:09:26 +0100 Subject: [PATCH 1/5] sf_daq/ch.psi.daq.queryrest#1 - implementing compression of result streams - allowing for different responseFormats (JSON, CSV) --- .project | 6 + Readme.md | 129 ++++++++----- build.gradle | 1 + .../controller/QueryRestController.java | 175 +++++++++--------- .../AbstractResponseStreamWriter.java | 56 ++++++ .../response/ResponseStreamWriter.java | 3 +- .../response/csv/CSVResponseStreamWriter.java | 135 +++++++++----- .../json/JSONResponseStreamWriter.java | 28 +-- .../QueryRestControllerCsvTest.java | 31 +++- .../QueryRestControllerJsonTest.java | 12 +- 10 files changed, 364 insertions(+), 212 deletions(-) create mode 100644 src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java diff --git a/.project b/.project index 56f8001..08328ec 100644 --- a/.project +++ b/.project @@ -5,6 +5,11 @@ + + org.eclipse.wst.common.project.facet.core.builder + + + org.eclipse.jdt.core.javabuilder @@ -20,5 +25,6 @@ org.springframework.ide.eclipse.core.springnature org.springsource.ide.eclipse.gradle.core.nature org.eclipse.jdt.core.javanature + org.eclipse.wst.common.project.facet.core.nature diff --git a/Readme.md b/Readme.md index 4e72f5e..1824057 100644 --- a/Readme.md +++ b/Readme.md @@ -48,6 +48,7 @@ The REST interface is accessible through `http://data-api.psi.ch/sf`. + ## Query Channel Names ### Request @@ -71,14 +72,14 @@ POST http://:/channels ### Example ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels +curl -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels ``` ## Query Range -Queries are applied to a range. Following ranges are supported. +Queries are applied to a range. The following types of ranges ranges are supported. 
### By Pulse-Id @@ -131,17 +132,68 @@ Queries are applied to a range. Following ranges are supported. ## Query Data -### Request +### `compressed`: data is compressed by default + +To save bandwidth, all data transferred from the server to the client is compressed (gzipped) by default. In case compressing the data is too processor-intense, it can be disabled by specifying `compressed=false` in the body of the request. + +Because of this, we have to tell `curl` that the data is compressed so that it is being decompressed automatically. `curl` decompresses the response when the `--compressed` parameter is set: + +#### Example + +```bash +curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +``` + +If we want the raw data uncompressed from the server, we have to specify this in the query body parameter with by specifying `compressed=false`: + +```bash +curl -H "Content-Type: application/json" -X POST -d '{"compressed":false,"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +``` + + +### `responseFormat`: data is in JSON by default + +Responses can be formatted as CSV or JSON using the `responseFormat` field. The returned data is JSON-formatted per default. + +CSV export does not support `index` and `extrema` aggregations. + +#### Example + +```bash +curl --compressed -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query +``` + +#### Response example + +The response is in CSV. 
+ +```text +channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value +testChannel1;0;0;0;0;0;[1];1;0 +testChannel1;1;10;0;10;0;[1];1;1 +testChannel1;2;20;0;20;0;[1];1;2 +testChannel1;3;30;0;30;0;[1];1;3 +testChannel1;4;40;0;40;0;[1];1;4 +testChannel2;0;0;0;0;0;[1];1;0 +testChannel2;1;10;0;10;0;[1];1;1 +testChannel2;2;20;0;20;0;[1];1;2 +testChannel2;3;30;0;30;0;[1];1;3 +testChannel2;4;40;0;40;0;[1];1;4 +``` + + + +### Query request endpoint ``` GET http://:/query ``` -#### Data +#### Request body -A request is performed using JSON. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended). +A request is performed by sending a valid JSON object in the HTTP request body. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended). -There exist following fields: +The following attributes can be specified: - **channels**: Array of channel names to be queried. - **range**: The range of the query (see [Query Range](Readme.md#query_range)). @@ -153,12 +205,21 @@ There exist following fields: - **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]). 
- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**) - **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance) - +- **compressed**: Defines whether the response should be compressed or not (values: **true**|false) +- **responseFormat**: Specifies the format the response of the requested data is in, either in JSON or CSV format (values: **json**|csv) ### Example +Compressed data but uncompressed by `curl`: + ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query +``` + +Raw, uncompressed data (returns non-human-readable data): + +```bash +curl -H "Content-Type: application/json" -X POST -d '{"compressed": false,"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query ``` ### Response example @@ -222,7 +283,7 @@ The response is in JSON. ### Example Queries -Following examples build on a waveform data (see below). They also work for scalars (consider it as a waveform of length = 1) and images (waveform of length = dimX * dimY). +The following examples build on waveform data (see below). They also work for scalars (consider it as a waveform of length = 1) and images (waveform of length = dimX * dimY). ![Data Visualization](doc/images/Data_Visualization.png) @@ -299,7 +360,7 @@ Following examples build on a waveform data (see below). 
They also work for scal ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -327,7 +388,7 @@ See JSON representation of the data above. ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -357,7 +418,7 @@ Supported format is ISO8601 *YYYY-MM-DDThh:mm:ss.sTZD* (e.g. 
*1997-07-16T19:20:3 ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -389,7 +450,7 @@ Archiver Appliance supports queries by *time range* and *date range* only (as it ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -418,7 +479,7 @@ Allows for server side optimizations since not all data needs to be retrieved. 
###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -471,7 +532,7 @@ Use **none** in case ordering does not matter (allows for server side optimizati ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -524,7 +585,7 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST - ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -598,7 +659,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d 
'{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -657,7 +718,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -714,7 +775,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -783,7 +844,7 @@ curl -H "Accept: 
application/json" -H "Content-Type: application/json" -X POST - ###### Command ```bash -curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -830,31 +891,3 @@ curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST - ] ``` -### CSV Export - -Responses can be formatted as csv by requesting `text/csv`. -CSV export does not support `index` and `extrema` aggregations. - -### Example - -```bash -curl -H "Accept: text/csv" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query -``` - -### Response example - -The response is in CSV. 
- -```text -channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value -testChannel1;0;0;0;0;0;[1];1;0 -testChannel1;1;10;0;10;0;[1];1;1 -testChannel1;2;20;0;20;0;[1];1;2 -testChannel1;3;30;0;30;0;[1];1;3 -testChannel1;4;40;0;40;0;[1];1;4 -testChannel2;0;0;0;0;0;[1];1;0 -testChannel2;1;10;0;10;0;[1];1;1 -testChannel2;2;20;0;20;0;[1];1;2 -testChannel2;3;30;0;30;0;[1];1;3 -testChannel2;4;40;0;40;0;[1];1;4 -``` diff --git a/build.gradle b/build.gradle index 677e97c..e5ed3ed 100644 --- a/build.gradle +++ b/build.gradle @@ -26,6 +26,7 @@ dependencies { exclude group: 'org.slf4j', module: 'log4j-over-slf4j' } compile libraries.commons_lang + compile libraries.commons_io compile libraries.super_csv compile libraries.super_csv_dozer diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 0e511a2..ea02349 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -31,6 +31,7 @@ import ch.psi.daq.common.concurrent.singleton.Deferred; import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.ResponseFormat; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.model.Aggregation; @@ -123,6 +124,89 @@ public class QueryRestController { } + /** + * Catch-all query method for getting data from the backend for both JSON and CSV requests. + *

+ * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields + * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to + * deserialize the information into and has been configured (see + * QueryRestConfig#afterPropertiesSet) accordingly. + * + * @param query concrete implementation of {@link DAQQuery} + * @param res the {@link HttpServletResponse} instance associated with this request + * @throws IOException thrown if writing to the output stream fails + */ + @RequestMapping( + value = QUERY, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}) + public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { + try { + LOGGER.debug("Executing query '{}'", query.toString()); + + if (ResponseFormat.JSON.equals(query.getResponseFormat())) { + // write the response back to the client using java 8 streams + jsonResponseStreamWriter.respond(executeQuery(query), query, res); + } else { + // it's a CSV request + executeQueryCsv(query, res); + } + + } catch (Exception e) { + LOGGER.error("Failed to execute query '{}'.", query, e); + throw e; + } + } + + /** + * Returns data in CSV format to the client. 
+ */ + private void executeQueryCsv(DAQQuery query, HttpServletResponse res) throws Exception { + + if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) { + // We allow only no aggregation or value aggregation as + // extrema: nested structure and not clear how to map it to one line + // index: value is an array of Statistics whose size is not clear at initialization time + String message = "CSV export does not support '" + query.getAggregationType() + "'"; + LOGGER.warn(message); + throw new IllegalArgumentException(message); + } + + try { + LOGGER.debug("Executing query '{}'", query.toString()); + // write the response back to the client using java 8 streams + csvResponseStreamWriter.respond(executeQuery(query), query, res); + } catch (Exception e) { + LOGGER.error("Failed to execute query '{}'.", query, e); + throw e; + } + + } + + public Stream> executeQuery(DAQQuery query) { + QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); + + // all the magic happens here + Stream>> channelToDataEvents = + getQueryProcessor(query.getDbMode()).process(queryAnalizer); + + // do post-process + Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); + + return channelToData; + } + + private QueryProcessor getQueryProcessor(DBMode dbMode) { + if (DBMode.databuffer.equals(dbMode)) { + return cassandraQueryProcessor.get(); + } else if (DBMode.archiverappliance.equals(dbMode)) { + return archiverApplianceQueryProcessor.get(); + } else { + LOGGER.error("Unknown DBMode '{}'!", dbMode); + throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode)); + } + } + /** * Returns the current list of {@link Ordering}s available. * @@ -193,95 +277,4 @@ public class QueryRestController { return value.toString(); }).collect(Collectors.toList()); } - - /** - * Catch-all query method for getting data from the backend for JSON requests. - *

- * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields - * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to - * deserialize the information into and has been configured (see - * QueryRestConfig#afterPropertiesSet) accordingly. - * - * @param query concrete implementation of {@link DAQQuery} - * @param res the {@link HttpServletResponse} instance associated with this request - * @throws IOException thrown if writing to the output stream fails - */ - @RequestMapping( - value = QUERY, - method = RequestMethod.POST, - consumes = {MediaType.APPLICATION_JSON_VALUE}, - produces = {MediaType.APPLICATION_JSON_VALUE}) - public void executeQueryJson(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { - try { - LOGGER.debug("Executing query '{}'", query.toString()); - - // write the response back to the client using java 8 streams - jsonResponseStreamWriter.respond(executeQuery(query), query, res); - } catch (Exception t) { - LOGGER.error("Failed to execute query '{}'.", query, t); - throw t; - } - } - - /** - * Catch-all query method for getting data from the backend for JSON requests. - *

- * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields - * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to - * deserialize the information into and has been configured (see - * QueryRestConfig#afterPropertiesSet) accordingly. - * - * @param query concrete implementation of {@link DAQQuery} - * @param res the {@link HttpServletResponse} instance associated with this request - * @throws IOException thrown if writing to the output stream fails - */ - @RequestMapping( - value = QUERY, - method = RequestMethod.POST, - consumes = {MediaType.APPLICATION_JSON_VALUE}, - produces = {CSVResponseStreamWriter.APPLICATION_CSV_VALUE}) - public void executeQueryCsv(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { - if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) { - // We allow only no aggregation or value aggregation as - // extrema: nested structure and not clear how to map it to one line - // index: value is an array of Statistics whose size is not clear at initialization time - String message = "CSV export does not support '" + query.getAggregationType() + "'"; - LOGGER.warn(message); - throw new IllegalArgumentException(message); - } - - try { - LOGGER.debug("Executing query '{}'", query.toString()); - - // write the response back to the client using java 8 streams - csvResponseStreamWriter.respond(executeQuery(query), query, res); - } catch (Exception t) { - LOGGER.error("Failed to execute query '{}'.", query, t); - throw t; - } - } - - public Stream> executeQuery(DAQQuery query) { - QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); - - // all the magic happens here - Stream>> channelToDataEvents = - getQueryProcessor(query.getDbMode()).process(queryAnalizer); - - // do post-process - Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); - - return channelToData; - } - - 
private QueryProcessor getQueryProcessor(DBMode dbMode) { - if (DBMode.databuffer.equals(dbMode)) { - return cassandraQueryProcessor.get(); - } else if (DBMode.archiverappliance.equals(dbMode)) { - return archiverApplianceQueryProcessor.get(); - } else { - LOGGER.error("Unknown DBMode '{}'!", dbMode); - throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode)); - } - } } diff --git a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java new file mode 100644 index 0000000..764f53b --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java @@ -0,0 +1,56 @@ +/** + * + */ +package ch.psi.daq.queryrest.response; + +import java.io.OutputStream; +import java.util.zip.GZIPOutputStream; + +import javax.servlet.http.HttpServletResponse; + +import org.springframework.http.MediaType; + +import ch.psi.daq.query.model.impl.DAQQuery; + + +/** + * @author zellweger_c + * + */ +public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter { + + public static final String CONTENT_TYPE_CSV = "text/csv"; + protected static final String CONTENT_TYPE_JSON = MediaType.APPLICATION_JSON_VALUE; + + /** + * Configures the output stream and headers according to whether compression is wanted or not. + *

+ * In order not to lose the information of the underlying type of data being transferred, the + * Content-Type header stays the same but, if compressed, the content-encoding header will be set + * accordingly. + * + * @param query + * @param response + * @return + * @throws Exception + * + * @see http://tools.ietf.org/html/rfc2616#section-14.11 and + * @see http://tools.ietf.org/html/rfc2616#section-3.5 + */ + protected OutputStream handleCompressionAndResponseHeaders(DAQQuery query, HttpServletResponse response, + String contentType) throws Exception { + OutputStream out = response.getOutputStream(); + + response.addHeader("Content-Type", contentType); + if (query.isCompressed()) { + response.addHeader("Content-Disposition", "attachment; filename=data.gz"); + response.addHeader("Content-Encoding", "gzip"); + out = new GZIPOutputStream(out); + } else { + response.addHeader("Content-Disposition", "attachment; filename=data.csv"); + } + + return out; + } + +} diff --git a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java index ad881fb..53d6be3 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java @@ -5,6 +5,7 @@ import java.util.Map.Entry; import java.util.stream.Stream; import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; import ch.psi.daq.query.model.impl.DAQQuery; @@ -19,5 +20,5 @@ public interface ResponseStreamWriter { * @param response {@link ServletResponse} instance given by the current HTTP request * @throws IOException thrown if writing to the output stream fails */ - public void respond(Stream> stream, DAQQuery query, ServletResponse response) throws Exception; + public void respond(Stream> stream, DAQQuery query, HttpServletResponse response) throws Exception; } diff --git 
a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java index eec6674..7d762b4 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java @@ -1,5 +1,8 @@ package ch.psi.daq.queryrest.response.csv; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; @@ -11,6 +14,7 @@ import java.util.stream.Stream; import javax.annotation.Resource; import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,22 +24,21 @@ import org.supercsv.io.dozer.CsvDozerBeanWriter; import org.supercsv.io.dozer.ICsvDozerBeanWriter; import org.supercsv.prefs.CsvPreference; -import com.fasterxml.jackson.core.JsonEncoding; - import ch.psi.daq.domain.DataEvent; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.impl.DAQQuery; -import ch.psi.daq.queryrest.response.ResponseStreamWriter; +import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; + +import com.fasterxml.jackson.core.JsonEncoding; /** * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse} * of the current request. 
*/ -public class CSVResponseStreamWriter implements ResponseStreamWriter { - public static final String APPLICATION_CSV_VALUE = "text/csv"; +public class CSVResponseStreamWriter extends AbstractResponseStreamWriter { private static final char DELIMITER_CVS = ';'; private static final char DELIMITER_ARRAY = ','; @@ -46,59 +49,100 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter { private Function queryAnalizerFactory; @Override - public void respond(Stream> stream, DAQQuery query, ServletResponse response) throws Exception { + public void respond(Stream> stream, DAQQuery query, HttpServletResponse response) throws Exception { response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName()); - response.setContentType(APPLICATION_CSV_VALUE); + response.setContentType(CONTENT_TYPE_CSV); respondInternal(stream, query, response); } - private void respondInternal(Stream> stream, DAQQuery query, ServletResponse response) + private void respondInternal(Stream> stream, DAQQuery query, HttpServletResponse response) throws Exception { + Set queryFields = query.getFields(); List aggregations = query.getAggregations(); + int aggregationsSize = (aggregations != null ? aggregations.size() : 0); - Set fieldMapping = - new LinkedHashSet<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); - List header = - new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); + Set fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize); + List header = new ArrayList<>(queryFields.size() + aggregationsSize); + + CellProcessor[] processors = setupCellProcessors(query, fieldMapping, header); + + OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_CSV); + ICsvDozerBeanWriter beanWriter = setupBeanWriter(fieldMapping, out); + + writeToOutput(stream, header, processors, beanWriter); + + } + + /** + * Sets up the bean writer instance. 
+ * @throws Exception + * + */ + private ICsvDozerBeanWriter setupBeanWriter(Set fieldMapping, OutputStream out) throws Exception { + CsvPreference preference = new CsvPreference.Builder( + (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(), + DELIMITER_CVS, + CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build(); + + ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference); + // configure the mapping from the fields to the CSV columns + beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()])); + return beanWriter; + } + + /** + * Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer. + * + * Cell processors are an integral part of reading and writing with Super CSV - they automate the + * data type conversions, and enforce constraints. They implement the chain of responsibility + * design pattern - each processor has a single, well-defined purpose and can be chained together + * with other processors to fully automate all of the required conversions and constraint + * validation for a single CSV column. + * + * @param query The current {@link DAQQuery} + * @return Array of {@link CellProcessor} entries + */ + private CellProcessor[] setupCellProcessors(DAQQuery query, Set fieldMapping, List header) { + Set queryFields = query.getFields(); + List aggregations = query.getAggregations(); + List processorSet = new ArrayList<>(queryFields.size() + (aggregations != null ? 
aggregations.size() : 0)); + boolean isNewField; QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query); - + for (QueryField field : queryFields) { if(!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())){ isNewField = fieldMapping.add(field.name()); - + if (isNewField) { header.add(field.name()); processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); } } } + if (aggregations != null && queryAnalyzer.isAggregationEnabled()) { for (Aggregation aggregation : query.getAggregations()) { isNewField = fieldMapping.add("value." + aggregation.name()); - + if (isNewField) { header.add(aggregation.name()); processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); } } } + return processorSet.toArray(new CellProcessor[processorSet.size()]); + } - CellProcessor[] processors = processorSet.toArray(new CellProcessor[processorSet.size()]); - CsvPreference preference = new CsvPreference.Builder( - (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(), - DELIMITER_CVS, - CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build(); - ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(response.getWriter(), preference); - + @SuppressWarnings("unchecked") + private void writeToOutput(Stream> stream, List header, CellProcessor[] processors, + ICsvDozerBeanWriter beanWriter) throws IOException, Exception { AtomicReference exception = new AtomicReference<>(); try { - // configure the mapping from the fields to the CSV columns - beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()])); beanWriter.writeHeader(header.toArray(new String[header.size()])); @@ -106,28 +150,27 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter { /* ensure elements are sequentially written */ .sequential() .forEach( - entry -> { - if (entry.getValue() instanceof Stream) { - - @SuppressWarnings("unchecked") - Stream eventStream = (Stream) entry.getValue(); - 
eventStream - .forEach( - event -> { - try { - beanWriter.write(event, processors); - } catch (Exception e) { - LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(), - event.getPulseId(), e); - exception.compareAndSet(null, e); - } - }); - } else { - String message = "Type '" + entry.getValue().getClass() + "' not supported."; - LOGGER.error(message); - exception.compareAndSet(null, new RuntimeException(message)); - } - }); + entry -> { + if (entry.getValue() instanceof Stream) { + Stream eventStream = (Stream) entry.getValue(); + eventStream + .forEach( + event -> { + try { + beanWriter.write(event, processors); + } catch (Exception e) { + LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(), + event.getPulseId(), e); + exception.compareAndSet(null, e); + } + }); + } else { + String message = "Type '" + entry.getValue().getClass() + "' not supported."; + LOGGER.error(message); + exception.compareAndSet(null, new RuntimeException(message)); + } + } + ); } finally { beanWriter.close(); } diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java index 47e8a7a..850c73a 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java @@ -1,20 +1,27 @@ package ch.psi.daq.queryrest.response.json; import java.io.IOException; -import java.util.Map.Entry; +import java.io.OutputStream; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import javax.annotation.Resource; import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; +import 
ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; + import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -23,16 +30,11 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; -import ch.psi.daq.query.model.Aggregation; -import ch.psi.daq.query.model.QueryField; -import ch.psi.daq.query.model.impl.DAQQuery; -import ch.psi.daq.queryrest.response.ResponseStreamWriter; - /** * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse} * of the current request. */ -public class JSONResponseStreamWriter implements ResponseStreamWriter { +public class JSONResponseStreamWriter extends AbstractResponseStreamWriter { private static final String DATA_RESP_FIELD = "data"; @@ -45,14 +47,15 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter { private ObjectMapper mapper; @Override - public void respond(Stream> stream, DAQQuery query, ServletResponse response) throws Exception { + public void respond(Stream> stream, DAQQuery query, HttpServletResponse response) throws Exception { response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName()); response.setContentType(MediaType.APPLICATION_JSON_VALUE); Set includedFields = getFields(query); ObjectWriter writer = configureWriter(includedFields); - respondInternal(stream, response, writer); + + respondInternal(stream, response, writer, query); } protected Set getFields(DAQQuery query) { @@ -99,11 +102,14 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter { * @param stream Mapping from channel name to data * @param response {@link ServletResponse} instance given by the current HTTP 
request * @param writer configured writer that includes the fields the end user wants to see + * @param query * @throws IOException thrown if writing to the output stream fails */ - private void respondInternal(Stream> stream, ServletResponse response, ObjectWriter writer) + private void respondInternal(Stream> stream, HttpServletResponse response, ObjectWriter writer, DAQQuery query) throws Exception { - JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8); + + OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_JSON); + JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8); AtomicReference exception = new AtomicReference<>(); try { diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java index 03ea21c..037397b 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java @@ -29,13 +29,14 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId; import ch.psi.daq.cassandra.request.range.RequestRangeTime; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.domain.ResponseFormat; import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.AggregationType; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.queryrest.controller.QueryRestController; import ch.psi.daq.queryrest.filter.CorsFilter; -import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; +import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin; import ch.psi.daq.test.queryrest.AbstractDaqRestTest; @@ -68,6 +69,9 @@ public class 
QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 1), TEST_CHANNEL_NAMES); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -96,7 +100,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { MvcResult result = this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) .andDo(MockMvcResultHandlers.print()) @@ -143,6 +146,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 1), channelName); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -171,7 +177,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { MvcResult result = this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) .andDo(MockMvcResultHandlers.print()) @@ -213,6 +218,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 10), TEST_CHANNEL_NAMES); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -241,7 +249,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { MvcResult result = this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) 
.andDo(MockMvcResultHandlers.print()) @@ -289,6 +296,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { startDate, endDate), TEST_CHANNEL_NAMES); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -317,7 +327,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { MvcResult result = this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) .andDo(MockMvcResultHandlers.print()) @@ -366,6 +375,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { Ordering.asc, AggregationType.extrema, TEST_CHANNEL_NAMES[0]); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); String content = mapper.writeValueAsString(request); @@ -373,7 +384,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) .andDo(MockMvcResultHandlers.print()); @@ -397,6 +407,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { Ordering.asc, AggregationType.index, TEST_CHANNEL_NAMES[0]); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); String content = mapper.writeValueAsString(request); @@ -404,7 +416,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) .andDo(MockMvcResultHandlers.print()); @@ -430,6 +441,8 @@ public class 
QueryRestControllerCsvTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setNrOfBins(2); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); @@ -467,7 +480,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { MvcResult result = this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) .andDo(MockMvcResultHandlers.print()) @@ -515,6 +527,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setBinSize(100); + request.setCompressed(false); + request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); @@ -552,7 +566,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { MvcResult result = this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) .contentType(MediaType.APPLICATION_JSON) .content(content)) .andDo(MockMvcResultHandlers.print()) diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java index bd36005..f35091a 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java @@ -173,6 +173,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { 10, 11), TEST_CHANNEL_NAMES); + request.setCompressed(false); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -180,7 +181,6 @@ public class QueryRestControllerJsonTest 
extends AbstractDaqRestTest { this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -209,12 +209,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { 100, 110), TEST_CHANNEL_NAMES); + request.setCompressed(false); String content = mapper.writeValueAsString(request); this.mockMvc.perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -246,6 +246,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { startDate, endDate), TEST_CHANNEL_NAMES); + request.setCompressed(false); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -254,7 +255,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .perform( MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content) ) @@ -287,12 +287,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { Ordering.asc, AggregationType.extrema, TEST_CHANNEL_NAMES[0]); + request.setCompressed(false); String content = mapper.writeValueAsString(request); this.mockMvc .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY) - .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -331,6 +331,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setNrOfBins(2); + request.setCompressed(false); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -339,7 +340,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .perform( MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(MediaType.APPLICATION_JSON) 
.contentType(MediaType.APPLICATION_JSON) .content(content) ) @@ -369,6 +369,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setBinSize(100); + request.setCompressed(false); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -377,7 +378,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .perform( MockMvcRequestBuilders .post(QueryRestController.QUERY) - .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content) ) From a2ffcab63734f7e2fce705431564e85277c85ca1 Mon Sep 17 00:00:00 2001 From: Zellweger Christof Ralf Date: Mon, 21 Dec 2015 15:14:50 +0100 Subject: [PATCH 2/5] sf_daq/ch.psi.daq.queryrest#1 - implementing logic to implement the 'compression' attribute - adjusting tests - updating readme.md --- Readme.md | 54 ++++++++--------- daqlocal-compression-benchmark.csv | 5 ++ .../AbstractResponseStreamWriter.java | 5 +- .../QueryRestControllerCsvTest.java | 17 +++--- .../QueryRestControllerJsonTest.java | 58 +++++++++++++++++-- 5 files changed, 92 insertions(+), 47 deletions(-) create mode 100644 daqlocal-compression-benchmark.csv diff --git a/Readme.md b/Readme.md index 1824057..b3ad2c4 100644 --- a/Readme.md +++ b/Readme.md @@ -132,22 +132,16 @@ Queries are applied to a range. The following types of ranges ranges are support ## Query Data -### `compressed`: data is compressed by default +### `compression`: compression of data can be enabled -To save bandwidth, all data transferred from the server to the client is compressed (gzipped) by default. In case compressing the data is too processor-intense, it can be disabled by specifying `compressed=false` in the body of the request. +By default, no data is compressed when transferred from the server to the client. However, compression can be enabled by setting the `compression` attribute to a value other than `none`, i.e. to `gzip` or `deflate`. 
-Because of this, we have to tell `curl` that the data is compressed so that it is being decompressed automatically. `curl` decompresses the response when the `--compressed` parameter is set: +If compression is enabled, we have to tell `curl` that the data is compressed so that it is being decompressed automatically. `curl` decompresses the response when the `--compressed` parameter is set: #### Example ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query -``` - -If we want the raw data uncompressed from the server, we have to specify this in the query body parameter with by specifying `compressed=false`: - -```bash -curl -H "Content-Type: application/json" -X POST -d '{"compressed":false,"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"gzip","range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` @@ -160,7 +154,7 @@ CSV export does not support `index` and `extrema` aggregations. #### Example ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query ``` #### Response example @@ -197,29 +191,29 @@ The following attributes can be specified: - **channels**: Array of channel names to be queried. 
- **range**: The range of the query (see [Query Range](Readme.md#query_range)). -- **ordering**: The ordering of the data (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.common/browse/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values). -- **fields**: The requested fields (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values). +- **ordering**: The ordering of the data (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.common/blob/master/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values). +- **fields**: The requested fields (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values). - **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be divided into. - **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using number of pulses and number of milliseconds makes this binning strategy consistent between channel with different update frequencies). -- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response. -- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]). +- **aggregations**: Activates data aggregation. 
Array of requested aggregations (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response. +- **aggregationType**: Specifies the type of aggregation (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]). - **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**) - **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance) -- **compressed**: Defines whether the response should be compressed or not (values: **true**|false) -- **responseFormat**: Specifies the format the response of the requested data is in, either in JSON or CSV format (values: **json**|csv) +- **compression**: Defines the compression algorithm to use, default value is **none**, see all values [here](https://git.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/Compression.java) +- **responseFormat**: Specifies the format the response of the requested data is in, either in JSON or CSV format, default value **json**, see all values [here](https://git.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/ResponseType.java) ### Example Compressed data but uncompressed by `curl`: ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d 
'{"compression":"deflate","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query ``` Raw, uncompressed data (returns non-human-readable data): ```bash -curl -H "Content-Type: application/json" -X POST -d '{"compressed": false,"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query ``` ### Response example @@ -360,7 +354,7 @@ The following examples build on waveform data (see below). They also work for sc ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -388,7 +382,7 @@ See JSON representation of the data above. ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -418,7 +412,7 @@ Supported format is ISO8601 *YYYY-MM-DDThh:mm:ss.sTZD* (e.g. 
*1997-07-16T19:20:3 ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -450,7 +444,7 @@ Archiver Appliance supports queries by *time range* and *date range* only (as it ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -479,7 +473,7 @@ Allows for server side optimizations since not all data needs to be retrieved. 
###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -532,7 +526,7 @@ Use **none** in case ordering does not matter (allows for server side optimizati ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -585,7 +579,7 @@ curl --compressed -H "Content-Type: application/json" -X POST -d '{"ordering":"d ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -659,7 +653,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d 
'{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -718,7 +712,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -775,7 +769,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -844,7 +838,7 @@ curl --compressed -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1, ###### Command ```bash 
-curl --compressed -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response diff --git a/daqlocal-compression-benchmark.csv b/daqlocal-compression-benchmark.csv new file mode 100644 index 0000000..6074f04 --- /dev/null +++ b/daqlocal-compression-benchmark.csv @@ -0,0 +1,5 @@ +Query duration (s),none / time (s),none / space (mb),gzip / time (s),gzip / space (mb),deflate / time (s),deflate / space (mb) +10,1,35.3,5.1,16.2,5.1,16.2 +30,3,108,15.7,49.7,15.6,49.7 +60,6,208,30.4,95.5,30.1,95.5 +300,25,900,129,413,127,413 diff --git a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java index 764f53b..295f1fe 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java @@ -4,7 +4,6 @@ package ch.psi.daq.queryrest.response; import java.io.OutputStream; -import java.util.zip.GZIPOutputStream; import javax.servlet.http.HttpServletResponse; @@ -44,8 +43,8 @@ public abstract class AbstractResponseStreamWriter implements ResponseStreamWrit response.addHeader("Content-Type", contentType); if (query.isCompressed()) { response.addHeader("Content-Disposition", "attachment; filename=data.gz"); - response.addHeader("Content-Encoding", "gzip"); - out = new GZIPOutputStream(out); + response.addHeader("Content-Encoding", query.getCompression().toString()); + out = query.getCompression().wrapStream(out); } else { 
response.addHeader("Content-Disposition", "attachment; filename=data.csv"); } diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java index 037397b..4553239 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java @@ -29,6 +29,7 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId; import ch.psi.daq.cassandra.request.range.RequestRangeTime; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.domain.Compression; import ch.psi.daq.domain.ResponseFormat; import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.AggregationType; @@ -69,7 +70,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 1), TEST_CHANNEL_NAMES); - request.setCompressed(false); + request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); @@ -146,7 +147,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 1), channelName); - request.setCompressed(false); + request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); @@ -218,7 +219,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 10), TEST_CHANNEL_NAMES); - request.setCompressed(false); + request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); @@ -296,7 +297,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { startDate, endDate), TEST_CHANNEL_NAMES); - request.setCompressed(false); + request.setCompression(Compression.NONE); 
request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); @@ -375,7 +376,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { Ordering.asc, AggregationType.extrema, TEST_CHANNEL_NAMES[0]); - request.setCompressed(false); + request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); String content = mapper.writeValueAsString(request); @@ -407,7 +408,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { Ordering.asc, AggregationType.index, TEST_CHANNEL_NAMES[0]); - request.setCompressed(false); + request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); String content = mapper.writeValueAsString(request); @@ -441,7 +442,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setNrOfBins(2); - request.setCompressed(false); + request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); @@ -527,7 +528,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setBinSize(100); - request.setCompressed(false); + request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java index f35091a..d3f8486 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java @@ -15,6 +15,7 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId; import ch.psi.daq.cassandra.request.range.RequestRangeTime; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import 
ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.domain.Compression; import ch.psi.daq.query.model.AggregationType; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.queryrest.controller.QueryRestController; @@ -173,7 +174,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { 10, 11), TEST_CHANNEL_NAMES); - request.setCompressed(false); + request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -209,7 +210,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { 100, 110), TEST_CHANNEL_NAMES); - request.setCompressed(false); + request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); @@ -246,7 +247,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { startDate, endDate), TEST_CHANNEL_NAMES); - request.setCompressed(false); + request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -287,7 +288,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { Ordering.asc, AggregationType.extrema, TEST_CHANNEL_NAMES[0]); - request.setCompressed(false); + request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); @@ -331,7 +332,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setNrOfBins(2); - request.setCompressed(false); + request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -369,7 +370,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setBinSize(100); - request.setCompressed(false); + request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -418,4 +419,49 @@ public class QueryRestControllerJsonTest 
extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].globalMillis").value(1900)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10)); } + + @Test + public void testGzipCompression() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 10, + 11), + TEST_CHANNEL_NAMES); + request.setCompression(Compression.GZIP); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()); + } + + @Test + public void testDeflateCompression() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 10, + 11), + TEST_CHANNEL_NAMES); + request.setCompression(Compression.DEFLATE); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()); + } + } From c6f2010d674737992e753622534e37dc617278d8 Mon Sep 17 00:00:00 2001 From: Zellweger Christof Ralf Date: Tue, 22 Dec 2015 08:22:32 +0100 Subject: [PATCH 3/5] sf_daq/ch.psi.daq.queryrest#1: - updating docs, rearranging a little --- Readme.md | 195 +++++++++++++++++++++++++----------------------------- 1 file changed, 90 insertions(+), 105 deletions(-) diff --git a/Readme.md b/Readme.md index b3ad2c4..a618ff6 100644 --- a/Readme.md +++ b/Readme.md @@ -138,44 +138,12 @@ By default, no data is compressed when transferred from the server to the client If compression is enabled, we have to tell `curl` that the data is compressed so that it is being decompressed automatically. 
`curl` decompresses the response when the `--compressed` parameter is set: -#### Example - -```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"gzip","range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query -``` - - ### `responseFormat`: data is in JSON by default Responses can be formatted as CSV or JSON using the `responseFormat` field. The returned data is JSON-formatted per default. CSV export does not support `index` and `extrema` aggregations. -#### Example - -```bash -curl -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query -``` - -#### Response example - -The response is in CSV. - -```text -channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value -testChannel1;0;0;0;0;0;[1];1;0 -testChannel1;1;10;0;10;0;[1];1;1 -testChannel1;2;20;0;20;0;[1];1;2 -testChannel1;3;30;0;30;0;[1];1;3 -testChannel1;4;40;0;40;0;[1];1;4 -testChannel2;0;0;0;0;0;[1];1;0 -testChannel2;1;10;0;10;0;[1];1;1 -testChannel2;2;20;0;20;0;[1];1;2 -testChannel2;3;30;0;30;0;[1];1;3 -testChannel2;4;40;0;40;0;[1];1;4 -``` - - ### Query request endpoint @@ -202,79 +170,6 @@ The following attributes can be specified: - **compression**: Defines the compression algorithm to use, default value is **none**, see all values [here](https://git.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/Compression.java)) - **responseFormat**: Specifies the format the response of the requested data is in, either in JSON or CSV format, default value **json**, see all values [here](https://git.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/ResponseType.java)) -### Example - -Compressed data but uncompressed by `curl`: - 
-```bash -curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"deflate","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query -``` - -Raw, uncompressed data (returns non-human-readable data): - -```bash -curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query -``` - -### Response example - -The response is in JSON. - -```json -[ - { - "channel":"channel1", - "data":[ - { - "pulseId":0, - "iocMillis":0, - "iocNanos":0, - "globalMillis":0, - "globalNanos":0, - "value":0 - }, - { - "pulseId":2, - "iocMillis":2, - "iocNanos":2, - "globalMillis":2, - "globalNanos":2, - "value":2 - }, - { - "pulseId":4, - "iocMillis":4, - "iocNanos":4, - "globalMillis":4, - "globalNanos":4, - "value":4 - } - ] - }, - { - "channel":"channel2", - "data":[ - { - "pulseId":1, - "iocMillis":1, - "iocNanos":1, - "globalMillis":1, - "globalNanos":1, - "value":1 - }, - { - "pulseId":3, - "iocMillis":3, - "iocNanos":3, - "globalMillis":3, - "globalNanos":3, - "value":3 - } - ] - } -] -``` - ### Example Queries The following examples build on waveform data (see below). They also work for scalars (consider it as a waveform of length = 1) and images (waveform of length = dimX * dimY). @@ -337,6 +232,96 @@ The following examples build on waveform data (see below). They also work for sc ### Query Examples +##### Query using compression + +```json +{ + "compression":"gzip", + "range":{ + "startPulseId":0, + "endPulseId":3 + }, + "channels":[ + "Channel_01" + ] +} +``` + +or `deflate` can be used too: + +```json +{ + "compression":"deflate", + "range":{ + "startPulseId":0, + "endPulseId":3 + }, + "channels":[ + "Channel_01" + ] +} +``` + +###### Command (gzip) + +The `curl` command has a `--compressed` option to decompress data automatically. 
+ +```bash +curl --compressed -H "Content-Type: application/json" -X POST -d '{"compression":"gzip","range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +``` + +##### Query setting CSV response format + +```json +{ + "responseFormat":"csv", + "range":{ + "startPulseId":0, + "endPulseId":4 + }, + "channels":[ + "channel1", + "channel2" + ], + "fields":[ + "channel", + "pulseId", + "iocMillis", + "iocNanos", + "globalMillis", + "globalNanos", + "shape", + "eventCount", + "value" + ] +} +``` + +###### Command + +```bash +curl -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query +``` + +###### Response + +The response is in CSV. + +```text +channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value +testChannel1;0;0;0;0;0;[1];1;0 +testChannel1;1;10;0;10;0;[1];1;1 +testChannel1;2;20;0;20;0;[1];1;2 +testChannel1;3;30;0;30;0;[1];1;3 +testChannel1;4;40;0;40;0;[1];1;4 +testChannel2;0;0;0;0;0;[1];1;0 +testChannel2;1;10;0;10;0;[1];1;1 +testChannel2;2;20;0;20;0;[1];1;2 +testChannel2;3;30;0;30;0;[1];1;3 +testChannel2;4;40;0;40;0;[1];1;4 +``` + + ##### Query by Pulse-Id Range ```json From d9158a8868450d2742b08229406825f001299e6c Mon Sep 17 00:00:00 2001 From: Zellweger Christof Ralf Date: Tue, 22 Dec 2015 13:12:03 +0100 Subject: [PATCH 4/5] sf_daq/ch.psi.daq.queryrest#1 --- .../daq/queryrest/config/QueryRestConfig.java | 3 +- .../AbstractResponseStreamWriter.java | 7 ++- .../QueryRestControllerCsvTest.java | 47 +++++++++++++++++- .../QueryRestControllerJsonTest.java | 48 ++++++++++++++----- 4 files changed, 87 insertions(+), 18 deletions(-) diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java 
b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index e6cbd43..40c5864 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -58,8 +58,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { // a nested configuration // this guarantees that the ordering of the properties file is as expected - // see: - // https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393 + // see: https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393 @Configuration @Import({QueryConfig.class}) static class InnerConfiguration { diff --git a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java index 295f1fe..45bea43 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java @@ -9,6 +9,7 @@ import javax.servlet.http.HttpServletResponse; import org.springframework.http.MediaType; +import ch.psi.daq.domain.ResponseFormat; import ch.psi.daq.query.model.impl.DAQQuery; @@ -42,11 +43,13 @@ public abstract class AbstractResponseStreamWriter implements ResponseStreamWrit response.addHeader("Content-Type", contentType); if (query.isCompressed()) { - response.addHeader("Content-Disposition", "attachment; filename=data.gz"); + String filename = "data." 
+ query.getCompression().getFileSuffix(); + response.addHeader("Content-Disposition", "attachment; filename=" + filename); response.addHeader("Content-Encoding", query.getCompression().toString()); out = query.getCompression().wrapStream(out); } else { - response.addHeader("Content-Disposition", "attachment; filename=data.csv"); + String filename = "data." + (query.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json"); + response.addHeader("Content-Disposition", "attachment; filename=" + filename); } return out; diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java index 4553239..d3f7ec2 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java @@ -37,7 +37,6 @@ import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.queryrest.controller.QueryRestController; import ch.psi.daq.queryrest.filter.CorsFilter; -import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin; import ch.psi.daq.test.queryrest.AbstractDaqRestTest; @@ -601,4 +600,50 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { mapReader.close(); } } + + @Test + public void testGzipFileSuffixHeader() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 10, + 11), + TEST_CHANNEL_NAMES); + request.setResponseFormat(ResponseFormat.CSV); + request.setCompression(Compression.GZIP); + + String content = mapper.writeValueAsString(request); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + 
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz")); + } + + @Test + public void testJsonFileSuffixHeader() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 10, + 11), + TEST_CHANNEL_NAMES); + request.setResponseFormat(ResponseFormat.CSV); + request.setCompression(Compression.NONE); + + String content = mapper.writeValueAsString(request); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.csv")); + } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java index d3f8486..2d3ee83 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java @@ -441,27 +441,49 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andDo(MockMvcResultHandlers.print()) .andExpect(MockMvcResultMatchers.status().isOk()); } - + @Test - public void testDeflateCompression() throws Exception { + public void testGzipFileSuffixHeader() throws Exception { DAQQuery request = new DAQQuery( new RequestRangePulseId( 10, 11), - TEST_CHANNEL_NAMES); - request.setCompression(Compression.DEFLATE); - + TEST_CHANNEL_NAMES); + request.setCompression(Compression.GZIP); + String content = mapper.writeValueAsString(request); - System.out.println(content); - + this.mockMvc - .perform(MockMvcRequestBuilders - .post(QueryRestController.QUERY) - .contentType(MediaType.APPLICATION_JSON) - .content(content)) - + .perform(MockMvcRequestBuilders + 
.post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) - .andExpect(MockMvcResultMatchers.status().isOk()); + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz")); + } + + @Test + public void testJsonFileSuffixHeader() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 10, + 11), + TEST_CHANNEL_NAMES); + request.setCompression(Compression.NONE); + + String content = mapper.writeValueAsString(request); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.json")); } } From 84c66ad73feeea4f5d12161d7c7032e0a1e56283 Mon Sep 17 00:00:00 2001 From: Zellweger Christof Ralf Date: Tue, 22 Dec 2015 13:20:47 +0100 Subject: [PATCH 5/5] sf_daq/ch.psi.daq.queryrest#1: - adjustments after moving classes --- .../ch/psi/daq/queryrest/controller/QueryRestController.java | 2 +- .../daq/queryrest/response/AbstractResponseStreamWriter.java | 2 +- .../test/queryrest/controller/QueryRestControllerCsvTest.java | 4 ++-- .../queryrest/controller/QueryRestControllerJsonTest.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index ea02349..18ebfe5 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -31,7 +31,6 @@ import ch.psi.daq.common.concurrent.singleton.Deferred; import 
ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.DataEvent; -import ch.psi.daq.domain.ResponseFormat; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.model.Aggregation; @@ -39,6 +38,7 @@ import ch.psi.daq.query.model.AggregationType; import ch.psi.daq.query.model.DBMode; import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.ResponseFormat; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.query.request.ChannelsRequest; diff --git a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java index 45bea43..a1f42f6 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java @@ -9,7 +9,7 @@ import javax.servlet.http.HttpServletResponse; import org.springframework.http.MediaType; -import ch.psi.daq.domain.ResponseFormat; +import ch.psi.daq.query.model.ResponseFormat; import ch.psi.daq.query.model.impl.DAQQuery; diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java index d3f7ec2..9700f38 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java @@ -29,11 +29,11 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId; import ch.psi.daq.cassandra.request.range.RequestRangeTime; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.ordering.Ordering; -import ch.psi.daq.domain.Compression; -import 
ch.psi.daq.domain.ResponseFormat; import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.AggregationType; +import ch.psi.daq.query.model.Compression; import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.ResponseFormat; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.queryrest.controller.QueryRestController; import ch.psi.daq.queryrest.filter.CorsFilter; diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java index 2d3ee83..051fb3f 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java @@ -15,8 +15,8 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId; import ch.psi.daq.cassandra.request.range.RequestRangeTime; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.ordering.Ordering; -import ch.psi.daq.domain.Compression; import ch.psi.daq.query.model.AggregationType; +import ch.psi.daq.query.model.Compression; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.queryrest.controller.QueryRestController; import ch.psi.daq.queryrest.filter.CorsFilter;