diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs index 8afdfe4..bbb58d2 100644 --- a/.settings/org.eclipse.jdt.core.prefs +++ b/.settings/org.eclipse.jdt.core.prefs @@ -1,5 +1,5 @@ # -#Mon Dec 07 08:06:14 CET 2015 +#Mon Dec 14 16:07:41 CET 2015 org.eclipse.jdt.core.compiler.debug.localVariable=generate org.eclipse.jdt.core.compiler.compliance=1.8 org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve diff --git a/Readme.md b/Readme.md index 08977a6..4e72f5e 100644 --- a/Readme.md +++ b/Readme.md @@ -71,7 +71,7 @@ POST http://:/channels ### Example ```bash -curl -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels ``` @@ -158,7 +158,7 @@ There exist following fields: ### Example ```bash -curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query ``` ### Response example @@ -299,7 +299,7 @@ Following examples build on a waveform data (see below). They also work for scal ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -327,7 +327,7 @@ See JSON representation of the data above. ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -357,7 +357,7 @@ Supported format is ISO8601 *YYYY-MM-DDThh:mm:ss.sTZD* (e.g. *1997-07-16T19:20:3 ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -389,7 +389,7 @@ Archiver Appliance supports queries by *time range* and *date range* only (as it ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -418,7 +418,7 @@ Allows for server side optimizations since not all data needs to be retrieved. ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -471,7 +471,7 @@ Use **none** in case ordering does not matter (allows for server side optimizati ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -524,7 +524,7 @@ curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields" ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -598,7 +598,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -657,7 +657,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -714,7 +714,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -783,7 +783,7 @@ curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationT ###### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query +curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query ``` ###### Response @@ -828,4 +828,33 @@ curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema } } ] -``` \ No newline at end of file +``` + +### CSV Export + +Responses can be formatted as csv by requesting `text/csv`. +CSV export does not support `index` and `extrema` aggregations. + +### Example + +```bash +curl -H "Accept: text/csv" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query +``` + +### Response example + +The response is in CSV. + +```text +channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value +testChannel1;0;0;0;0;0;[1];1;0 +testChannel1;1;10;0;10;0;[1];1;1 +testChannel1;2;20;0;20;0;[1];1;2 +testChannel1;3;30;0;30;0;[1];1;3 +testChannel1;4;40;0;40;0;[1];1;4 +testChannel2;0;0;0;0;0;[1];1;0 +testChannel2;1;10;0;10;0;[1];1;1 +testChannel2;2;20;0;20;0;[1];1;2 +testChannel2;3;30;0;30;0;[1];1;3 +testChannel2;4;40;0;40;0;[1];1;4 +``` diff --git a/build.gradle b/build.gradle index 7d933e5..677e97c 100644 --- a/build.gradle +++ b/build.gradle @@ -26,6 +26,8 @@ dependencies { exclude group: 'org.slf4j', module: 'log4j-over-slf4j' } compile libraries.commons_lang + compile libraries.super_csv + compile libraries.super_csv_dozer testCompile libraries.spring_boot_starter_test testCompile libraries.jsonassert diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 85b7848..e6cbd43 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -42,9 +42,10 @@ import ch.psi.daq.query.model.QueryField; import ch.psi.daq.queryrest.controller.validator.QueryValidator; import ch.psi.daq.queryrest.filter.CorsFilter; import ch.psi.daq.queryrest.model.PropertyFilterMixin; -import ch.psi.daq.queryrest.response.JsonByteArraySerializer; -import ch.psi.daq.queryrest.response.JsonStreamSerializer; -import ch.psi.daq.queryrest.response.ResponseStreamWriter; +import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; +import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter; +import ch.psi.daq.queryrest.response.json.JsonByteArraySerializer; +import ch.psi.daq.queryrest.response.json.JsonStreamSerializer; @Configuration @PropertySource(value = {"classpath:queryrest.properties"}) @@ -122,8 +123,13 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { } @Bean - public ResponseStreamWriter responseStreamWriter() { - return new ResponseStreamWriter(); + public JSONResponseStreamWriter jsonResponseStreamWriter() { + return new JSONResponseStreamWriter(); + } + + @Bean + public CSVResponseStreamWriter csvResponseStreamWriter() { + return new CSVResponseStreamWriter(); } @Bean(name = BEAN_NAME_DEFAULT_RESPONSE_FIELDS) diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index e0db109..0e511a2 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -41,7 +41,8 @@ import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.query.request.ChannelsRequest; -import ch.psi.daq.queryrest.response.ResponseStreamWriter; +import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; +import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter; import com.google.common.collect.Lists; @@ -58,7 +59,10 @@ public class QueryRestController { private Validator requestProviderValidator = new RequestProviderValidator(); @Resource - private ResponseStreamWriter responseStreamWriter; + private JSONResponseStreamWriter jsonResponseStreamWriter; + + @Resource + private CSVResponseStreamWriter csvResponseStreamWriter; @Resource private ApplicationContext appContext; @@ -190,9 +194,8 @@ public class QueryRestController { }).collect(Collectors.toList()); } - /** - * Catch-all query method for getting data from the backend. + * Catch-all query method for getting data from the backend for JSON requests. *

* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to @@ -203,14 +206,56 @@ public class QueryRestController { * @param res the {@link HttpServletResponse} instance associated with this request * @throws IOException thrown if writing to the output stream fails */ - @RequestMapping(value = QUERY, method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE}) - public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws IOException { + @RequestMapping( + value = QUERY, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}, + produces = {MediaType.APPLICATION_JSON_VALUE}) + public void executeQueryJson(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { try { LOGGER.debug("Executing query '{}'", query.toString()); // write the response back to the client using java 8 streams - responseStreamWriter.respond(executeQuery(query), query, res); - } catch (IOException t) { + jsonResponseStreamWriter.respond(executeQuery(query), query, res); + } catch (Exception t) { + LOGGER.error("Failed to execute query '{}'.", query, t); + throw t; + } + } + + /** + * Catch-all query method for getting data from the backend for JSON requests. + *

+ * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields + * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to + * deserialize the information into and has been configured (see + * QueryRestConfig#afterPropertiesSet) accordingly. + * + * @param query concrete implementation of {@link DAQQuery} + * @param res the {@link HttpServletResponse} instance associated with this request + * @throws IOException thrown if writing to the output stream fails + */ + @RequestMapping( + value = QUERY, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}, + produces = {CSVResponseStreamWriter.APPLICATION_CSV_VALUE}) + public void executeQueryCsv(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { + if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) { + // We allow only no aggregation or value aggregation as + // extrema: nested structure and not clear how to map it to one line + // index: value is an array of Statistics whose size is not clear at initialization time + String message = "CSV export does not support '" + query.getAggregationType() + "'"; + LOGGER.warn(message); + throw new IllegalArgumentException(message); + } + + try { + LOGGER.debug("Executing query '{}'", query.toString()); + + // write the response back to the client using java 8 streams + csvResponseStreamWriter.respond(executeQuery(query), query, res); + } catch (Exception t) { LOGGER.error("Failed to execute query '{}'.", query, t); throw t; } diff --git a/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java b/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java index f1c0749..e1c0767 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java @@ -14,7 +14,6 @@ import ch.psi.daq.query.model.DBMode; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.cassandra.request.Request; -import ch.psi.daq.cassandra.request.range.RequestRangeTime; import ch.psi.daq.queryrest.config.QueryRestConfig; public class QueryValidator implements Validator { diff --git a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java index 7dba652..ad881fb 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java @@ -1,46 +1,14 @@ package ch.psi.daq.queryrest.response; import java.io.IOException; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; import java.util.Map.Entry; import java.util.stream.Stream; -import javax.annotation.Resource; import javax.servlet.ServletResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.MediaType; - -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; -import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; - -import ch.psi.daq.query.model.Aggregation; -import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.impl.DAQQuery; -/** - * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse} - * of the current request. - */ -public class ResponseStreamWriter { - - private static final String DATA_RESP_FIELD = "data"; - - private static final Logger logger = LoggerFactory.getLogger(ResponseStreamWriter.class); - - @Resource - private JsonFactory jsonFactory; - - @Resource - private ObjectMapper mapper; +public interface ResponseStreamWriter { /** * Responding with the the contents of the stream by writing into the output stream of the @@ -51,82 +19,5 @@ public class ResponseStreamWriter { * @param response {@link ServletResponse} instance given by the current HTTP request * @throws IOException thrown if writing to the output stream fails */ - public void respond(Stream> stream, DAQQuery query, - ServletResponse response) throws IOException { - - Set queryFields = query.getFields(); - List aggregations = query.getAggregations(); - - Set includedFields = - new LinkedHashSet(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); - - for (QueryField field : queryFields) { - includedFields.add(field.name()); - } - if (aggregations != null) { - for (Aggregation aggregation : query.getAggregations()) { - includedFields.add(aggregation.name()); - } - } - // do not write channel since it is already provided as key in mapping - includedFields.remove(QueryField.channel.name()); - - ObjectWriter writer = configureWriter(includedFields); - respondInternal(stream, response, writer); - } - - /** - * Configures the writer dynamically by including the fields which should be included in the - * response. - * - * @param includedFields set of strings which correspond to the getter method names of the - * classes registered as a mixed-in - * @return the configured writer that includes the specified fields - */ - private ObjectWriter configureWriter(Set includedFields) { - SimpleFilterProvider propertyFilter = new SimpleFilterProvider(); - propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields)); - // only write the properties not excluded in the filter - ObjectWriter writer = mapper.writer(propertyFilter); - return writer; - } - - /** - * Writes the outer Java stream into the output stream. - * - * @param stream Mapping from channel name to data - * @param response {@link ServletResponse} instance given by the current HTTP request - * @param writer configured writer that includes the fields the end user wants to see - * @throws IOException thrown if writing to the output stream fails - */ - private void respondInternal(Stream> stream, ServletResponse response, - ObjectWriter writer) - throws IOException { - - response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName()); - response.setContentType(MediaType.APPLICATION_JSON_VALUE); - JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8); - generator.writeStartArray(); - stream - /* ensure elements are sequentially written */ - .sequential() - .forEach( - entry -> { - try { - generator.writeStartObject(); - generator.writeStringField(QueryField.channel.name(), entry.getKey()); - - generator.writeFieldName(DATA_RESP_FIELD); - writer.writeValue(generator, entry.getValue()); - - generator.writeEndObject(); - } catch (Exception e) { - logger.error("Could not write channel name of channel '{}'", entry.getKey(), e); - } - }); - generator.writeEndArray(); - generator.flush(); - generator.close(); - } - + public void respond(Stream> stream, DAQQuery query, ServletResponse response) throws Exception; } diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/ArrayProcessor.java b/src/main/java/ch/psi/daq/queryrest/response/csv/ArrayProcessor.java new file mode 100644 index 0000000..17aaaac --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/response/csv/ArrayProcessor.java @@ -0,0 +1,58 @@ +package ch.psi.daq.queryrest.response.csv; + +import java.lang.reflect.Array; + +import org.supercsv.cellprocessor.CellProcessorAdaptor; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.util.CsvContext; + +public class ArrayProcessor extends CellProcessorAdaptor { + public static final char DEFAULT_SEPARATOR = ','; + public static final String OPEN_BRACKET = "["; + public static final String CLOSE_BRACKET = "]"; + + private char separator = DEFAULT_SEPARATOR; + + public ArrayProcessor() { + super(); + } + + public ArrayProcessor(char separator) { + super(); + this.separator = separator; + } + + public ArrayProcessor(CellProcessor next) { + super(next); + } + + public ArrayProcessor(CellProcessor next, char separator) { + super(next); + this.separator = separator; + } + + @SuppressWarnings("unchecked") + @Override + public Object execute(Object value, CsvContext context) { + if (value.getClass().isArray()) { + StringBuilder buf = new StringBuilder(); + + int length = Array.getLength(value); + buf.append(OPEN_BRACKET); + for (int i = 0; i < length;) { + Object val = next.execute(Array.get(value, i), context); + buf.append(val); + + ++i; + if (i < length) { + buf.append(separator); + } + } + buf.append(CLOSE_BRACKET); + + return buf.toString(); + } else { + return next.execute(value, context); + } + } +} diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java new file mode 100644 index 0000000..108190a --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java @@ -0,0 +1,140 @@ +package ch.psi.daq.queryrest.response.csv; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Stream; + +import javax.annotation.Resource; +import javax.servlet.ServletResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.supercsv.cellprocessor.constraint.NotNull; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.io.dozer.CsvDozerBeanWriter; +import org.supercsv.io.dozer.ICsvDozerBeanWriter; +import org.supercsv.prefs.CsvPreference; + +import com.fasterxml.jackson.core.JsonEncoding; + +import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.query.analyzer.QueryAnalyzer; +import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.Query; +import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.queryrest.response.ResponseStreamWriter; + +/** + * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse} + * of the current request. + */ +public class CSVResponseStreamWriter implements ResponseStreamWriter { + public static final String APPLICATION_CSV_VALUE = "text/csv"; + + private static final char DELIMITER_CVS = ';'; + private static final char DELIMITER_ARRAY = ','; + + private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class); + + @Resource + private Function queryAnalizerFactory; + + @Override + public void respond(Stream> stream, DAQQuery query, ServletResponse response) throws Exception { + response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName()); + response.setContentType(APPLICATION_CSV_VALUE); + + respondInternal(stream, query, response); + } + + private void respondInternal(Stream> stream, DAQQuery query, ServletResponse response) + throws Exception { + Set queryFields = query.getFields(); + List aggregations = query.getAggregations(); + + Set fieldMapping = + new LinkedHashSet<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); + List header = + new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); + List processorSet = + new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); + boolean isNewField; + QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query); + + for (QueryField field : queryFields) { + if(!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())){ + isNewField = fieldMapping.add(field.name()); + + if (isNewField) { + header.add(field.name()); + processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); + } + } + } + if (aggregations != null && queryAnalyzer.isAggregationEnabled()) { + for (Aggregation aggregation : query.getAggregations()) { + isNewField = fieldMapping.add("value." + aggregation.name()); + + if (isNewField) { + header.add(aggregation.name()); + processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); + } + } + } + + CellProcessor[] processors = processorSet.toArray(new CellProcessor[processorSet.size()]); + CsvPreference preference = new CsvPreference.Builder( + CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(), + DELIMITER_CVS, + CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build(); + ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(response.getWriter(), preference); + + AtomicReference exception = new AtomicReference<>(); + try { + // configure the mapping from the fields to the CSV columns + beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()])); + + beanWriter.writeHeader(header.toArray(new String[header.size()])); + + stream + /* ensure elements are sequentially written */ + .sequential() + .forEach( + entry -> { + if (entry.getValue() instanceof Stream) { + + @SuppressWarnings("unchecked") + Stream eventStream = (Stream) entry.getValue(); + eventStream + .forEach( + event -> { + try { + beanWriter.write(event, processors); + } catch (Exception e) { + LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(), + event.getPulseId(), e); + exception.compareAndSet(null, e); + } + }); + } else { + String message = "Type '" + entry.getValue().getClass() + "' not supported."; + LOGGER.error(message); + exception.compareAndSet(null, new RuntimeException(message)); + } + }); + } finally { + beanWriter.close(); + } + + if (exception.get() != null) { + throw exception.get(); + } + } + +} diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java new file mode 100644 index 0000000..47e8a7a --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java @@ -0,0 +1,140 @@ +package ch.psi.daq.queryrest.response.json; + +import java.io.IOException; +import java.util.Map.Entry; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import javax.annotation.Resource; +import javax.servlet.ServletResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; +import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; + +import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.queryrest.response.ResponseStreamWriter; + +/** + * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse} + * of the current request. + */ +public class JSONResponseStreamWriter implements ResponseStreamWriter { + + private static final String DATA_RESP_FIELD = "data"; + + private static final Logger logger = LoggerFactory.getLogger(JSONResponseStreamWriter.class); + + @Resource + private JsonFactory jsonFactory; + + @Resource + private ObjectMapper mapper; + + @Override + public void respond(Stream> stream, DAQQuery query, ServletResponse response) throws Exception { + response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName()); + response.setContentType(MediaType.APPLICATION_JSON_VALUE); + + Set includedFields = getFields(query); + + ObjectWriter writer = configureWriter(includedFields); + respondInternal(stream, response, writer); + } + + protected Set getFields(DAQQuery query) { + Set queryFields = query.getFields(); + List aggregations = query.getAggregations(); + + Set includedFields = + new LinkedHashSet(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); + + for (QueryField field : queryFields) { + includedFields.add(field.name()); + } + if (aggregations != null) { + for (Aggregation aggregation : query.getAggregations()) { + includedFields.add(aggregation.name()); + } + } + + // do not write channel since it is already provided as key in mapping + includedFields.remove(QueryField.channel.name()); + + return includedFields; + } + + /** + * Configures the writer dynamically by including the fields which should be included in the + * response. + * + * @param includedFields set of strings which correspond to the getter method names of the + * classes registered as a mixed-in + * @return the configured writer that includes the specified fields + */ + private ObjectWriter configureWriter(Set includedFields) { + SimpleFilterProvider propertyFilter = new SimpleFilterProvider(); + propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields)); + // only write the properties not excluded in the filter + ObjectWriter writer = mapper.writer(propertyFilter); + return writer; + } + + /** + * Writes the outer Java stream into the output stream. + * + * @param stream Mapping from channel name to data + * @param response {@link ServletResponse} instance given by the current HTTP request + * @param writer configured writer that includes the fields the end user wants to see + * @throws IOException thrown if writing to the output stream fails + */ + private void respondInternal(Stream> stream, ServletResponse response, ObjectWriter writer) + throws Exception { + JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8); + + AtomicReference exception = new AtomicReference<>(); + try { + generator.writeStartArray(); + stream + /* ensure elements are sequentially written */ + .sequential() + .forEach( + entry -> { + try { + generator.writeStartObject(); + generator.writeStringField(QueryField.channel.name(), entry.getKey()); + + generator.writeFieldName(DATA_RESP_FIELD); + writer.writeValue(generator, entry.getValue()); + + generator.writeEndObject(); + } catch (Exception e) { + logger.error("Could not write channel name of channel '{}'", entry.getKey(), e); + exception.compareAndSet(null, e); + } + }); + generator.writeEndArray(); + } finally { + generator.flush(); + generator.close(); + } + + if (exception.get() != null) { + throw exception.get(); + } + } + +} diff --git a/src/main/java/ch/psi/daq/queryrest/response/JsonByteArraySerializer.java b/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java similarity index 95% rename from src/main/java/ch/psi/daq/queryrest/response/JsonByteArraySerializer.java rename to src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java index e01cea1..dee1d8d 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/JsonByteArraySerializer.java +++ b/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java @@ -1,4 +1,4 @@ -package ch.psi.daq.queryrest.response; +package ch.psi.daq.queryrest.response.json; import java.io.IOException; diff --git a/src/main/java/ch/psi/daq/queryrest/response/JsonStreamSerializer.java b/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java similarity index 94% rename from src/main/java/ch/psi/daq/queryrest/response/JsonStreamSerializer.java rename to src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java index 6333d69..61fede0 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/JsonStreamSerializer.java +++ b/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java @@ -1,4 +1,4 @@ -package ch.psi.daq.queryrest.response; +package ch.psi.daq.queryrest.response.json; import java.io.IOException; import java.util.Iterator; diff --git a/src/main/resources/queryrest.properties b/src/main/resources/queryrest.properties index bf99eef..a21e89a 100644 --- a/src/main/resources/queryrest.properties +++ b/src/main/resources/queryrest.properties @@ -3,7 +3,7 @@ queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,iocMillis,iocNanos,shape,eventCount,value # aggregation which are included in the response by default if aggregation is enabled for a given query -queryrest.default.response.aggregations=min,max,sum +queryrest.default.response.aggregations=min,mean,max # enables / disables the CORS servlet filter. Adds multiple CORS headers to the response queryrest.cors.enable=true diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java new file mode 100644 index 0000000..03ea21c --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java @@ -0,0 +1,590 @@ +package ch.psi.daq.test.queryrest.controller; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.StringReader; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + +import javax.annotation.Resource; + +import org.junit.After; +import org.junit.Test; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.result.MockMvcResultHandlers; +import org.springframework.test.web.servlet.result.MockMvcResultMatchers; +import org.supercsv.cellprocessor.constraint.NotNull; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.io.CsvMapReader; +import org.supercsv.io.ICsvMapReader; +import org.supercsv.prefs.CsvPreference; + +import ch.psi.daq.cassandra.request.range.RequestRangeDate; +import ch.psi.daq.cassandra.request.range.RequestRangePulseId; +import ch.psi.daq.cassandra.request.range.RequestRangeTime; +import ch.psi.daq.cassandra.util.test.CassandraDataGen; +import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.AggregationType; +import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.queryrest.controller.QueryRestController; +import ch.psi.daq.queryrest.filter.CorsFilter; +import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; +import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin; +import ch.psi.daq.test.queryrest.AbstractDaqRestTest; + +/** + * Tests the {@link DaqController} implementation. + */ +public class QueryRestControllerCsvTest extends AbstractDaqRestTest { + + @Resource + private CassandraTestAdmin cassandraTestAdmin; + + @Resource + private CassandraDataGen dataGen; + + @Resource + private CorsFilter corsFilter; + + public static final String TEST_CHANNEL = "testChannel"; + public static final String TEST_CHANNEL_01 = TEST_CHANNEL + "1"; + public static final String TEST_CHANNEL_02 = TEST_CHANNEL + "2"; + public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02}; + + @After + public void tearDown() throws Exception {} + + @Test + public void testPulseRangeQuery() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 0, + 1), + TEST_CHANNEL_NAMES); + LinkedHashSet queryFields = new LinkedHashSet<>(); + LinkedHashSet cellProcessors = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.pulseId); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.shape); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.eventCount); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.value); + cellProcessors.add(new NotNull()); + request.setFields(queryFields); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + + String response = result.getResponse().getContentAsString(); + ICsvMapReader mapReader = + new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); + try { + final String[] header = mapReader.getHeader(true); + Map customerMap; + long resultsPerChannel = 2; + long pulse = 0; + long channelCount = 1; + while ((customerMap = mapReader.read(header, processors)) != null) { + assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); + assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name())); + assertEquals("0", customerMap.get(QueryField.iocNanos.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name())); + assertEquals("0", customerMap.get(QueryField.globalNanos.name())); + assertEquals("[1]", customerMap.get(QueryField.shape.name())); + assertEquals("1", customerMap.get(QueryField.eventCount.name())); + assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + + pulse = ++pulse % resultsPerChannel; + if (pulse == 0) { + ++channelCount; + } + } + } finally { + mapReader.close(); + } + } + + @Test + public void testPulseRangeQueryWaveform() throws Exception { + String channelName = "XYWaveform"; + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 0, + 1), + channelName); + LinkedHashSet queryFields = new LinkedHashSet<>(); + LinkedHashSet cellProcessors = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.pulseId); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.shape); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.eventCount); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.value); + cellProcessors.add(new NotNull()); + request.setFields(queryFields); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + + String response = result.getResponse().getContentAsString(); + ICsvMapReader mapReader = + new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); + try { + final String[] header = mapReader.getHeader(true); + Map customerMap; + long pulse = 0; + while ((customerMap = mapReader.read(header, processors)) != null) { + assertEquals(channelName, customerMap.get(QueryField.channel.name())); + assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name())); + assertEquals("0", customerMap.get(QueryField.iocNanos.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name())); + assertEquals("0", customerMap.get(QueryField.globalNanos.name())); + assertEquals("[2048]", customerMap.get(QueryField.shape.name())); + assertEquals("1", customerMap.get(QueryField.eventCount.name())); + assertTrue(customerMap.get(QueryField.value.name()).toString().startsWith("[")); + assertTrue(customerMap.get(QueryField.value.name()).toString().endsWith("]")); + + ++pulse; + } + } finally { + mapReader.close(); + } + } + + @Test + public void testTimeRangeQuery() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangeTime( + 0, + 10), + TEST_CHANNEL_NAMES); + LinkedHashSet queryFields = new LinkedHashSet<>(); + LinkedHashSet cellProcessors = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.pulseId); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.shape); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.eventCount); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.value); + cellProcessors.add(new NotNull()); + request.setFields(queryFields); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + + String response = result.getResponse().getContentAsString(); + ICsvMapReader mapReader = + new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); + try { + final String[] header = mapReader.getHeader(true); + Map customerMap; + long resultsPerChannel = 2; + long pulse = 0; + long channelCount = 1; + while ((customerMap = mapReader.read(header, processors)) != null) { + assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); + assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name())); + assertEquals("0", customerMap.get(QueryField.iocNanos.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name())); + assertEquals("0", customerMap.get(QueryField.globalNanos.name())); + assertEquals("[1]", customerMap.get(QueryField.shape.name())); + assertEquals("1", customerMap.get(QueryField.eventCount.name())); + assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + + pulse = ++pulse % resultsPerChannel; + if (pulse == 0) { + ++channelCount; + } + } + } finally { + mapReader.close(); + } + } + + @Test + public void testDateRangeQuery() throws Exception { + String startDate = RequestRangeDate.format(0); + String endDate = RequestRangeDate.format(10); + DAQQuery request = new DAQQuery( + new RequestRangeDate( + startDate, + endDate), + TEST_CHANNEL_NAMES); + LinkedHashSet queryFields = new LinkedHashSet<>(); + LinkedHashSet cellProcessors = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.pulseId); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.shape); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.eventCount); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.value); + cellProcessors.add(new NotNull()); + request.setFields(queryFields); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + + String response = result.getResponse().getContentAsString(); + ICsvMapReader mapReader = + new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); + try { + final String[] header = mapReader.getHeader(true); + Map customerMap; + long resultsPerChannel = 2; + long pulse = 0; + long channelCount = 1; + while ((customerMap = mapReader.read(header, processors)) != null) { + assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); + assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name())); + assertEquals("0", customerMap.get(QueryField.iocNanos.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name())); + assertEquals("0", customerMap.get(QueryField.globalNanos.name())); + assertEquals("[1]", customerMap.get(QueryField.shape.name())); + assertEquals("1", customerMap.get(QueryField.eventCount.name())); + assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + + pulse = ++pulse % resultsPerChannel; + if (pulse == 0) { + ++channelCount; + } + } + } finally { + mapReader.close(); + } + } + + @Test + public void testExtremaAggregation() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 0, + 1), + false, + Ordering.asc, + AggregationType.extrema, + TEST_CHANNEL_NAMES[0]); + + String content = mapper.writeValueAsString(request); + + try { + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()); + // .andExpect(MockMvcResultMatchers.model().attribute( + // "exception", + // Matchers.isA(IllegalArgumentException.class))); + // should throw an exception + assertTrue(false); + } catch (Throwable e) { + assertTrue(true); + } + } + + @Test + public void testIndexAggregation() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 0, + 1), + false, + Ordering.asc, + AggregationType.index, + TEST_CHANNEL_NAMES[0]); + + String content = mapper.writeValueAsString(request); + + try { + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()); + // .andExpect(MockMvcResultMatchers.model().attribute( + // "exception", + // Matchers.isA(IllegalArgumentException.class))); + // should throw an exception + assertTrue(false); + } catch (Throwable e) { + assertTrue(true); + } + } + + @Test + public void testDateRangeQueryNrOfBinsAggregate() throws Exception { + long startTime = 0; + long endTime = 99; + String startDate = RequestRangeDate.format(startTime); + String endDate = RequestRangeDate.format(endTime); + DAQQuery request = new DAQQuery( + new RequestRangeDate( + startDate, + endDate), + TEST_CHANNEL_01); + request.setNrOfBins(2); + + LinkedHashSet queryFields = new LinkedHashSet<>(); + LinkedHashSet cellProcessors = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.pulseId); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.shape); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.eventCount); + cellProcessors.add(new NotNull()); + request.setFields(queryFields); + + List aggregations = new ArrayList<>(); + aggregations.add(Aggregation.min); + cellProcessors.add(new NotNull()); + aggregations.add(Aggregation.mean); + cellProcessors.add(new NotNull()); + aggregations.add(Aggregation.max); + cellProcessors.add(new NotNull()); + request.setAggregations(aggregations); + + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + + String response = result.getResponse().getContentAsString(); + ICsvMapReader mapReader = + new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); + try { + final String[] header = mapReader.getHeader(true); + Map customerMap; + long pulse = 0; + while ((customerMap = mapReader.read(header, processors)) != null) { + assertEquals(TEST_CHANNEL_01, customerMap.get(QueryField.channel.name())); + assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name())); + assertEquals("0", customerMap.get(QueryField.iocNanos.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name())); + assertEquals("0", customerMap.get(QueryField.globalNanos.name())); + assertEquals("[1]", customerMap.get(QueryField.shape.name())); + assertEquals("5", customerMap.get(QueryField.eventCount.name())); + assertEquals("" + pulse + ".0", customerMap.get(Aggregation.min.name())); + assertEquals("" + (pulse + 2) + ".0", customerMap.get(Aggregation.mean.name())); + assertEquals("" + (pulse + 4) + ".0", customerMap.get(Aggregation.max.name())); + + pulse += 5; + } + } finally { + mapReader.close(); + } + } + + @Test + public void testDateRangeQueryBinSizeAggregate() throws Exception { + long startTime = 0; + long endTime = 999; + String startDate = RequestRangeDate.format(startTime); + String endDate = RequestRangeDate.format(endTime); + DAQQuery request = new DAQQuery( + new RequestRangeDate( + startDate, + endDate), + TEST_CHANNEL_01); + request.setBinSize(100); + + LinkedHashSet queryFields = new LinkedHashSet<>(); + LinkedHashSet cellProcessors = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.pulseId); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.shape); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.eventCount); + cellProcessors.add(new NotNull()); + request.setFields(queryFields); + + List aggregations = new ArrayList<>(); + aggregations.add(Aggregation.min); + cellProcessors.add(new NotNull()); + aggregations.add(Aggregation.mean); + cellProcessors.add(new NotNull()); + aggregations.add(Aggregation.max); + cellProcessors.add(new NotNull()); + request.setAggregations(aggregations); + + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + + String response = result.getResponse().getContentAsString(); + ICsvMapReader mapReader = + new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); + try { + final String[] header = mapReader.getHeader(true); + Map customerMap; + long pulse = 0; + while ((customerMap = mapReader.read(header, processors)) != null) { + assertEquals(TEST_CHANNEL_01, customerMap.get(QueryField.channel.name())); + assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name())); + assertEquals("0", customerMap.get(QueryField.iocNanos.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name())); + assertEquals("0", customerMap.get(QueryField.globalNanos.name())); + assertEquals("[1]", customerMap.get(QueryField.shape.name())); + assertEquals("10", customerMap.get(QueryField.eventCount.name())); + assertEquals("" + pulse + ".0", customerMap.get(Aggregation.min.name())); + assertEquals("" + (pulse + 4.5), customerMap.get(Aggregation.mean.name())); + assertEquals("" + (pulse + 9) + ".0", customerMap.get(Aggregation.max.name())); + + pulse += 10; + } + } finally { + mapReader.close(); + } + } +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java similarity index 98% rename from src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerTest.java rename to src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java index fe52cfa..bd36005 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java @@ -25,7 +25,7 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest; /** * Tests the {@link DaqController} implementation. */ -public class QueryRestControllerTest extends AbstractDaqRestTest { +public class QueryRestControllerJsonTest extends AbstractDaqRestTest { @Resource private CassandraTestAdmin cassandraTestAdmin; @@ -180,6 +180,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest { this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) + .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -213,6 +214,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest { this.mockMvc.perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) + .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -252,6 +254,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest { .perform( MockMvcRequestBuilders .post(QueryRestController.QUERY) + .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content) ) @@ -289,6 +292,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest { this.mockMvc .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY) + .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -335,6 +339,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest { .perform( MockMvcRequestBuilders .post(QueryRestController.QUERY) + .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content) ) @@ -372,6 +377,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest { .perform( MockMvcRequestBuilders .post(QueryRestController.QUERY) + .accept(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON) .content(content) ) diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index eab0b7a..7496ac7 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -4,11 +4,15 @@ package ch.psi.daq.test.queryrest.query; import java.util.List; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.LongStream; import java.util.stream.Stream; +import com.google.common.collect.Lists; + import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery; import ch.psi.daq.cassandra.reader.query.TimeRangeQuery; @@ -23,8 +27,6 @@ import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo; import ch.psi.daq.domain.cassandra.querying.EventQuery; import ch.psi.data.converters.ConverterProvider; -import com.google.common.collect.Lists; - /** * @author zellweger_c * @@ -34,7 +36,7 @@ public class DummyCassandraReader implements CassandraReader { private static final int KEYSPACE = 1; private CassandraDataGen dataGen; private String[] channels; - + private Random random = new Random(0); /** * @@ -145,27 +147,56 @@ public class DummyCassandraReader implements CassandraReader { private Stream getDummyEventStream(String channel, long startIndex, long endIndex) { + String channelLower = channel.toLowerCase(); - return dataGen.generateData(startIndex, (endIndex - startIndex + 1), - i -> i * 10, - i -> 0, - i -> i, - i -> i * 10, - i -> 0, - i -> Long.valueOf(i), - channel).stream(); + Stream eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> { + if (channelLower.contains("waveform")) { + long[] value = random.longs(2048).toArray(); + value[0] = i; + return new ChannelEvent( + channel, + i * 10, + 0, + i, + i * 10, + 0, + value + ); + + } else if (channelLower.contains("image")) { + int x = 640; + int y = 480; + int[] shape = new int[] {x, y}; + long[] value = random.longs(x * y).toArray(); + value[0] = i; + return new ChannelEvent( + channel, + i * 10, + 0, + i, + i * 10, + 0, + value, + shape + ); + } else { + return new ChannelEvent( + channel, + i * 10, + 0, + i, + i * 10, + 0, + i + ); + } + }); + + return eventStream; } private List getDummyEvents(String channel, long startIndex, long endIndex) { - - return dataGen.generateData(startIndex, (endIndex - startIndex + 1), - i -> i * 10, - i -> 0, - i -> i, - i -> i * 10, - i -> 0, - i -> Long.valueOf(i), - channel); + return getDummyEventStream(channel, startIndex, endIndex).collect(Collectors.toList()); } /**