ATEST-299
@@ -1,5 +1,5 @@
 #
-#Mon Dec 07 08:06:14 CET 2015
+#Mon Dec 14 16:07:41 CET 2015
 org.eclipse.jdt.core.compiler.debug.localVariable=generate
 org.eclipse.jdt.core.compiler.compliance=1.8
 org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
Readme.md
@@ -71,7 +71,7 @@ POST http://<host>:<port>/channels
 ### Example
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels
 ```
 
 <a name="query_range"/>
@@ -158,7 +158,7 @@ There exist following fields:
 ### Example
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' http://data-api.psi.ch/sf/query
 ```
 
 ### Response example
@@ -299,7 +299,7 @@ Following examples build on a waveform data (see below). They also work for scal
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -327,7 +327,7 @@ See JSON representation of the data above.
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -357,7 +357,7 @@ Supported format is ISO8601 *YYYY-MM-DDThh:mm:ss.sTZD* (e.g. *1997-07-16T19:20:3
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"1970-01-01T01:00:00.000","startNanos":0,"endDate":"1970-01-01T01:00:00.030","endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -389,7 +389,7 @@ Archiver Appliance supports queries by *time range* and *date range* only (as it
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"dbmode":"archiverappliance","range":{"startMillis":0,"startNanos":0,"endMillis":30,"endNanos":999999},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -418,7 +418,7 @@ Allows for server side optimizations since not all data needs to be retrieved.
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -471,7 +471,7 @@ Use **none** in case ordering does not matter (allows for server side optimizati
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -524,7 +524,7 @@ curl -H "Content-Type: application/json" -X POST -d '{"ordering":"desc","fields"
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -598,7 +598,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -657,7 +657,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"globalMillis":0,"globalMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -714,7 +714,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationType":"index","aggregations":["min","max","mean","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -783,7 +783,7 @@ curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":1,"aggregationT
 ###### Command
 
 ```bash
-curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
+curl -H "Accept: application/json" -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema","aggregations":["min","max","sum"],"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
 ```
 
 ###### Response
@@ -829,3 +829,32 @@ curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrema
 }
 ]
 ```
+
+### CSV Export
+
+Responses can be formatted as CSV by requesting `text/csv`.
+CSV export does not support `index` and `extrema` aggregations.
+
+### Example
+
+```bash
+curl -H "Accept: text/csv" -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocMillis","iocNanos","globalMillis","globalNanos","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
+```
+
+### Response example
+
+The response is in CSV.
+
+```text
+channel;pulseId;iocMillis;iocNanos;globalMillis;globalNanos;shape;eventCount;value
+testChannel1;0;0;0;0;0;[1];1;0
+testChannel1;1;10;0;10;0;[1];1;1
+testChannel1;2;20;0;20;0;[1];1;2
+testChannel1;3;30;0;30;0;[1];1;3
+testChannel1;4;40;0;40;0;[1];1;4
+testChannel2;0;0;0;0;0;[1];1;0
+testChannel2;1;10;0;10;0;[1];1;1
+testChannel2;2;20;0;20;0;[1];1;2
+testChannel2;3;30;0;30;0;[1];1;3
+testChannel2;4;40;0;40;0;[1];1;4
+```
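Editorial note: for clients that are not shell-based, the same CSV export can be driven from plain Java. The sketch below is illustrative only and not part of this commit; it assumes the `data-api.psi.ch` endpoint from the examples above is reachable, posts the example query with `Accept: text/csv`, and prints the semicolon-separated rows.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CsvQueryClient {
    public static void main(String[] args) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) new URL("http://data-api.psi.ch/sf/query").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        // "Accept: text/csv" selects the CSV response writer instead of the JSON one
        conn.setRequestProperty("Accept", "text/csv");
        conn.setDoOutput(true);

        String query = "{\"range\":{\"startPulseId\":0,\"endPulseId\":4},"
                + "\"channels\":[\"channel1\",\"channel2\"]}";
        try (OutputStream out = conn.getOutputStream()) {
            out.write(query.getBytes(StandardCharsets.UTF_8));
        }

        // the body is semicolon-separated CSV, header line first
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```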
@@ -26,6 +26,8 @@ dependencies {
         exclude group: 'org.slf4j', module: 'log4j-over-slf4j'
     }
     compile libraries.commons_lang
+    compile libraries.super_csv
+    compile libraries.super_csv_dozer
 
     testCompile libraries.spring_boot_starter_test
     testCompile libraries.jsonassert
@@ -42,9 +42,10 @@ import ch.psi.daq.query.model.QueryField;
 import ch.psi.daq.queryrest.controller.validator.QueryValidator;
 import ch.psi.daq.queryrest.filter.CorsFilter;
 import ch.psi.daq.queryrest.model.PropertyFilterMixin;
-import ch.psi.daq.queryrest.response.JsonByteArraySerializer;
-import ch.psi.daq.queryrest.response.JsonStreamSerializer;
-import ch.psi.daq.queryrest.response.ResponseStreamWriter;
+import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
+import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
+import ch.psi.daq.queryrest.response.json.JsonByteArraySerializer;
+import ch.psi.daq.queryrest.response.json.JsonStreamSerializer;
 
 @Configuration
 @PropertySource(value = {"classpath:queryrest.properties"})
@@ -122,8 +123,13 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
     }
 
     @Bean
-    public ResponseStreamWriter responseStreamWriter() {
-        return new ResponseStreamWriter();
+    public JSONResponseStreamWriter jsonResponseStreamWriter() {
+        return new JSONResponseStreamWriter();
+    }
+
+    @Bean
+    public CSVResponseStreamWriter csvResponseStreamWriter() {
+        return new CSVResponseStreamWriter();
     }
 
     @Bean(name = BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
@@ -41,7 +41,8 @@ import ch.psi.daq.query.model.QueryField;
 import ch.psi.daq.query.model.impl.DAQQuery;
 import ch.psi.daq.query.processor.QueryProcessor;
 import ch.psi.daq.query.request.ChannelsRequest;
-import ch.psi.daq.queryrest.response.ResponseStreamWriter;
+import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
+import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
 
 import com.google.common.collect.Lists;
 
@@ -58,7 +59,10 @@ public class QueryRestController {
     private Validator requestProviderValidator = new RequestProviderValidator();
 
     @Resource
-    private ResponseStreamWriter responseStreamWriter;
+    private JSONResponseStreamWriter jsonResponseStreamWriter;
+
+    @Resource
+    private CSVResponseStreamWriter csvResponseStreamWriter;
 
     @Resource
     private ApplicationContext appContext;
@@ -190,9 +194,8 @@ public class QueryRestController {
         }).collect(Collectors.toList());
     }
 
-
     /**
-     * Catch-all query method for getting data from the backend.
+     * Catch-all query method for getting data from the backend for JSON requests.
      * <p>
      * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
     * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
@@ -203,14 +206,56 @@ public class QueryRestController {
      * @param res the {@link HttpServletResponse} instance associated with this request
      * @throws IOException thrown if writing to the output stream fails
      */
-    @RequestMapping(value = QUERY, method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE})
-    public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws IOException {
+    @RequestMapping(
+            value = QUERY,
+            method = RequestMethod.POST,
+            consumes = {MediaType.APPLICATION_JSON_VALUE},
+            produces = {MediaType.APPLICATION_JSON_VALUE})
+    public void executeQueryJson(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
         try {
             LOGGER.debug("Executing query '{}'", query.toString());
 
             // write the response back to the client using java 8 streams
-            responseStreamWriter.respond(executeQuery(query), query, res);
-        } catch (IOException t) {
+            jsonResponseStreamWriter.respond(executeQuery(query), query, res);
+        } catch (Exception t) {
+            LOGGER.error("Failed to execute query '{}'.", query, t);
+            throw t;
+        }
+    }
+
+    /**
+     * Catch-all query method for getting data from the backend for CSV requests.
+     * <p>
+     * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
+     * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
+     * deserialize the information into and has been configured (see
+     * QueryRestConfig#afterPropertiesSet) accordingly.
+     *
+     * @param query concrete implementation of {@link DAQQuery}
+     * @param res the {@link HttpServletResponse} instance associated with this request
+     * @throws IOException thrown if writing to the output stream fails
+     */
+    @RequestMapping(
+            value = QUERY,
+            method = RequestMethod.POST,
+            consumes = {MediaType.APPLICATION_JSON_VALUE},
+            produces = {CSVResponseStreamWriter.APPLICATION_CSV_VALUE})
+    public void executeQueryCsv(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
+        if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
+            // We allow only no aggregation or value aggregation as
+            // extrema: nested structure and not clear how to map it to one line
+            // index: value is an array of Statistics whose size is not clear at initialization time
+            String message = "CSV export does not support '" + query.getAggregationType() + "'";
+            LOGGER.warn(message);
+            throw new IllegalArgumentException(message);
+        }
+
+        try {
+            LOGGER.debug("Executing query '{}'", query.toString());
+
+            // write the response back to the client using java 8 streams
+            csvResponseStreamWriter.respond(executeQuery(query), query, res);
+        } catch (Exception t) {
             LOGGER.error("Failed to execute query '{}'.", query, t);
             throw t;
         }
     }
@@ -14,7 +14,6 @@ import ch.psi.daq.query.model.DBMode;
 import ch.psi.daq.query.model.QueryField;
 import ch.psi.daq.query.model.impl.DAQQuery;
 import ch.psi.daq.cassandra.request.Request;
-import ch.psi.daq.cassandra.request.range.RequestRangeTime;
 import ch.psi.daq.queryrest.config.QueryRestConfig;
 
 public class QueryValidator implements Validator {
@@ -1,46 +1,14 @@
 package ch.psi.daq.queryrest.response;
 
 import java.io.IOException;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
 import java.util.Map.Entry;
 import java.util.stream.Stream;
 
-import javax.annotation.Resource;
 import javax.servlet.ServletResponse;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.MediaType;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
-import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
-
-import ch.psi.daq.query.model.Aggregation;
-import ch.psi.daq.query.model.QueryField;
 import ch.psi.daq.query.model.impl.DAQQuery;
 
-/**
- * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
- * of the current request.
- */
-public class ResponseStreamWriter {
-
-    private static final String DATA_RESP_FIELD = "data";
-
-    private static final Logger logger = LoggerFactory.getLogger(ResponseStreamWriter.class);
-
-    @Resource
-    private JsonFactory jsonFactory;
-
-    @Resource
-    private ObjectMapper mapper;
+public interface ResponseStreamWriter {
 
     /**
      * Responding with the contents of the stream by writing into the output stream of the
@@ -51,82 +19,5 @@ public class ResponseStreamWriter {
      * @param response {@link ServletResponse} instance given by the current HTTP request
      * @throws IOException thrown if writing to the output stream fails
      */
-    public void respond(Stream<Entry<String, ?>> stream, DAQQuery query,
-            ServletResponse response) throws IOException {
-
-        Set<QueryField> queryFields = query.getFields();
-        List<Aggregation> aggregations = query.getAggregations();
-
-        Set<String> includedFields =
-                new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
-
-        for (QueryField field : queryFields) {
-            includedFields.add(field.name());
-        }
-        if (aggregations != null) {
-            for (Aggregation aggregation : query.getAggregations()) {
-                includedFields.add(aggregation.name());
-            }
-        }
-        // do not write channel since it is already provided as key in mapping
-        includedFields.remove(QueryField.channel.name());
-
-        ObjectWriter writer = configureWriter(includedFields);
-        respondInternal(stream, response, writer);
-    }
-
-    /**
-     * Configures the writer dynamically by including the fields which should be included in the
-     * response.
-     *
-     * @param includedFields set of strings which correspond to the getter method names of the
-     *        classes registered as a mixed-in
-     * @return the configured writer that includes the specified fields
-     */
-    private ObjectWriter configureWriter(Set<String> includedFields) {
-        SimpleFilterProvider propertyFilter = new SimpleFilterProvider();
-        propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields));
-        // only write the properties not excluded in the filter
-        ObjectWriter writer = mapper.writer(propertyFilter);
-        return writer;
-    }
-
-    /**
-     * Writes the outer Java stream into the output stream.
-     *
-     * @param stream Mapping from channel name to data
-     * @param response {@link ServletResponse} instance given by the current HTTP request
-     * @param writer configured writer that includes the fields the end user wants to see
-     * @throws IOException thrown if writing to the output stream fails
-     */
-    private void respondInternal(Stream<Entry<String, ?>> stream, ServletResponse response,
-            ObjectWriter writer)
-            throws IOException {
-
-        response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
-        response.setContentType(MediaType.APPLICATION_JSON_VALUE);
-        JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
-        generator.writeStartArray();
-        stream
-                /* ensure elements are sequentially written */
-                .sequential()
-                .forEach(
-                        entry -> {
-                            try {
-                                generator.writeStartObject();
-                                generator.writeStringField(QueryField.channel.name(), entry.getKey());
-
-                                generator.writeFieldName(DATA_RESP_FIELD);
-                                writer.writeValue(generator, entry.getValue());
-
-                                generator.writeEndObject();
-                            } catch (Exception e) {
-                                logger.error("Could not write channel name of channel '{}'", entry.getKey(), e);
-                            }
-                        });
-        generator.writeEndArray();
-        generator.flush();
-        generator.close();
-    }
-
+    public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception;
 }
@@ -0,0 +1,58 @@
+package ch.psi.daq.queryrest.response.csv;
+
+import java.lang.reflect.Array;
+
+import org.supercsv.cellprocessor.CellProcessorAdaptor;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.util.CsvContext;
+
+public class ArrayProcessor extends CellProcessorAdaptor {
+    public static final char DEFAULT_SEPARATOR = ',';
+    public static final String OPEN_BRACKET = "[";
+    public static final String CLOSE_BRACKET = "]";
+
+    private char separator = DEFAULT_SEPARATOR;
+
+    public ArrayProcessor() {
+        super();
+    }
+
+    public ArrayProcessor(char separator) {
+        super();
+        this.separator = separator;
+    }
+
+    public ArrayProcessor(CellProcessor next) {
+        super(next);
+    }
+
+    public ArrayProcessor(CellProcessor next, char separator) {
+        super(next);
+        this.separator = separator;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object execute(Object value, CsvContext context) {
+        if (value.getClass().isArray()) {
+            StringBuilder buf = new StringBuilder();
+
+            int length = Array.getLength(value);
+            buf.append(OPEN_BRACKET);
+            for (int i = 0; i < length;) {
+                Object val = next.execute(Array.get(value, i), context);
+                buf.append(val);
+
+                ++i;
+                if (i < length) {
+                    buf.append(separator);
+                }
+            }
+            buf.append(CLOSE_BRACKET);
+
+            return buf.toString();
+        } else {
+            return next.execute(value, context);
+        }
+    }
+}
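Editorial note: a small usage sketch (not part of the commit) showing what the processor produces: array cells are rendered as a bracketed, separator-joined string, while scalar cells pass straight through to the wrapped processor. The `CsvContext` is constructed manually here only for demonstration; SuperCSV normally supplies it during writing.

```java
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.util.CsvContext;

public class ArrayProcessorDemo {
    public static void main(String[] args) {
        ArrayProcessor processor = new ArrayProcessor(new NotNull(), ',');
        CsvContext ctx = new CsvContext(1, 1, 1); // line, row, column

        // arrays are flattened using the configured separator: prints [1,2,3]
        System.out.println(processor.execute(new int[] {1, 2, 3}, ctx));

        // non-array values are simply delegated to the next processor: prints 42
        System.out.println(processor.execute(42, ctx));
    }
}
```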
@@ -0,0 +1,140 @@
+package ch.psi.daq.queryrest.response.csv;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import javax.annotation.Resource;
+import javax.servlet.ServletResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.io.dozer.CsvDozerBeanWriter;
+import org.supercsv.io.dozer.ICsvDozerBeanWriter;
+import org.supercsv.prefs.CsvPreference;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+
+import ch.psi.daq.domain.DataEvent;
+import ch.psi.daq.query.analyzer.QueryAnalyzer;
+import ch.psi.daq.query.model.Aggregation;
+import ch.psi.daq.query.model.Query;
+import ch.psi.daq.query.model.QueryField;
+import ch.psi.daq.query.model.impl.DAQQuery;
+import ch.psi.daq.queryrest.response.ResponseStreamWriter;
+
+/**
+ * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
+ * of the current request.
+ */
+public class CSVResponseStreamWriter implements ResponseStreamWriter {
+    public static final String APPLICATION_CSV_VALUE = "text/csv";
+
+    private static final char DELIMITER_CVS = ';';
+    private static final char DELIMITER_ARRAY = ',';
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);
+
+    @Resource
+    private Function<Query, QueryAnalyzer> queryAnalizerFactory;
+
+    @Override
+    public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception {
+        response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
+        response.setContentType(APPLICATION_CSV_VALUE);
+
+        respondInternal(stream, query, response);
+    }
+
+    private void respondInternal(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response)
+            throws Exception {
+        Set<QueryField> queryFields = query.getFields();
+        List<Aggregation> aggregations = query.getAggregations();
+
+        Set<String> fieldMapping =
+                new LinkedHashSet<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
+        List<String> header =
+                new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
+        List<CellProcessor> processorSet =
+                new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
+        boolean isNewField;
+        QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query);
+
+        for (QueryField field : queryFields) {
+            if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) {
+                isNewField = fieldMapping.add(field.name());
+
+                if (isNewField) {
+                    header.add(field.name());
+                    processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
+                }
+            }
+        }
+        if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
+            for (Aggregation aggregation : query.getAggregations()) {
+                isNewField = fieldMapping.add("value." + aggregation.name());
+
+                if (isNewField) {
+                    header.add(aggregation.name());
+                    processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
+                }
+            }
+        }
+
+        CellProcessor[] processors = processorSet.toArray(new CellProcessor[processorSet.size()]);
+        CsvPreference preference = new CsvPreference.Builder(
+                CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
+                DELIMITER_CVS,
+                CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
+        ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(response.getWriter(), preference);
+
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        try {
+            // configure the mapping from the fields to the CSV columns
+            beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
+
+            beanWriter.writeHeader(header.toArray(new String[header.size()]));
+
+            stream
+                    /* ensure elements are sequentially written */
+                    .sequential()
+                    .forEach(
+                            entry -> {
+                                if (entry.getValue() instanceof Stream) {
+
+                                    @SuppressWarnings("unchecked")
+                                    Stream<DataEvent> eventStream = (Stream<DataEvent>) entry.getValue();
+                                    eventStream
+                                            .forEach(
+                                                    event -> {
+                                                        try {
+                                                            beanWriter.write(event, processors);
+                                                        } catch (Exception e) {
+                                                            LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
+                                                                    event.getPulseId(), e);
+                                                            exception.compareAndSet(null, e);
+                                                        }
+                                                    });
+                                } else {
+                                    String message = "Type '" + entry.getValue().getClass() + "' not supported.";
+                                    LOGGER.error(message);
+                                    exception.compareAndSet(null, new RuntimeException(message));
+                                }
+                            });
+        } finally {
+            beanWriter.close();
+        }
+
+        if (exception.get() != null) {
+            throw exception.get();
+        }
+    }
+}
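Editorial note: the writer's `CsvPreference` keeps the Excel-north-Europe quoting and line endings but pins the column delimiter to `;`, which is why the Readme's sample output is semicolon-separated. Below is a standalone sketch (illustrative, not part of the commit) of the same preference used with a plain SuperCSV list writer.

```java
import java.io.StringWriter;

import org.supercsv.io.CsvListWriter;
import org.supercsv.io.ICsvListWriter;
import org.supercsv.prefs.CsvPreference;

public class CsvPreferenceDemo {
    public static void main(String[] args) throws Exception {
        // same construction as in CSVResponseStreamWriter: ';' as column delimiter
        CsvPreference preference = new CsvPreference.Builder(
                CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
                ';',
                CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();

        StringWriter out = new StringWriter();
        try (ICsvListWriter writer = new CsvListWriter(out, preference)) {
            writer.writeHeader("channel", "pulseId", "value");
            writer.write("testChannel1", 0, 0);
        }
        // prints: channel;pulseId;value  then  testChannel1;0;0
        System.out.print(out);
    }
}
```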
@@ -0,0 +1,140 @@
+package ch.psi.daq.queryrest.response.json;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import javax.annotation.Resource;
+import javax.servlet.ServletResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
+
+import ch.psi.daq.query.model.Aggregation;
+import ch.psi.daq.query.model.QueryField;
+import ch.psi.daq.query.model.impl.DAQQuery;
+import ch.psi.daq.queryrest.response.ResponseStreamWriter;
+
+/**
+ * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
+ * of the current request.
+ */
+public class JSONResponseStreamWriter implements ResponseStreamWriter {
+
+    private static final String DATA_RESP_FIELD = "data";
+
+    private static final Logger logger = LoggerFactory.getLogger(JSONResponseStreamWriter.class);
+
+    @Resource
+    private JsonFactory jsonFactory;
+
+    @Resource
+    private ObjectMapper mapper;
+
+    @Override
+    public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, ServletResponse response) throws Exception {
+        response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
+        response.setContentType(MediaType.APPLICATION_JSON_VALUE);
+
+        Set<String> includedFields = getFields(query);
+
+        ObjectWriter writer = configureWriter(includedFields);
+        respondInternal(stream, response, writer);
+    }
+
+    protected Set<String> getFields(DAQQuery query) {
+        Set<QueryField> queryFields = query.getFields();
+        List<Aggregation> aggregations = query.getAggregations();
+
+        Set<String> includedFields =
+                new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
+
+        for (QueryField field : queryFields) {
+            includedFields.add(field.name());
+        }
+        if (aggregations != null) {
+            for (Aggregation aggregation : query.getAggregations()) {
+                includedFields.add(aggregation.name());
+            }
+        }
+
+        // do not write channel since it is already provided as key in mapping
+        includedFields.remove(QueryField.channel.name());
+
+        return includedFields;
+    }
+
+    /**
+     * Configures the writer dynamically by including the fields which should be included in the
+     * response.
+     *
+     * @param includedFields set of strings which correspond to the getter method names of the
+     *        classes registered as a mixed-in
+     * @return the configured writer that includes the specified fields
+     */
+    private ObjectWriter configureWriter(Set<String> includedFields) {
+        SimpleFilterProvider propertyFilter = new SimpleFilterProvider();
+        propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields));
+        // only write the properties not excluded in the filter
+        ObjectWriter writer = mapper.writer(propertyFilter);
+        return writer;
+    }
+
+    /**
+     * Writes the outer Java stream into the output stream.
+     *
+     * @param stream Mapping from channel name to data
+     * @param response {@link ServletResponse} instance given by the current HTTP request
+     * @param writer configured writer that includes the fields the end user wants to see
+     * @throws IOException thrown if writing to the output stream fails
+     */
+    private void respondInternal(Stream<Entry<String, ?>> stream, ServletResponse response, ObjectWriter writer)
+            throws Exception {
+        JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
+
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        try {
+            generator.writeStartArray();
+            stream
+                    /* ensure elements are sequentially written */
+                    .sequential()
+                    .forEach(
+                            entry -> {
+                                try {
+                                    generator.writeStartObject();
+                                    generator.writeStringField(QueryField.channel.name(), entry.getKey());
+
+                                    generator.writeFieldName(DATA_RESP_FIELD);
+                                    writer.writeValue(generator, entry.getValue());
+
+                                    generator.writeEndObject();
+                                } catch (Exception e) {
+                                    logger.error("Could not write channel name of channel '{}'", entry.getKey(), e);
+                                    exception.compareAndSet(null, e);
+                                }
+                            });
+            generator.writeEndArray();
+        } finally {
+            generator.flush();
+            generator.close();
+        }
+
+        if (exception.get() != null) {
+            throw exception.get();
+        }
+    }
+}
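Editorial note: the dynamic field selection rests on Jackson's property-filter mechanism. `configureWriter` registers a `SimpleBeanPropertyFilter` under the id `namedPropertyFilter`, and it applies to every bean associated with that filter id (in this project via the `PropertyFilterMixin` registered in `QueryRestConfig`). A minimal standalone sketch of the mechanism, using a hypothetical `Event` bean annotated directly instead of through a mix-in:

```java
import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;

public class PropertyFilterDemo {
    // hypothetical bean for illustration; the real data classes get the
    // filter id attached via a Jackson mix-in instead of a direct annotation
    @JsonFilter("namedPropertyFilter")
    public static class Event {
        public long pulseId = 3;
        public double value = 0.5;
        public int eventCount = 1;
    }

    public static void main(String[] args) throws Exception {
        SimpleFilterProvider filters = new SimpleFilterProvider();
        filters.addFilter("namedPropertyFilter",
                SimpleBeanPropertyFilter.filterOutAllExcept("pulseId", "value"));

        // only the requested fields survive: {"pulseId":3,"value":0.5}
        System.out.println(new ObjectMapper().writer(filters).writeValueAsString(new Event()));
    }
}
```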
@@ -1,4 +1,4 @@
-package ch.psi.daq.queryrest.response;
+package ch.psi.daq.queryrest.response.json;
 
 import java.io.IOException;
 
@@ -1,4 +1,4 @@
-package ch.psi.daq.queryrest.response;
+package ch.psi.daq.queryrest.response.json;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -3,7 +3,7 @@
 queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,iocMillis,iocNanos,shape,eventCount,value
 
 # aggregations which are included in the response by default if aggregation is enabled for a given query
-queryrest.default.response.aggregations=min,max,sum
+queryrest.default.response.aggregations=min,mean,max
 
 # enables / disables the CORS servlet filter. Adds multiple CORS headers to the response
 queryrest.cors.enable=true
@ -0,0 +1,590 @@
|
|||||||
|
package ch.psi.daq.test.queryrest.controller;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.StringReader;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.test.web.servlet.MvcResult;
|
||||||
|
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
||||||
|
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
|
||||||
|
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
|
||||||
|
import org.supercsv.cellprocessor.constraint.NotNull;
|
||||||
|
import org.supercsv.cellprocessor.ift.CellProcessor;
|
||||||
|
import org.supercsv.io.CsvMapReader;
|
||||||
|
import org.supercsv.io.ICsvMapReader;
|
||||||
|
import org.supercsv.prefs.CsvPreference;
|
||||||
|
|
||||||
|
import ch.psi.daq.cassandra.request.range.RequestRangeDate;
|
||||||
|
import ch.psi.daq.cassandra.request.range.RequestRangePulseId;
|
||||||
|
import ch.psi.daq.cassandra.request.range.RequestRangeTime;
|
||||||
|
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||||
|
import ch.psi.daq.common.ordering.Ordering;
|
||||||
|
import ch.psi.daq.query.model.Aggregation;
|
||||||
|
import ch.psi.daq.query.model.AggregationType;
|
||||||
|
import ch.psi.daq.query.model.QueryField;
|
||||||
|
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||||
|
import ch.psi.daq.queryrest.controller.QueryRestController;
|
||||||
|
import ch.psi.daq.queryrest.filter.CorsFilter;
|
||||||
|
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
|
||||||
|
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
|
||||||
|
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the {@link DaqController} implementation.
|
||||||
|
*/
|
||||||
|
public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CassandraTestAdmin cassandraTestAdmin;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CassandraDataGen dataGen;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CorsFilter corsFilter;
|
||||||
|
|
||||||
|
public static final String TEST_CHANNEL = "testChannel";
|
||||||
|
public static final String TEST_CHANNEL_01 = TEST_CHANNEL + "1";
|
||||||
|
public static final String TEST_CHANNEL_02 = TEST_CHANNEL + "2";
|
||||||
|
public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02};
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPulseRangeQuery() throws Exception {
|
||||||
|
DAQQuery request = new DAQQuery(
|
||||||
|
new RequestRangePulseId(
|
||||||
|
0,
|
||||||
|
1),
|
||||||
|
TEST_CHANNEL_NAMES);
|
||||||
|
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||||
|
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||||
|
queryFields.add(QueryField.channel);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.pulseId);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.iocMillis);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.iocNanos);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.globalMillis);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.globalNanos);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.shape);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.eventCount);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.value);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
request.setFields(queryFields);
|
||||||
|
|
||||||
|
String content = mapper.writeValueAsString(request);
|
||||||
|
System.out.println(content);
|
||||||
|
|
||||||
|
MvcResult result = this.mockMvc
|
||||||
|
.perform(MockMvcRequestBuilders
|
||||||
|
.post(QueryRestController.QUERY)
|
||||||
|
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
|
||||||
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
|
.content(content))
|
||||||
|
.andDo(MockMvcResultHandlers.print())
|
||||||
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
|
.andReturn();
|
||||||
|
|
||||||
|
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||||
|
|
||||||
|
String response = result.getResponse().getContentAsString();
|
||||||
|
ICsvMapReader mapReader =
|
||||||
|
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||||
|
try {
|
||||||
|
final String[] header = mapReader.getHeader(true);
|
||||||
|
Map<String, Object> customerMap;
|
||||||
|
long resultsPerChannel = 2;
|
||||||
|
long pulse = 0;
|
||||||
|
long channelCount = 1;
|
||||||
|
while ((customerMap = mapReader.read(header, processors)) != null) {
|
||||||
|
assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name()));
|
||||||
|
assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
|
||||||
|
assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
|
||||||
|
assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
|
||||||
|
assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
|
||||||
|
assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
|
||||||
|
assertEquals("[1]", customerMap.get(QueryField.shape.name()));
|
||||||
|
assertEquals("1", customerMap.get(QueryField.eventCount.name()));
|
||||||
|
assertEquals("" + pulse, customerMap.get(QueryField.value.name()));
|
||||||
|
|
||||||
|
pulse = ++pulse % resultsPerChannel;
|
||||||
|
if (pulse == 0) {
|
||||||
|
++channelCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
mapReader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPulseRangeQueryWaveform() throws Exception {
|
||||||
|
String channelName = "XYWaveform";
|
||||||
|
DAQQuery request = new DAQQuery(
|
||||||
|
new RequestRangePulseId(
|
||||||
|
0,
|
||||||
|
1),
|
||||||
|
channelName);
|
||||||
|
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||||
|
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||||
|
queryFields.add(QueryField.channel);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.pulseId);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.iocMillis);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.iocNanos);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.globalMillis);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.globalNanos);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.shape);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.eventCount);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.value);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
request.setFields(queryFields);
|
||||||
|
|
||||||
|
String content = mapper.writeValueAsString(request);
|
||||||
|
System.out.println(content);
|
||||||
|
|
||||||
|
MvcResult result = this.mockMvc
|
||||||
|
.perform(MockMvcRequestBuilders
|
||||||
|
.post(QueryRestController.QUERY)
|
||||||
|
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
|
||||||
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
|
.content(content))
|
||||||
|
.andDo(MockMvcResultHandlers.print())
|
||||||
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
|
.andReturn();
|
||||||
|
|
||||||
|
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||||
|
|
||||||
|
String response = result.getResponse().getContentAsString();
|
||||||
|
ICsvMapReader mapReader =
|
||||||
|
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||||
|
try {
|
||||||
|
final String[] header = mapReader.getHeader(true);
|
||||||
|
Map<String, Object> customerMap;
|
||||||
|
long pulse = 0;
|
||||||
|
while ((customerMap = mapReader.read(header, processors)) != null) {
|
||||||
|
assertEquals(channelName, customerMap.get(QueryField.channel.name()));
|
||||||
|
assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
|
||||||
|
assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
|
||||||
|
assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
|
||||||
|
assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
|
||||||
|
assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
|
||||||
|
assertEquals("[2048]", customerMap.get(QueryField.shape.name()));
|
||||||
|
assertEquals("1", customerMap.get(QueryField.eventCount.name()));
|
||||||
|
assertTrue(customerMap.get(QueryField.value.name()).toString().startsWith("["));
|
||||||
|
assertTrue(customerMap.get(QueryField.value.name()).toString().endsWith("]"));
|
||||||
|
|
||||||
|
++pulse;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
mapReader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeRangeQuery() throws Exception {
|
||||||
|
DAQQuery request = new DAQQuery(
|
||||||
|
new RequestRangeTime(
|
||||||
|
0,
|
||||||
|
10),
|
||||||
|
TEST_CHANNEL_NAMES);
|
||||||
|
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||||
|
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||||
|
queryFields.add(QueryField.channel);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.pulseId);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.iocMillis);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.iocNanos);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.globalMillis);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.globalNanos);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.shape);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.eventCount);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
queryFields.add(QueryField.value);
|
||||||
|
cellProcessors.add(new NotNull());
|
||||||
|
request.setFields(queryFields);
|
||||||
|
|
||||||
|
String content = mapper.writeValueAsString(request);
|
||||||
|
System.out.println(content);
|
||||||
|
|
||||||
|
MvcResult result = this.mockMvc
|
||||||
|
.perform(MockMvcRequestBuilders
|
||||||
|
.post(QueryRestController.QUERY)
|
||||||
|
.accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
|
||||||
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
|
.content(content))
|
||||||
|
.andDo(MockMvcResultHandlers.print())
|
||||||
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
|
.andReturn();
|
||||||
|
|
||||||
|
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||||
|
|
||||||
|
String response = result.getResponse().getContentAsString();
|
||||||
|
ICsvMapReader mapReader =
|
||||||
|
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||||
|
try {
|
||||||
|
final String[] header = mapReader.getHeader(true);
|
||||||
|
Map<String, Object> customerMap;
|
||||||
|
long resultsPerChannel = 2;
|
||||||
|
long pulse = 0;
|
||||||
|
long channelCount = 1;
|
||||||
|
while ((customerMap = mapReader.read(header, processors)) != null) {
|
||||||
|
assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name()));
|
||||||
|
assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
|
||||||
|
assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
|
||||||
|
assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
|
||||||
|
assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
|
||||||
|
assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
|
||||||
|
assertEquals("[1]", customerMap.get(QueryField.shape.name()));
|
||||||
|
assertEquals("1", customerMap.get(QueryField.eventCount.name()));
|
||||||
|
assertEquals("" + pulse, customerMap.get(QueryField.value.name()));
|
||||||
|
|
||||||
|
pulse = ++pulse % resultsPerChannel;
|
||||||
|
if (pulse == 0) {
|
||||||
|
++channelCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
mapReader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
    @Test
    public void testDateRangeQuery() throws Exception {
        String startDate = RequestRangeDate.format(0);
        String endDate = RequestRangeDate.format(10);
        DAQQuery request = new DAQQuery(
                new RequestRangeDate(
                        startDate,
                        endDate),
                TEST_CHANNEL_NAMES);
        LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
        LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
        queryFields.add(QueryField.channel);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.pulseId);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.iocMillis);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.iocNanos);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.globalMillis);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.globalNanos);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.shape);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.eventCount);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.value);
        cellProcessors.add(new NotNull());
        request.setFields(queryFields);

        String content = mapper.writeValueAsString(request);
        System.out.println(content);

        MvcResult result = this.mockMvc
                .perform(MockMvcRequestBuilders
                        .post(QueryRestController.QUERY)
                        .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(content))
                .andDo(MockMvcResultHandlers.print())
                .andExpect(MockMvcResultMatchers.status().isOk())
                .andReturn();

        CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);

        String response = result.getResponse().getContentAsString();
        ICsvMapReader mapReader =
                new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
        try {
            final String[] header = mapReader.getHeader(true);
            Map<String, Object> customerMap;
            long resultsPerChannel = 2;
            long pulse = 0;
            long channelCount = 1;
            while ((customerMap = mapReader.read(header, processors)) != null) {
                assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name()));
                assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
                assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
                assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
                assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
                assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
                assertEquals("[1]", customerMap.get(QueryField.shape.name()));
                assertEquals("1", customerMap.get(QueryField.eventCount.name()));
                assertEquals("" + pulse, customerMap.get(QueryField.value.name()));

                pulse = (pulse + 1) % resultsPerChannel;
                if (pulse == 0) {
                    ++channelCount;
                }
            }
        } finally {
            mapReader.close();
        }
    }

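    // The dummy reader emits one event every 10 ms (iocMillis == pulseId * 10),
    // so the window [format(0), format(10)] evidently covers pulse ids 0 and 1;
    // that is where resultsPerChannel == 2 in the loop above comes from.
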
    @Test
    public void testExtremaAggregation() throws Exception {
        DAQQuery request = new DAQQuery(
                new RequestRangePulseId(
                        0,
                        1),
                false,
                Ordering.asc,
                AggregationType.extrema,
                TEST_CHANNEL_NAMES[0]);

        String content = mapper.writeValueAsString(request);

        try {
            this.mockMvc
                    .perform(MockMvcRequestBuilders
                            .post(QueryRestController.QUERY)
                            .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
                            .contentType(MediaType.APPLICATION_JSON)
                            .content(content))
                    .andDo(MockMvcResultHandlers.print());
            // .andExpect(MockMvcResultMatchers.model().attribute(
            //         "exception",
            //         Matchers.isA(IllegalArgumentException.class)));
            // should throw an exception
            assertTrue(false);
        } catch (Throwable e) {
            assertTrue(true);
        }
    }

    @Test
    public void testIndexAggregation() throws Exception {
        DAQQuery request = new DAQQuery(
                new RequestRangePulseId(
                        0,
                        1),
                false,
                Ordering.asc,
                AggregationType.index,
                TEST_CHANNEL_NAMES[0]);

        String content = mapper.writeValueAsString(request);

        try {
            this.mockMvc
                    .perform(MockMvcRequestBuilders
                            .post(QueryRestController.QUERY)
                            .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
                            .contentType(MediaType.APPLICATION_JSON)
                            .content(content))
                    .andDo(MockMvcResultHandlers.print());
            // .andExpect(MockMvcResultMatchers.model().attribute(
            //         "exception",
            //         Matchers.isA(IllegalArgumentException.class)));
            // should throw an exception
            assertTrue(false);
        } catch (Throwable e) {
            assertTrue(true);
        }
    }

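    // The two tests above pass only if perform() throws: judging by these
    // tests, neither AggregationType.extrema nor AggregationType.index is
    // supported for CSV responses, so the request is expected to be rejected
    // before any rows are written.
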
    @Test
    public void testDateRangeQueryNrOfBinsAggregate() throws Exception {
        long startTime = 0;
        long endTime = 99;
        String startDate = RequestRangeDate.format(startTime);
        String endDate = RequestRangeDate.format(endTime);
        DAQQuery request = new DAQQuery(
                new RequestRangeDate(
                        startDate,
                        endDate),
                TEST_CHANNEL_01);
        request.setNrOfBins(2);

        LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
        LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
        queryFields.add(QueryField.channel);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.pulseId);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.iocMillis);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.iocNanos);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.globalMillis);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.globalNanos);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.shape);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.eventCount);
        cellProcessors.add(new NotNull());
        request.setFields(queryFields);

        List<Aggregation> aggregations = new ArrayList<>();
        aggregations.add(Aggregation.min);
        cellProcessors.add(new NotNull());
        aggregations.add(Aggregation.mean);
        cellProcessors.add(new NotNull());
        aggregations.add(Aggregation.max);
        cellProcessors.add(new NotNull());
        request.setAggregations(aggregations);

        String content = mapper.writeValueAsString(request);
        System.out.println(content);

        MvcResult result = this.mockMvc
                .perform(MockMvcRequestBuilders
                        .post(QueryRestController.QUERY)
                        .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(content))
                .andDo(MockMvcResultHandlers.print())
                .andExpect(MockMvcResultMatchers.status().isOk())
                .andReturn();

        CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);

        String response = result.getResponse().getContentAsString();
        ICsvMapReader mapReader =
                new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
        try {
            final String[] header = mapReader.getHeader(true);
            Map<String, Object> customerMap;
            long pulse = 0;
            while ((customerMap = mapReader.read(header, processors)) != null) {
                assertEquals(TEST_CHANNEL_01, customerMap.get(QueryField.channel.name()));
                assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
                assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
                assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
                assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
                assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
                assertEquals("[1]", customerMap.get(QueryField.shape.name()));
                assertEquals("5", customerMap.get(QueryField.eventCount.name()));
                assertEquals("" + pulse + ".0", customerMap.get(Aggregation.min.name()));
                assertEquals("" + (pulse + 2) + ".0", customerMap.get(Aggregation.mean.name()));
                assertEquals("" + (pulse + 4) + ".0", customerMap.get(Aggregation.max.name()));

                pulse += 5;
            }
        } finally {
            mapReader.close();
        }
    }

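    // Bin arithmetic for the test above: the 0-99 ms window holds 10 events
    // (one every 10 ms, values 0..9). With nrOfBins == 2, each bin aggregates
    // 5 consecutive values, so for a bin starting at pulse p:
    //   min = p, mean = p + 2, max = p + 4, eventCount = 5
    // which is exactly what the loop asserts for p = 0 and p = 5.
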
    @Test
    public void testDateRangeQueryBinSizeAggregate() throws Exception {
        long startTime = 0;
        long endTime = 999;
        String startDate = RequestRangeDate.format(startTime);
        String endDate = RequestRangeDate.format(endTime);
        DAQQuery request = new DAQQuery(
                new RequestRangeDate(
                        startDate,
                        endDate),
                TEST_CHANNEL_01);
        request.setBinSize(100);

        LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
        LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
        queryFields.add(QueryField.channel);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.pulseId);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.iocMillis);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.iocNanos);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.globalMillis);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.globalNanos);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.shape);
        cellProcessors.add(new NotNull());
        queryFields.add(QueryField.eventCount);
        cellProcessors.add(new NotNull());
        request.setFields(queryFields);

        List<Aggregation> aggregations = new ArrayList<>();
        aggregations.add(Aggregation.min);
        cellProcessors.add(new NotNull());
        aggregations.add(Aggregation.mean);
        cellProcessors.add(new NotNull());
        aggregations.add(Aggregation.max);
        cellProcessors.add(new NotNull());
        request.setAggregations(aggregations);

        String content = mapper.writeValueAsString(request);
        System.out.println(content);

        MvcResult result = this.mockMvc
                .perform(MockMvcRequestBuilders
                        .post(QueryRestController.QUERY)
                        .accept(CSVResponseStreamWriter.APPLICATION_CSV_VALUE)
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(content))
                .andDo(MockMvcResultHandlers.print())
                .andExpect(MockMvcResultMatchers.status().isOk())
                .andReturn();

        CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);

        String response = result.getResponse().getContentAsString();
        ICsvMapReader mapReader =
                new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
        try {
            final String[] header = mapReader.getHeader(true);
            Map<String, Object> customerMap;
            long pulse = 0;
            while ((customerMap = mapReader.read(header, processors)) != null) {
                assertEquals(TEST_CHANNEL_01, customerMap.get(QueryField.channel.name()));
                assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
                assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
                assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
                assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
                assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
                assertEquals("[1]", customerMap.get(QueryField.shape.name()));
                assertEquals("10", customerMap.get(QueryField.eventCount.name()));
                assertEquals("" + pulse + ".0", customerMap.get(Aggregation.min.name()));
                assertEquals("" + (pulse + 4.5), customerMap.get(Aggregation.mean.name()));
                assertEquals("" + (pulse + 9) + ".0", customerMap.get(Aggregation.max.name()));

                pulse += 10;
            }
        } finally {
            mapReader.close();
        }
    }
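    // Same layout with a fixed bin size instead of a bin count: the 0-999 ms
    // window holds 100 events, and binSize == 100 (ms) puts 10 of them into
    // each bin, hence eventCount == "10", mean == pulse + 4.5, max == pulse + 9.
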
}
@@ -25,7 +25,7 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
 /**
  * Tests the {@link DaqController} implementation.
  */
-public class QueryRestControllerTest extends AbstractDaqRestTest {
+public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 
     @Resource
     private CassandraTestAdmin cassandraTestAdmin;
@@ -180,6 +180,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
         this.mockMvc
                 .perform(MockMvcRequestBuilders
                         .post(QueryRestController.QUERY)
+                        .accept(MediaType.APPLICATION_JSON)
                         .contentType(MediaType.APPLICATION_JSON)
                         .content(content))
 
@@ -213,6 +214,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
 
         this.mockMvc.perform(MockMvcRequestBuilders
                 .post(QueryRestController.QUERY)
+                .accept(MediaType.APPLICATION_JSON)
                 .contentType(MediaType.APPLICATION_JSON)
                 .content(content))
 
@@ -252,6 +254,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
                 .perform(
                         MockMvcRequestBuilders
                                 .post(QueryRestController.QUERY)
+                                .accept(MediaType.APPLICATION_JSON)
                                 .contentType(MediaType.APPLICATION_JSON)
                                 .content(content)
                 )
@@ -289,6 +292,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
 
         this.mockMvc
                 .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY)
+                        .accept(MediaType.APPLICATION_JSON)
                         .contentType(MediaType.APPLICATION_JSON)
                         .content(content))
 
@@ -335,6 +339,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
                 .perform(
                         MockMvcRequestBuilders
                                 .post(QueryRestController.QUERY)
+                                .accept(MediaType.APPLICATION_JSON)
                                 .contentType(MediaType.APPLICATION_JSON)
                                 .content(content)
                 )
@@ -372,6 +377,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
                 .perform(
                         MockMvcRequestBuilders
                                 .post(QueryRestController.QUERY)
+                                .accept(MediaType.APPLICATION_JSON)
                                 .contentType(MediaType.APPLICATION_JSON)
                                 .content(content)
                 )
@@ -4,11 +4,15 @@
 package ch.psi.daq.test.queryrest.query;
 
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
+import com.google.common.collect.Lists;
+
 import ch.psi.daq.cassandra.reader.CassandraReader;
 import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery;
 import ch.psi.daq.cassandra.reader.query.TimeRangeQuery;
@@ -23,8 +27,6 @@ import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo;
 import ch.psi.daq.domain.cassandra.querying.EventQuery;
 import ch.psi.data.converters.ConverterProvider;
-
-import com.google.common.collect.Lists;
 
 /**
  * @author zellweger_c
 *
@@ -34,7 +36,7 @@ public class DummyCassandraReader implements CassandraReader {
     private static final int KEYSPACE = 1;
     private CassandraDataGen dataGen;
     private String[] channels;
+    private Random random = new Random(0);
 
     /**
      *
@@ -145,27 +147,56 @@ import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo;
 
 
     private Stream<? extends DataEvent> getDummyEventStream(String channel, long startIndex, long endIndex) {
-        return dataGen.generateData(startIndex, (endIndex - startIndex + 1),
-                i -> i * 10,
-                i -> 0,
-                i -> i,
-                i -> i * 10,
-                i -> 0,
-                i -> Long.valueOf(i),
-                channel).stream();
+        String channelLower = channel.toLowerCase();
+
+        Stream<? extends DataEvent> eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> {
+            if (channelLower.contains("waveform")) {
+                long[] value = random.longs(2048).toArray();
+                value[0] = i;
+                return new ChannelEvent(
+                        channel,
+                        i * 10,
+                        0,
+                        i,
+                        i * 10,
+                        0,
+                        value
+                );
+
+            } else if (channelLower.contains("image")) {
+                int x = 640;
+                int y = 480;
+                int[] shape = new int[] {x, y};
+                long[] value = random.longs(x * y).toArray();
+                value[0] = i;
+                return new ChannelEvent(
+                        channel,
+                        i * 10,
+                        0,
+                        i,
+                        i * 10,
+                        0,
+                        value,
+                        shape
+                );
+            } else {
+                return new ChannelEvent(
+                        channel,
+                        i * 10,
+                        0,
+                        i,
+                        i * 10,
+                        0,
+                        i
+                );
+            }
+        });
+
+        return eventStream;
     }
 
     private List<? extends DataEvent> getDummyEvents(String channel, long startIndex, long endIndex) {
-        return dataGen.generateData(startIndex, (endIndex - startIndex + 1),
-                i -> i * 10,
-                i -> 0,
-                i -> i,
-                i -> i * 10,
-                i -> 0,
-                i -> Long.valueOf(i),
-                channel);
+        return getDummyEventStream(channel, startIndex, endIndex).collect(Collectors.toList());
     }
 
 /**
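The rewritten getDummyEventStream keys the generated payload off the channel name and pins the first array sample to the pulse index, so value-based assertions stay deterministic even though the rest of the payload is random (the Random is seeded with 0 for reproducibility). Below is a minimal self-contained sketch of that pattern; Event and DummyStreamSketch are illustrative stand-ins, since ChannelEvent's real constructor lives in ch.psi.daq.domain:

```java
import java.util.Random;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class DummyStreamSketch {

    // Hypothetical stand-in for ChannelEvent; field order mirrors the constructor calls above.
    record Event(String channel, long iocMillis, long iocNanos, long pulseId,
            long globalMillis, long globalNanos, Object value) {}

    public static void main(String[] args) {
        Random random = new Random(0); // fixed seed: every run generates the same data

        // One event per pulse; millis advance in steps of 10, as in the dummy reader.
        Stream<Event> events = LongStream.rangeClosed(0, 3).mapToObj(i -> {
            long[] waveform = random.longs(8).toArray(); // the real reader uses 2048 samples
            waveform[0] = i; // pin the first sample so tests can assert on it
            return new Event("Channel_Waveform", i * 10, 0, i, i * 10, 0, waveform);
        });

        events.forEach(e -> System.out.println(
                e.pulseId() + " -> " + java.util.Arrays.toString((long[]) e.value())));
    }
}
```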