ATEST-302
This commit is contained in:
72
Readme.md
72
Readme.md
@ -60,21 +60,47 @@ POST http://<host>:<port>/channels
|
||||
#### Data
|
||||
|
||||
```json
|
||||
{"regex": "TRFCA|TRFCB", "dbMode": "databuffer"}
|
||||
{"regex": "TRFCA|TRFCB","backends": ["databuffer"],"ordering":"asc","reload":true}
|
||||
```
|
||||
|
||||
##### Explanation
|
||||
|
||||
- **regex**: Reqular expression used to filter channel names (default: no filtering). Filtering is done using JAVA's [Pattern](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html), more precisely [Matcher.find()](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Matcher.html#find--)).
|
||||
- **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance)
|
||||
|
||||
- **regex**: Reqular expression used to filter channel names. In case this value is undefined, no filter will be applied. Filtering is done using JAVA's [Pattern](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html), more precisely [Matcher.find()](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Matcher.html#find--)).
|
||||
- **backends**: Array of backends to access (values: databuffer|archiverappliance|filestorage). In case this value is undefined, all backends will be queried for their channels.
|
||||
- **ordering**: The ordering of the channel names (values: **none**|asc|desc).
|
||||
- **reload**: Forces the server to reload cached channel names (values: **false**|true).
|
||||
|
||||
### Example
|
||||
|
||||
#### Command
|
||||
|
||||
```bash
|
||||
curl -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels
|
||||
```
|
||||
|
||||
#### Response
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"backend":"databuffer",
|
||||
"channels":[
|
||||
"Channel_01",
|
||||
"Channel_02",
|
||||
"Channel_03"
|
||||
]
|
||||
},
|
||||
{
|
||||
"backend":"archiverappliance",
|
||||
"channels":[
|
||||
"Channel_01",
|
||||
"Channel_04",
|
||||
"Channel_05"
|
||||
]
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
<a name="query_range"/>
|
||||
|
||||
## Query Range
|
||||
@ -127,6 +153,36 @@ Queries are applied to a range. The following types of ranges ranges are support
|
||||
- **endMillis**: The end time of the range.
|
||||
- **[endNanos]**: The optional nanosecond offset.
|
||||
|
||||
<a name="query_channel_names"/>
|
||||
|
||||
## Query Channel Names
|
||||
|
||||
The simplest way to define channels is to use an array of channel name Strings.
|
||||
|
||||
```json
|
||||
"channels":[
|
||||
"Channel_02",
|
||||
"Channel_04"
|
||||
]
|
||||
```
|
||||
|
||||
The query interface will automatically select the backend which contains the channel (e.g., *databuffer* for *Channel_02* and *archiverappliance* for *Channel_04*. In case name clashes exist, the query interface will use following order of priority: *databuffer*, *archiverappliance* and *filestorage*.
|
||||
|
||||
It is also possible to explicitly define the backend to overcome name clashes.
|
||||
|
||||
```json
|
||||
"channels":[
|
||||
{
|
||||
"name":"Channel_01",
|
||||
"backend":"archiverappliance"
|
||||
},
|
||||
{
|
||||
"name":"Channel_01",
|
||||
"backend":"databuffer"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
|
||||
<a name="query_data"/>
|
||||
|
||||
@ -144,7 +200,7 @@ A request is performed by sending a valid JSON object in the HTTP request body.
|
||||
|
||||
The following attributes can be specified:
|
||||
|
||||
- **channels**: Array of channel names to be queried.
|
||||
- **channels**: Array of channels to be queried (see [Query Range](Readme.md#query_channel_names)).
|
||||
- **range**: The range of the query (see [Query Range](Readme.md#query_range)).
|
||||
- **ordering**: The ordering of the data (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.common/blob/master/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values).
|
||||
- **fields**: The requested fields (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values).
|
||||
@ -152,8 +208,6 @@ The following attributes can be specified:
|
||||
- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using number of pulses and number of milliseconds makes this binning strategy consistent between channel with different update frequencies).
|
||||
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response.
|
||||
- **aggregationType**: Specifies the type of aggregation (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
|
||||
- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**)
|
||||
- **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance)
|
||||
- **compression**: Defines the compression algorithm to use, default value is **none**, see all values [here](https://github.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/Compression.java))
|
||||
- **responseFormat**: Specifies the format the response of the requested data is in, either in JSON or CSV format, default value **JSON**, see all values [here](https://github.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/ResponseFormat.java))
|
||||
|
||||
@ -165,7 +219,7 @@ If compression is enabled, we have to tell `curl` that the data is compressed so
|
||||
|
||||
### `responseFormat`: data is in JSON by default
|
||||
|
||||
Responses can be formatted as CSV or JSON using the `responseFormat` field. The returned data is JSON-formatted per default.
|
||||
Responses can be formatted as CSV or JSON using the `responseFormat` field. The returned data is JSON-formatted by default.
|
||||
|
||||
CSV export does not support `index` and `extrema` aggregations.
|
||||
|
||||
@ -319,7 +373,7 @@ See JSON representation of the data above.
|
||||
|
||||
```json
|
||||
{
|
||||
"dbMode":"archiverappliance",
|
||||
"backend":"archiverappliance",
|
||||
"range":{
|
||||
"startMillis":0,
|
||||
"startNanos":0,
|
||||
|
@ -27,9 +27,7 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.Version;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
|
||||
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
@ -44,8 +42,6 @@ import ch.psi.daq.queryrest.filter.CorsFilter;
|
||||
import ch.psi.daq.queryrest.model.PropertyFilterMixin;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JsonByteArraySerializer;
|
||||
import ch.psi.daq.queryrest.response.json.JsonStreamSerializer;
|
||||
|
||||
@Configuration
|
||||
@PropertySource(value = {"classpath:queryrest.properties"})
|
||||
@ -88,16 +84,6 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
|
||||
objectMapper.addMixIn(DataEvent.class, PropertyFilterMixin.class);
|
||||
objectMapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
|
||||
objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);
|
||||
|
||||
// defines how to writer inner Streams (i.e. Stream<Entry<String, Stream<?>>> toSerialize)
|
||||
SimpleModule module = new SimpleModule("Streams API", Version.unknownVersion());
|
||||
module.addSerializer(new JsonStreamSerializer());
|
||||
objectMapper.registerModule(module);
|
||||
|
||||
// by default, byte[] are written as String
|
||||
module = new SimpleModule("byte[]", Version.unknownVersion());
|
||||
module.addSerializer(new JsonByteArraySerializer());
|
||||
objectMapper.registerModule(module);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,17 +1,24 @@
|
||||
package ch.psi.daq.queryrest.controller;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.validation.Valid;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
@ -27,28 +34,34 @@ import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import ch.psi.daq.cassandra.config.CassandraConfig;
|
||||
import ch.psi.daq.cassandra.request.validate.RequestProviderValidator;
|
||||
import ch.psi.daq.common.concurrent.singleton.Deferred;
|
||||
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.reader.Backend;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.config.QueryConfig;
|
||||
import ch.psi.daq.query.model.Aggregation;
|
||||
import ch.psi.daq.query.model.AggregationType;
|
||||
import ch.psi.daq.query.model.DBMode;
|
||||
import ch.psi.daq.query.model.Query;
|
||||
import ch.psi.daq.query.model.QueryField;
|
||||
import ch.psi.daq.query.model.ResponseFormat;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.query.model.impl.DAQQueries;
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import ch.psi.daq.query.model.impl.DAQQueryElement;
|
||||
import ch.psi.daq.query.processor.ChannelNameCache;
|
||||
import ch.psi.daq.query.processor.QueryProcessor;
|
||||
import ch.psi.daq.query.request.ChannelName;
|
||||
import ch.psi.daq.query.request.ChannelsRequest;
|
||||
import ch.psi.daq.query.request.ChannelsResponse;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@RestController
|
||||
public class QueryRestController {
|
||||
|
||||
@ -56,6 +69,7 @@ public class QueryRestController {
|
||||
|
||||
public static final String CHANNELS = "/channels";
|
||||
public static final String QUERY = "/query";
|
||||
public static final String QUERIES = "/queries";
|
||||
|
||||
@Resource
|
||||
private Validator queryValidator;
|
||||
@ -69,20 +83,44 @@ public class QueryRestController {
|
||||
|
||||
@Resource
|
||||
private ApplicationContext appContext;
|
||||
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
// using Deferred ensures that data-api startup succeeds even if DataBuffer/ArchiverAppliance is
|
||||
// not reachable at startup
|
||||
private Deferred<QueryProcessor> cassandraQueryProcessor = new Deferred<QueryProcessor>(
|
||||
() -> appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class));
|
||||
private Deferred<QueryProcessor> archiverApplianceQueryProcessor = new Deferred<QueryProcessor>(
|
||||
() -> appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class));
|
||||
|
||||
@Resource
|
||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||
|
||||
private Map<Backend, QueryProcessor> queryProcessors = new LinkedHashMap<>();
|
||||
private ChannelNameCache channelNameCache;
|
||||
|
||||
@PostConstruct
|
||||
public void afterPropertiesSet() {
|
||||
try {
|
||||
QueryProcessor queryProcessor =
|
||||
appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class);
|
||||
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Could not load query processor for cassandra.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
QueryProcessor queryProcessor =
|
||||
appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class);
|
||||
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Could not load query processor for archiverappliance.", e);
|
||||
}
|
||||
|
||||
channelNameCache =
|
||||
new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT,
|
||||
Integer.class).longValue());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
channelNameCache.destroy();
|
||||
}
|
||||
|
||||
@InitBinder
|
||||
protected void initBinder(WebDataBinder binder) {
|
||||
if (binder.getTarget() != null) {
|
||||
@ -97,21 +135,14 @@ public class QueryRestController {
|
||||
|
||||
@RequestMapping(value = CHANNELS, method = {RequestMethod.GET, RequestMethod.POST},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
||||
public @ResponseBody List<ChannelsResponse> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
||||
throws Throwable {
|
||||
// in case not specified use default (e.g. GET)
|
||||
// in case not specified use defaults (e.g. GET)
|
||||
if (request == null) {
|
||||
request = new ChannelsRequest();
|
||||
}
|
||||
|
||||
try {
|
||||
// sorted collection
|
||||
Collection<String> allChannels = getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
|
||||
return allChannels;
|
||||
} catch (Throwable t) {
|
||||
LOGGER.error("Failed to query channel names.", t);
|
||||
throw t;
|
||||
}
|
||||
return channelNameCache.getChannels(request);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -123,46 +154,11 @@ public class QueryRestController {
|
||||
*/
|
||||
@RequestMapping(value = CHANNELS + "/{channelName}", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody Collection<String> getChannels(@PathVariable(value = "channelName") String channelName)
|
||||
public @ResponseBody Collection<ChannelsResponse> getChannels(@PathVariable(value = "channelName") String channelName)
|
||||
throws Throwable {
|
||||
return getChannels(new ChannelsRequest(channelName));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Catch-all query method for getting data from the backend for both JSON and CSV requests.
|
||||
* <p>
|
||||
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
|
||||
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
|
||||
* deserialize the information into and has been configured (see
|
||||
* QueryRestConfig#afterPropertiesSet) accordingly.
|
||||
*
|
||||
* @param query concrete implementation of {@link DAQQuery}
|
||||
* @param res the {@link HttpServletResponse} instance associated with this request
|
||||
* @throws IOException thrown if writing to the output stream fails
|
||||
*/
|
||||
@RequestMapping(
|
||||
value = QUERY,
|
||||
method = RequestMethod.POST,
|
||||
consumes = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
|
||||
try {
|
||||
LOGGER.debug("Executing query '{}'", query.toString());
|
||||
|
||||
if (ResponseFormat.JSON.equals(query.getResponseFormat())) {
|
||||
// write the response back to the client using java 8 streams
|
||||
jsonResponseStreamWriter.respond(executeQuery(query), query, res);
|
||||
} else {
|
||||
// it's a CSV request
|
||||
executeQueryCsv(query, res);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", query, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts the properties for the {@link DAQQuery} instance as stringified JSON query string.
|
||||
*
|
||||
@ -176,59 +172,154 @@ public class QueryRestController {
|
||||
value = QUERY,
|
||||
method = RequestMethod.GET)
|
||||
public void executeQueryBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
|
||||
|
||||
DAQQuery query = objectMapper.readValue(jsonBody, DAQQuery.class);
|
||||
executeQuery(query, res);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a single query.
|
||||
*
|
||||
* @param query the {@link DAQQuery}
|
||||
* @param res the {@link HttpServletResponse} instance associated with this request
|
||||
* @throws IOException thrown if writing to the output stream fails
|
||||
*/
|
||||
@RequestMapping(
|
||||
value = QUERY,
|
||||
method = RequestMethod.POST,
|
||||
consumes = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
|
||||
executeQueries(new DAQQueries(query), res);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts the properties for the {@link DAQQueries} instance as stringified JSON query string.
|
||||
*
|
||||
* @param jsonBody The {@link DAQQueries} properties sent as a JSON string, i.e. this is the
|
||||
* stringified body of the POST request method
|
||||
* @param res the current {@link HttpServletResponse} instance
|
||||
* @throws Exception if reading the JSON string fails or if the subsequent call to
|
||||
* {@link #executeQuery(DAQQueries, HttpServletResponse)} fails
|
||||
*/
|
||||
@RequestMapping(
|
||||
value = QUERIES,
|
||||
method = RequestMethod.GET)
|
||||
public void executeQueriesBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
|
||||
DAQQueries queries = objectMapper.readValue(jsonBody, DAQQueries.class);
|
||||
executeQueries(queries, res);
|
||||
}
|
||||
|
||||
/**
|
||||
* Catch-all query method for getting data from the backend for both JSON and CSV requests.
|
||||
* <p>
|
||||
* The {@link DAQQueries} object will contain the concrete subclass based on the combination of
|
||||
* fields defined in the user's query. The {@link AttributeBasedDeserializer} decides which class
|
||||
* to deserialize the information into and has been configured (see
|
||||
* QueryRestConfig#afterPropertiesSet) accordingly.
|
||||
*
|
||||
* @param queries the {@link DAQQueryElement}s
|
||||
* @param res the {@link HttpServletResponse} instance associated with this request
|
||||
* @throws IOException thrown if writing to the output stream fails
|
||||
*/
|
||||
@RequestMapping(
|
||||
value = QUERIES,
|
||||
method = RequestMethod.POST,
|
||||
consumes = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void executeQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception {
|
||||
try {
|
||||
LOGGER.debug("Executing queries '{}'", queries);
|
||||
|
||||
if (ResponseFormat.JSON.equals(queries.getResponseFormat())) {
|
||||
// write the response back to the client using java 8 streams
|
||||
jsonResponseStreamWriter.respond(executeQueries(queries), queries, res);
|
||||
} else if (ResponseFormat.CSV.equals(queries.getResponseFormat())) {
|
||||
// it's a CSV request
|
||||
executeQueriesCsv(queries, res);
|
||||
} else {
|
||||
String message = String.format("Unsupported response format '%s'", queries.getResponseFormat().name());
|
||||
LOGGER.error(message);
|
||||
throw new RuntimeException(message);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", queries, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns data in CSV format to the client.
|
||||
*/
|
||||
private void executeQueryCsv(DAQQuery query, HttpServletResponse res) throws Exception {
|
||||
|
||||
if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
|
||||
// We allow only no aggregation or value aggregation as
|
||||
// extrema: nested structure and not clear how to map it to one line
|
||||
// index: value is an array of Statistics whose size is not clear at initialization time
|
||||
String message = "CSV export does not support '" + query.getAggregationType() + "'";
|
||||
LOGGER.warn(message);
|
||||
throw new IllegalArgumentException(message);
|
||||
private void executeQueriesCsv(DAQQueries queries, HttpServletResponse res) throws Exception {
|
||||
|
||||
for (DAQQueryElement query : queries) {
|
||||
if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
|
||||
// We allow only no aggregation or value aggregation as
|
||||
// extrema: nested structure and not clear how to map it to one line
|
||||
// index: value is an array of Statistics whose size is not clear at initialization time
|
||||
String message = "CSV export does not support '" + query.getAggregationType() + "'";
|
||||
LOGGER.warn(message);
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
LOGGER.debug("Executing query '{}'", query.toString());
|
||||
LOGGER.debug("Executing query '{}'", queries);
|
||||
// write the response back to the client using java 8 streams
|
||||
csvResponseStreamWriter.respond(executeQuery(query), query, res);
|
||||
csvResponseStreamWriter.respond(executeQueries(queries), queries, res);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", query, e);
|
||||
LOGGER.error("Failed to execute query '{}'.", queries, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
public Stream<Entry<String, ?>> executeQuery(DAQQuery query) {
|
||||
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) {
|
||||
// set backends if not defined yet
|
||||
channelNameCache.setBackends(queries);
|
||||
|
||||
// all the magic happens here
|
||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
|
||||
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results =
|
||||
new ArrayList<>(queries.getQueries().size());
|
||||
|
||||
// do post-process
|
||||
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||
for (DAQQueryElement queryElement : queries) {
|
||||
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
|
||||
queryElement
|
||||
.getBackendQueries()
|
||||
.stream()
|
||||
.filter(query -> {
|
||||
QueryProcessor processor = queryProcessors.get(query.getBackend());
|
||||
if (processor != null) {
|
||||
return true;
|
||||
} else {
|
||||
LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend());
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.flatMap(query -> {
|
||||
QueryProcessor processor = queryProcessors.get(query.getBackend());
|
||||
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
|
||||
return channelToData;
|
||||
}
|
||||
// all the magic happens here
|
||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
processor.process(queryAnalizer);
|
||||
// do post-process
|
||||
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||
|
||||
private QueryProcessor getQueryProcessor(DBMode dbMode) {
|
||||
if (DBMode.databuffer.equals(dbMode)) {
|
||||
return cassandraQueryProcessor.get();
|
||||
} else if (DBMode.archiverappliance.equals(dbMode)) {
|
||||
return archiverApplianceQueryProcessor.get();
|
||||
} else {
|
||||
LOGGER.error("Unknown DBMode '{}'!", dbMode);
|
||||
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
|
||||
return channelToData.map(entry -> {
|
||||
return Triple.of(query, new ChannelName(entry.getKey(), query.getBackend()),
|
||||
entry.getValue());
|
||||
});
|
||||
});
|
||||
|
||||
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
|
||||
// By materializing the outer Stream the elements of all BackendQuery are loaded async
|
||||
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
|
||||
// all elements into memory at once)
|
||||
resultStreams = resultStreams.collect(Collectors.toList()).stream();
|
||||
|
||||
results.add(Pair.of(queryElement, resultStreams));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -237,12 +328,8 @@ public class QueryRestController {
|
||||
* @return list of {@link Ordering}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "ordering", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getOrderingValues() {
|
||||
List<Ordering> orderings = Lists.newArrayList(Ordering.values());
|
||||
return orderings.stream()
|
||||
.map((Ordering ord) -> {
|
||||
return ord.toString();
|
||||
}).collect(Collectors.toList());
|
||||
public @ResponseBody List<Ordering> getOrderingValues() {
|
||||
return Lists.newArrayList(Ordering.values());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -251,12 +338,8 @@ public class QueryRestController {
|
||||
* @return list of {@link QueryField}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "queryfields", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getQueryFieldValues() {
|
||||
List<QueryField> orderings = Lists.newArrayList(QueryField.values());
|
||||
return orderings.stream()
|
||||
.map((QueryField qf) -> {
|
||||
return qf.toString();
|
||||
}).collect(Collectors.toList());
|
||||
public @ResponseBody List<QueryField> getQueryFieldValues() {
|
||||
return Lists.newArrayList(QueryField.values());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -265,12 +348,8 @@ public class QueryRestController {
|
||||
* @return list of {@link Aggregation}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "aggregations", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getAggregationsValues() {
|
||||
List<Aggregation> orderings = Lists.newArrayList(Aggregation.values());
|
||||
return orderings.stream()
|
||||
.map((Aggregation value) -> {
|
||||
return value.toString();
|
||||
}).collect(Collectors.toList());
|
||||
public @ResponseBody List<Aggregation> getAggregationsValues() {
|
||||
return Lists.newArrayList(Aggregation.values());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -280,25 +359,17 @@ public class QueryRestController {
|
||||
*/
|
||||
@RequestMapping(value = "aggregationtypes", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getAggregationTypesValues() {
|
||||
List<AggregationType> orderings = Lists.newArrayList(AggregationType.values());
|
||||
return orderings.stream()
|
||||
.map((AggregationType value) -> {
|
||||
return value.toString();
|
||||
}).collect(Collectors.toList());
|
||||
public @ResponseBody List<AggregationType> getAggregationTypesValues() {
|
||||
return Lists.newArrayList(AggregationType.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link DBMode} values.
|
||||
* Returns the current list of {@link Backend} values.
|
||||
*
|
||||
* @return list of {@link DBMode}s as String array
|
||||
* @return list of {@link Backend}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "dbmodes", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getDBModeValues() {
|
||||
List<DBMode> orderings = Lists.newArrayList(DBMode.values());
|
||||
return orderings.stream()
|
||||
.map((DBMode value) -> {
|
||||
return value.toString();
|
||||
}).collect(Collectors.toList());
|
||||
@RequestMapping(value = "backends", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<Backend> getDBModeValues() {
|
||||
return Lists.newArrayList(Backend.values());
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package ch.psi.daq.queryrest.controller.validator;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
@ -10,10 +10,10 @@ import org.springframework.validation.Errors;
|
||||
import org.springframework.validation.Validator;
|
||||
|
||||
import ch.psi.daq.query.model.Aggregation;
|
||||
import ch.psi.daq.query.model.DBMode;
|
||||
import ch.psi.daq.query.model.QueryField;
|
||||
import ch.psi.daq.query.model.impl.DAQQueries;
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import ch.psi.daq.cassandra.request.Request;
|
||||
import ch.psi.daq.query.model.impl.DAQQueryElement;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
|
||||
public class QueryValidator implements Validator {
|
||||
@ -28,8 +28,8 @@ public class QueryValidator implements Validator {
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean supports(Class<?> clazz) {
|
||||
return DAQQuery.class.isAssignableFrom(clazz);
|
||||
public boolean supports(Class<?> clazz) {
|
||||
return DAQQuery.class.isAssignableFrom(clazz) || DAQQueries.class.isAssignableFrom(clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -37,25 +37,24 @@ public class QueryValidator implements Validator {
|
||||
*/
|
||||
@Override
|
||||
public void validate(Object target, Errors errors) {
|
||||
|
||||
DAQQuery query = (DAQQuery) target;
|
||||
|
||||
Request request = query.getRequest();
|
||||
|
||||
if (DBMode.archiverappliance.equals(query.getDbMode())) {
|
||||
if (!request.getRequestRange().isTimeRangeDefined()) {
|
||||
errors.reject("dbMode", "ArchiverAppliance supports time range queries only!");
|
||||
if (target instanceof DAQQuery) {
|
||||
this.checkElement((DAQQuery) target);
|
||||
}else if(target instanceof DAQQueries){
|
||||
DAQQueries queries = (DAQQueries) target;
|
||||
for (DAQQueryElement daqQueryElement : queries) {
|
||||
this.checkElement(daqQueryElement);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkElement(DAQQueryElement query) {
|
||||
// set default values (if not set)
|
||||
if (query.getFields() == null || query.getFields().isEmpty()) {
|
||||
query.setFields(new LinkedHashSet<>(defaultResponseFields));
|
||||
}
|
||||
|
||||
if (query.getAggregations() == null || query.getAggregations().isEmpty()) {
|
||||
query.setAggregations(new LinkedList<>(defaultResponseAggregations));
|
||||
query.setAggregations(new ArrayList<>(defaultResponseAggregations));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -10,13 +10,8 @@ import javax.servlet.http.HttpServletResponse;
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import ch.psi.daq.query.model.ResponseFormat;
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import ch.psi.daq.query.model.ResponseOptions;
|
||||
|
||||
|
||||
/**
|
||||
* @author zellweger_c
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter {
|
||||
|
||||
public static final String CONTENT_TYPE_CSV = "text/csv";
|
||||
@ -32,24 +27,24 @@ public abstract class AbstractResponseStreamWriter implements ResponseStreamWrit
|
||||
* see http://tools.ietf.org/html/rfc2616#section-14.11 and
|
||||
* see http://tools.ietf.org/html/rfc2616#section-3.5
|
||||
*
|
||||
* @param query The query
|
||||
* @param options The options for the response
|
||||
* @param response The HttpServletResponse
|
||||
* @param contentType The content type
|
||||
* @return OutputStream The OutputStream
|
||||
* @throws Exception Something goes wrong
|
||||
*/
|
||||
protected OutputStream handleCompressionAndResponseHeaders(DAQQuery query, HttpServletResponse response,
|
||||
protected OutputStream handleCompressionAndResponseHeaders(ResponseOptions options, HttpServletResponse response,
|
||||
String contentType) throws Exception {
|
||||
OutputStream out = response.getOutputStream();
|
||||
|
||||
response.addHeader("Content-Type", contentType);
|
||||
if (query.isCompressed()) {
|
||||
String filename = "data." + query.getCompression().getFileSuffix();
|
||||
if (options.isCompressed()) {
|
||||
String filename = "data." + options.getCompression().getFileSuffix();
|
||||
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
|
||||
response.addHeader("Content-Encoding", query.getCompression().toString());
|
||||
out = query.getCompression().wrapStream(out);
|
||||
response.addHeader("Content-Encoding", options.getCompression().toString());
|
||||
out = options.getCompression().wrapStream(out);
|
||||
} else {
|
||||
String filename = "data." + (query.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json");
|
||||
String filename = "data." + (options.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json");
|
||||
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
|
||||
}
|
||||
|
||||
|
@ -1,13 +1,18 @@
|
||||
package ch.psi.daq.queryrest.response;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
|
||||
import ch.psi.daq.query.model.ResponseOptions;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.query.model.impl.DAQQueryElement;
|
||||
import ch.psi.daq.query.request.ChannelName;
|
||||
|
||||
public interface ResponseStreamWriter {
|
||||
|
||||
@ -15,10 +20,11 @@ public interface ResponseStreamWriter {
|
||||
* Responding with the the contents of the stream by writing into the output stream of the
|
||||
* {@link ServletResponse}.
|
||||
*
|
||||
* @param stream Mapping from channel name to data
|
||||
* @param query concrete instance of {@link DAQQuery}
|
||||
* @param stream The results results
|
||||
* @param options The options for the response
|
||||
* @param response {@link ServletResponse} instance given by the current HTTP request
|
||||
* @throws IOException thrown if writing to the output stream fails
|
||||
* @throws Exception thrown if writing to the output stream fails
|
||||
*/
|
||||
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception;
|
||||
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, ResponseOptions options,
|
||||
HttpServletResponse response) throws Exception;
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import javax.annotation.Resource;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.supercsv.cellprocessor.constraint.NotNull;
|
||||
@ -24,16 +25,19 @@ import org.supercsv.io.dozer.CsvDozerBeanWriter;
|
||||
import org.supercsv.io.dozer.ICsvDozerBeanWriter;
|
||||
import org.supercsv.prefs.CsvPreference;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonEncoding;
|
||||
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.model.Aggregation;
|
||||
import ch.psi.daq.query.model.Query;
|
||||
import ch.psi.daq.query.model.QueryField;
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import ch.psi.daq.query.model.ResponseOptions;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.query.model.impl.DAQQueryElement;
|
||||
import ch.psi.daq.query.request.ChannelName;
|
||||
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonEncoding;
|
||||
|
||||
/**
|
||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||
* of the current request.
|
||||
@ -44,53 +48,103 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
private static final char DELIMITER_ARRAY = ',';
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);
|
||||
|
||||
|
||||
@Resource
|
||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||
|
||||
@Override
|
||||
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
|
||||
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options,
|
||||
HttpServletResponse response) throws Exception {
|
||||
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
|
||||
response.setContentType(CONTENT_TYPE_CSV);
|
||||
|
||||
respondInternal(stream, query, response);
|
||||
respondInternal(results, options, response);
|
||||
}
|
||||
|
||||
private void respondInternal(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response)
|
||||
throws Exception {
|
||||
private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options, HttpServletResponse response) throws Exception {
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV);
|
||||
List<ICsvDozerBeanWriter> beanWriters = new ArrayList<>();
|
||||
|
||||
Set<QueryField> queryFields = query.getFields();
|
||||
List<Aggregation> aggregations = query.getAggregations();
|
||||
int aggregationsSize = (aggregations != null ? aggregations.size() : 0);
|
||||
results.forEach(entry -> {
|
||||
DAQQueryElement query = entry.getKey();
|
||||
|
||||
Set<String> fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize);
|
||||
List<String> header = new ArrayList<>(queryFields.size() + aggregationsSize);
|
||||
Set<QueryField> queryFields = query.getFields();
|
||||
List<Aggregation> aggregations = query.getAggregations();
|
||||
int aggregationsSize = (aggregations != null ? aggregations.size() : 0);
|
||||
|
||||
CellProcessor[] processors = setupCellProcessors(query, fieldMapping, header);
|
||||
|
||||
OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_CSV);
|
||||
ICsvDozerBeanWriter beanWriter = setupBeanWriter(fieldMapping, out);
|
||||
Set<String> fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize);
|
||||
List<String> header = new ArrayList<>(queryFields.size() + aggregationsSize);
|
||||
|
||||
writeToOutput(stream, header, processors, beanWriter);
|
||||
|
||||
AtomicReference<CellProcessor[]> processorsRef = new AtomicReference<>();
|
||||
|
||||
entry.getValue()
|
||||
.sequential()
|
||||
.forEach(triple -> {
|
||||
try {
|
||||
CellProcessor[] processors = processorsRef.get();
|
||||
ICsvDozerBeanWriter beanWriter;
|
||||
|
||||
if (processors == null) {
|
||||
processors = setupCellProcessors(query, triple.getLeft(), fieldMapping, header);
|
||||
processorsRef.set(processors);
|
||||
|
||||
beanWriter = setupBeanWriter(fieldMapping, out);
|
||||
beanWriters.add(beanWriter);
|
||||
} else {
|
||||
beanWriter = beanWriters.get(beanWriters.size() - 1);
|
||||
}
|
||||
|
||||
writeToOutput(triple, processors, beanWriter, header);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Could not write CSV of '{}'", triple.getMiddle(), e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
});
|
||||
|
||||
if (!beanWriters.isEmpty()) {
|
||||
try {
|
||||
beanWriters.get(beanWriters.size() - 1).flush();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Could not flush ICsvDozerBeanWriter.", e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (ICsvDozerBeanWriter beanWriter : beanWriters) {
|
||||
try {
|
||||
beanWriter.close();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Could not close ICsvDozerBeanWriter.", e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (exception.get() != null) {
|
||||
throw exception.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the bean writer instance.
|
||||
* @throws Exception
|
||||
*
|
||||
*/
|
||||
private ICsvDozerBeanWriter setupBeanWriter(Set<String> fieldMapping, OutputStream out) throws Exception {
|
||||
CsvPreference preference = new CsvPreference.Builder(
|
||||
(char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
|
||||
DELIMITER_CVS,
|
||||
CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
|
||||
|
||||
ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference);
|
||||
// configure the mapping from the fields to the CSV columns
|
||||
beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
|
||||
return beanWriter;
|
||||
}
|
||||
* Sets up the bean writer instance.
|
||||
*
|
||||
* @throws Exception
|
||||
*
|
||||
*/
|
||||
private ICsvDozerBeanWriter setupBeanWriter(Set<String> fieldMapping, OutputStream out) throws Exception {
|
||||
CsvPreference preference = new CsvPreference.Builder(
|
||||
(char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
|
||||
DELIMITER_CVS,
|
||||
CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
|
||||
|
||||
ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference);
|
||||
// configure the mapping from the fields to the CSV columns
|
||||
beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
|
||||
return beanWriter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer.
|
||||
@ -101,34 +155,36 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
* with other processors to fully automate all of the required conversions and constraint
|
||||
* validation for a single CSV column.
|
||||
*
|
||||
* @param query The current {@link DAQQuery}
|
||||
* @param daqQuery The current {@link DAQQueryElement}
|
||||
* @param backendQuery One BackendQuery of the current {@link DAQQueryElement}
|
||||
* @return Array of {@link CellProcessor} entries
|
||||
*/
|
||||
private CellProcessor[] setupCellProcessors(DAQQuery query, Set<String> fieldMapping, List<String> header) {
|
||||
Set<QueryField> queryFields = query.getFields();
|
||||
List<Aggregation> aggregations = query.getAggregations();
|
||||
|
||||
private CellProcessor[] setupCellProcessors(DAQQueryElement daqQuery, BackendQuery backendQuery,
|
||||
Set<String> fieldMapping, List<String> header) {
|
||||
Set<QueryField> queryFields = daqQuery.getFields();
|
||||
List<Aggregation> aggregations = daqQuery.getAggregations();
|
||||
|
||||
List<CellProcessor> processorSet =
|
||||
new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
|
||||
|
||||
|
||||
boolean isNewField;
|
||||
QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query);
|
||||
|
||||
QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
|
||||
|
||||
for (QueryField field : queryFields) {
|
||||
if(!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())){
|
||||
if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) {
|
||||
isNewField = fieldMapping.add(field.name());
|
||||
|
||||
|
||||
if (isNewField) {
|
||||
header.add(field.name());
|
||||
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
|
||||
for (Aggregation aggregation : query.getAggregations()) {
|
||||
for (Aggregation aggregation : daqQuery.getAggregations()) {
|
||||
isNewField = fieldMapping.add("value." + aggregation.name());
|
||||
|
||||
|
||||
if (isNewField) {
|
||||
header.add(aggregation.name());
|
||||
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
|
||||
@ -139,44 +195,27 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void writeToOutput(Stream<Entry<String, ?>> stream, List<String> header, CellProcessor[] processors,
|
||||
ICsvDozerBeanWriter beanWriter) throws IOException, Exception {
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
try {
|
||||
|
||||
private void writeToOutput(Triple<BackendQuery, ChannelName, ?> triple, CellProcessor[] processors,
|
||||
ICsvDozerBeanWriter beanWriter, List<String> header) throws IOException {
|
||||
if (triple.getRight() instanceof Stream) {
|
||||
beanWriter.writeComment("");
|
||||
beanWriter.writeComment("Start of " + triple.getMiddle());
|
||||
beanWriter.writeHeader(header.toArray(new String[header.size()]));
|
||||
|
||||
stream
|
||||
/* ensure elements are sequentially written */
|
||||
.sequential()
|
||||
Stream<DataEvent> eventStream = (Stream<DataEvent>) triple.getRight();
|
||||
eventStream
|
||||
.forEach(
|
||||
entry -> {
|
||||
if (entry.getValue() instanceof Stream) {
|
||||
Stream<DataEvent> eventStream = (Stream<DataEvent>) entry.getValue();
|
||||
eventStream
|
||||
.forEach(
|
||||
event -> {
|
||||
try {
|
||||
beanWriter.write(event, processors);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
|
||||
event.getPulseId(), e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
String message = "Type '" + entry.getValue().getClass() + "' not supported.";
|
||||
LOGGER.error(message);
|
||||
exception.compareAndSet(null, new RuntimeException(message));
|
||||
}
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
beanWriter.close();
|
||||
}
|
||||
|
||||
if (exception.get() != null) {
|
||||
throw exception.get();
|
||||
event -> {
|
||||
try {
|
||||
beanWriter.write(event, processors);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
|
||||
event.getPulseId(), e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
String message = "Type '" + triple.getRight().getClass() + "' not supported.";
|
||||
LOGGER.error(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
package ch.psi.daq.queryrest.response.json;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
@ -13,15 +12,11 @@ import javax.annotation.Resource;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import ch.psi.daq.query.model.Aggregation;
|
||||
import ch.psi.daq.query.model.QueryField;
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonEncoding;
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
@ -30,6 +25,14 @@ import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
|
||||
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
|
||||
|
||||
import ch.psi.daq.query.model.Aggregation;
|
||||
import ch.psi.daq.query.model.QueryField;
|
||||
import ch.psi.daq.query.model.ResponseOptions;
|
||||
import ch.psi.daq.query.model.impl.BackendQuery;
|
||||
import ch.psi.daq.query.model.impl.DAQQueryElement;
|
||||
import ch.psi.daq.query.request.ChannelName;
|
||||
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
|
||||
|
||||
/**
|
||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||
* of the current request.
|
||||
@ -38,7 +41,7 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
|
||||
private static final String DATA_RESP_FIELD = "data";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(JSONResponseStreamWriter.class);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JSONResponseStreamWriter.class);
|
||||
|
||||
@Resource
|
||||
private JsonFactory jsonFactory;
|
||||
@ -46,19 +49,78 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
@Resource
|
||||
private ObjectMapper mapper;
|
||||
|
||||
|
||||
@Override
|
||||
public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
|
||||
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options, HttpServletResponse response) throws Exception {
|
||||
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
|
||||
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
|
||||
|
||||
Set<String> includedFields = getFields(query);
|
||||
|
||||
ObjectWriter writer = configureWriter(includedFields);
|
||||
|
||||
respondInternal(stream, response, writer, query);
|
||||
respondInternal(results, options, response);
|
||||
}
|
||||
|
||||
protected Set<String> getFields(DAQQuery query) {
|
||||
private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
ResponseOptions options, HttpServletResponse response) throws Exception {
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_JSON);
|
||||
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
|
||||
|
||||
try {
|
||||
if (results.size() > 1) {
|
||||
generator.writeStartArray();
|
||||
}
|
||||
|
||||
results
|
||||
.forEach(entryy -> {
|
||||
DAQQueryElement daqQuery = entryy.getKey();
|
||||
Set<String> includedFields = getFields(daqQuery);
|
||||
ObjectWriter writer = configureWriter(includedFields);
|
||||
|
||||
try {
|
||||
generator.writeStartArray();
|
||||
|
||||
entryy.getValue()
|
||||
/* ensure elements are sequentially written */
|
||||
.sequential()
|
||||
.forEach(
|
||||
triple -> {
|
||||
try {
|
||||
generator.writeStartObject();
|
||||
generator.writeStringField(QueryField.channel.name(), triple.getMiddle()
|
||||
.getName());
|
||||
|
||||
generator.writeFieldName(DATA_RESP_FIELD);
|
||||
writer.writeValue(generator, triple.getRight());
|
||||
|
||||
generator.writeEndObject();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(),
|
||||
e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
});
|
||||
|
||||
generator.writeEndArray();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception while writing json for '{}'", daqQuery.getChannels(), e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
if (results.size() > 1) {
|
||||
generator.writeEndArray();
|
||||
}
|
||||
|
||||
generator.flush();
|
||||
generator.close();
|
||||
}
|
||||
|
||||
if (exception.get() != null) {
|
||||
throw exception.get();
|
||||
}
|
||||
}
|
||||
|
||||
protected Set<String> getFields(DAQQueryElement query) {
|
||||
Set<QueryField> queryFields = query.getFields();
|
||||
List<Aggregation> aggregations = query.getAggregations();
|
||||
|
||||
@ -95,52 +157,4 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
|
||||
ObjectWriter writer = mapper.writer(propertyFilter);
|
||||
return writer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the outer Java stream into the output stream.
|
||||
*
|
||||
* @param stream Mapping from channel name to data
|
||||
* @param response {@link ServletResponse} instance given by the current HTTP request
|
||||
* @param writer configured writer that includes the fields the end user wants to see
|
||||
* @param query
|
||||
* @throws IOException thrown if writing to the output stream fails
|
||||
*/
|
||||
private void respondInternal(Stream<Entry<String, ?>> stream, HttpServletResponse response, ObjectWriter writer, DAQQuery query)
|
||||
throws Exception {
|
||||
|
||||
OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_JSON);
|
||||
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
|
||||
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
try {
|
||||
generator.writeStartArray();
|
||||
stream
|
||||
/* ensure elements are sequentially written */
|
||||
.sequential()
|
||||
.forEach(
|
||||
entry -> {
|
||||
try {
|
||||
generator.writeStartObject();
|
||||
generator.writeStringField(QueryField.channel.name(), entry.getKey());
|
||||
|
||||
generator.writeFieldName(DATA_RESP_FIELD);
|
||||
writer.writeValue(generator, entry.getValue());
|
||||
|
||||
generator.writeEndObject();
|
||||
} catch (Exception e) {
|
||||
logger.error("Could not write channel name of channel '{}'", entry.getKey(), e);
|
||||
exception.compareAndSet(null, e);
|
||||
}
|
||||
});
|
||||
generator.writeEndArray();
|
||||
} finally {
|
||||
generator.flush();
|
||||
generator.close();
|
||||
}
|
||||
|
||||
if (exception.get() != null) {
|
||||
throw exception.get();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,30 +0,0 @@
|
||||
package ch.psi.daq.queryrest.response.json;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerationException;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
|
||||
// see: http://stackoverflow.com/a/15037329
|
||||
public class JsonByteArraySerializer extends StdSerializer<byte[]> {
|
||||
private static final long serialVersionUID = -5914688899857435263L;
|
||||
|
||||
public JsonByteArraySerializer() {
|
||||
super(byte[].class, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(byte[] bytes, JsonGenerator jgen, SerializerProvider provider) throws IOException,
|
||||
JsonGenerationException {
|
||||
jgen.writeStartArray();
|
||||
|
||||
for (byte b : bytes) {
|
||||
// stackoverflow example used a mask -> ?
|
||||
jgen.writeNumber(b);
|
||||
}
|
||||
|
||||
jgen.writeEndArray();
|
||||
}
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
package ch.psi.daq.queryrest.response.json;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerationException;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
|
||||
public class JsonStreamSerializer extends StdSerializer<Stream<?>>{
|
||||
private static final long serialVersionUID = 4695859735299703478L;
|
||||
|
||||
public JsonStreamSerializer() {
|
||||
super(Stream.class, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(Stream<?> stream, JsonGenerator jgen, SerializerProvider provider) throws IOException,
|
||||
JsonGenerationException {
|
||||
provider.findValueSerializer(Iterator.class, null).serialize(stream.iterator(), jgen, provider);
|
||||
}
|
||||
}
|
@ -12,11 +12,13 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupp
|
||||
|
||||
import ch.psi.daq.cassandra.reader.CassandraReader;
|
||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||
import ch.psi.daq.domain.reader.DataReader;
|
||||
import ch.psi.daq.query.processor.QueryProcessor;
|
||||
import ch.psi.daq.query.processor.QueryProcessorLocal;
|
||||
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
|
||||
import ch.psi.daq.test.cassandra.admin.CassandraTestAdminImpl;
|
||||
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
|
||||
import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader;
|
||||
import ch.psi.daq.test.queryrest.query.DummyCassandraReader;
|
||||
|
||||
@Configuration
|
||||
@ -45,6 +47,18 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
||||
return new DummyCassandraReader();
|
||||
}
|
||||
|
||||
// @Bean
|
||||
// @Lazy
|
||||
// public DataReader archiverApplianceReader() {
|
||||
// return new DummyArchiverApplianceReader();
|
||||
// }
|
||||
//
|
||||
// @Bean
|
||||
// @Lazy
|
||||
// public QueryProcessor archiverApplianceQueryProcessor() {
|
||||
// return new QueryProcessorLocal(archiverApplianceReader());
|
||||
// }
|
||||
|
||||
@Bean
|
||||
@Lazy
|
||||
public CassandraTestAdmin cassandraTestAdmin() {
|
||||
|
@ -34,7 +34,9 @@ import ch.psi.daq.query.model.AggregationType;
|
||||
import ch.psi.daq.query.model.Compression;
|
||||
import ch.psi.daq.query.model.QueryField;
|
||||
import ch.psi.daq.query.model.ResponseFormat;
|
||||
import ch.psi.daq.query.model.impl.DAQQueries;
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import ch.psi.daq.query.model.impl.DAQQueryElement;
|
||||
import ch.psi.daq.queryrest.controller.QueryRestController;
|
||||
import ch.psi.daq.queryrest.filter.CorsFilter;
|
||||
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
|
||||
@ -69,9 +71,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
1),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.NONE);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -109,10 +110,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
ICsvMapReader mapReader =
|
||||
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||
try {
|
||||
final String[] header = mapReader.getHeader(true);
|
||||
// read comment
|
||||
mapReader.read("");
|
||||
String[] header = mapReader.getHeader(false);
|
||||
Map<String, Object> customerMap;
|
||||
long resultsPerChannel = 2;
|
||||
long pulse = 0;
|
||||
@ -131,13 +136,111 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
pulse = ++pulse % resultsPerChannel;
|
||||
if (pulse == 0) {
|
||||
++channelCount;
|
||||
if (channelCount <= TEST_CHANNEL_NAMES.length) {
|
||||
// read comment (empty rows are skiped
|
||||
mapReader.read("");
|
||||
header = mapReader.getHeader(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
mapReader.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPulseRangeQueries() throws Exception {
|
||||
String testChannel3 = "testChannel3";
|
||||
DAQQueries request = new DAQQueries(
|
||||
new DAQQueryElement(
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
1),
|
||||
TEST_CHANNEL_NAMES),
|
||||
new DAQQueryElement(
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
1),
|
||||
testChannel3));
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.pulseId);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.iocMillis);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.iocNanos);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.globalMillis);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.globalNanos);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.shape);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.eventCount);
|
||||
cellProcessors.add(new NotNull());
|
||||
queryFields.add(QueryField.value);
|
||||
cellProcessors.add(new NotNull());
|
||||
for (DAQQueryElement element : request) {
|
||||
element.setFields(queryFields);
|
||||
}
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
MvcResult result = this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.QUERIES)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andReturn();
|
||||
|
||||
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
ICsvMapReader mapReader =
|
||||
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||
try {
|
||||
// read comment
|
||||
mapReader.read("");
|
||||
String[] header = mapReader.getHeader(false);
|
||||
Map<String, Object> customerMap;
|
||||
long resultsPerChannel = 2;
|
||||
long pulse = 0;
|
||||
long channelCount = 1;
|
||||
while ((customerMap = mapReader.read(header, processors)) != null) {
|
||||
assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name()));
|
||||
assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
|
||||
assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
|
||||
assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
|
||||
assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
|
||||
assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
|
||||
assertEquals("[1]", customerMap.get(QueryField.shape.name()));
|
||||
assertEquals("1", customerMap.get(QueryField.eventCount.name()));
|
||||
assertEquals("" + pulse, customerMap.get(QueryField.value.name()));
|
||||
|
||||
pulse = ++pulse % resultsPerChannel;
|
||||
if (pulse == 0) {
|
||||
++channelCount;
|
||||
if (channelCount <= TEST_CHANNEL_NAMES.length + 1) {
|
||||
// read comment (empty rows are skiped
|
||||
mapReader.read("");
|
||||
header = mapReader.getHeader(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
mapReader.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPulseRangeQueryWaveform() throws Exception {
|
||||
String channelName = "XYWaveform";
|
||||
@ -145,10 +248,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
1),
|
||||
channelName);
|
||||
request.setCompression(Compression.NONE);
|
||||
channelName);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -186,10 +288,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
ICsvMapReader mapReader =
|
||||
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||
try {
|
||||
final String[] header = mapReader.getHeader(true);
|
||||
// read comment
|
||||
mapReader.read("");
|
||||
final String[] header = mapReader.getHeader(false);
|
||||
Map<String, Object> customerMap;
|
||||
long pulse = 0;
|
||||
while ((customerMap = mapReader.read(header, processors)) != null) {
|
||||
@ -218,9 +324,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
0,
|
||||
10),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.NONE);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -258,10 +363,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
ICsvMapReader mapReader =
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
CsvMapReader mapReader =
|
||||
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||
try {
|
||||
final String[] header = mapReader.getHeader(true);
|
||||
// read comment
|
||||
mapReader.read("");
|
||||
String[] header = mapReader.getHeader(false);
|
||||
Map<String, Object> customerMap;
|
||||
long resultsPerChannel = 2;
|
||||
long pulse = 0;
|
||||
@ -280,6 +389,11 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
pulse = ++pulse % resultsPerChannel;
|
||||
if (pulse == 0) {
|
||||
++channelCount;
|
||||
if (channelCount <= TEST_CHANNEL_NAMES.length) {
|
||||
// read comment (empty rows are skiped
|
||||
mapReader.read("");
|
||||
header = mapReader.getHeader(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@ -296,9 +410,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
startDate,
|
||||
endDate),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.NONE);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
|
||||
queryFields.add(QueryField.channel);
|
||||
@ -336,10 +449,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
ICsvMapReader mapReader =
|
||||
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||
try {
|
||||
final String[] header = mapReader.getHeader(true);
|
||||
// read comment
|
||||
mapReader.read("");
|
||||
String[] header = mapReader.getHeader(false);
|
||||
Map<String, Object> customerMap;
|
||||
long resultsPerChannel = 2;
|
||||
long pulse = 0;
|
||||
@ -358,6 +475,11 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
pulse = ++pulse % resultsPerChannel;
|
||||
if (pulse == 0) {
|
||||
++channelCount;
|
||||
if (channelCount <= TEST_CHANNEL_NAMES.length) {
|
||||
// read comment (empty rows are skiped
|
||||
mapReader.read("");
|
||||
header = mapReader.getHeader(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@ -371,11 +493,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
1),
|
||||
false,
|
||||
Ordering.asc,
|
||||
AggregationType.extrema,
|
||||
TEST_CHANNEL_NAMES[0]);
|
||||
request.setCompression(Compression.NONE);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
@ -403,11 +523,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
new RequestRangePulseId(
|
||||
0,
|
||||
1),
|
||||
false,
|
||||
Ordering.asc,
|
||||
AggregationType.index,
|
||||
TEST_CHANNEL_NAMES[0]);
|
||||
request.setCompression(Compression.NONE);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
@ -441,7 +559,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
endDate),
|
||||
TEST_CHANNEL_01);
|
||||
request.setNrOfBins(2);
|
||||
request.setCompression(Compression.NONE);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
@ -473,7 +590,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
cellProcessors.add(new NotNull());
|
||||
request.setAggregations(aggregations);
|
||||
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
@ -489,10 +605,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
ICsvMapReader mapReader =
|
||||
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||
try {
|
||||
final String[] header = mapReader.getHeader(true);
|
||||
// read comment
|
||||
mapReader.read("");
|
||||
String[] header = mapReader.getHeader(false);
|
||||
Map<String, Object> customerMap;
|
||||
long pulse = 0;
|
||||
while ((customerMap = mapReader.read(header, processors)) != null) {
|
||||
@ -527,7 +647,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
endDate),
|
||||
TEST_CHANNEL_01);
|
||||
request.setBinSize(100);
|
||||
request.setCompression(Compression.NONE);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
|
||||
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
|
||||
@ -575,10 +694,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
|
||||
|
||||
String response = result.getResponse().getContentAsString();
|
||||
System.out.println("Response: " + response);
|
||||
|
||||
ICsvMapReader mapReader =
|
||||
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
|
||||
try {
|
||||
final String[] header = mapReader.getHeader(true);
|
||||
// read comment
|
||||
mapReader.read("");
|
||||
String[] header = mapReader.getHeader(false);
|
||||
Map<String, Object> customerMap;
|
||||
long pulse = 0;
|
||||
while ((customerMap = mapReader.read(header, processors)) != null) {
|
||||
@ -600,7 +723,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
mapReader.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGzipFileSuffixHeader() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
@ -623,16 +746,15 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testJsonFileSuffixHeader() throws Exception {
|
||||
public void testCvsFileSuffixHeader() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setResponseFormat(ResponseFormat.CSV);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
|
@ -15,13 +15,19 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId;
|
||||
import ch.psi.daq.cassandra.request.range.RequestRangeTime;
|
||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.reader.Backend;
|
||||
import ch.psi.daq.query.model.AggregationType;
|
||||
import ch.psi.daq.query.model.Compression;
|
||||
import ch.psi.daq.query.model.impl.DAQQueries;
|
||||
import ch.psi.daq.query.model.impl.DAQQuery;
|
||||
import ch.psi.daq.query.model.impl.DAQQueryElement;
|
||||
import ch.psi.daq.query.request.ChannelName;
|
||||
import ch.psi.daq.query.request.ChannelsRequest;
|
||||
import ch.psi.daq.queryrest.controller.QueryRestController;
|
||||
import ch.psi.daq.queryrest.filter.CorsFilter;
|
||||
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
|
||||
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||
import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader;
|
||||
|
||||
/**
|
||||
* Tests the {@link DaqController} implementation.
|
||||
@ -47,17 +53,30 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
@Test
|
||||
public void testChannelNameQuery() throws Exception {
|
||||
|
||||
this.mockMvc.perform(
|
||||
MockMvcRequestBuilders
|
||||
.get(QueryRestController.CHANNELS)
|
||||
.contentType(MediaType.APPLICATION_JSON))
|
||||
this.mockMvc
|
||||
.perform(
|
||||
MockMvcRequestBuilders
|
||||
.get(QueryRestController.CHANNELS)
|
||||
.contentType(MediaType.APPLICATION_JSON))
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("BoolScalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("BoolScalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("BoolWaveform"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("BoolWaveform"));
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").exists())
|
||||
.andExpect(
|
||||
MockMvcResultMatchers.jsonPath("$[1].channels[0]").value(DummyArchiverApplianceReader.TEST_CHANNEL_1))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[1]").exists())
|
||||
.andExpect(
|
||||
MockMvcResultMatchers.jsonPath("$[1].channels[1]").value(DummyArchiverApplianceReader.TEST_CHANNEL_2));
|
||||
|
||||
}
|
||||
|
||||
@ -71,14 +90,101 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("Int32Scalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("Int32Scalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("Int32Waveform"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("UInt32Scalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("UInt32Waveform"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("Int32Waveform"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[2]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[2]").value("UInt32Scalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[3]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[3]").value("UInt32Waveform"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[4]").doesNotExist());
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").doesNotExist());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChannelNameQueryBackendOrder() throws Exception {
|
||||
ChannelsRequest request = new ChannelsRequest("int64", Ordering.desc, Backend.databuffer);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.CHANNELS)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("UInt64Waveform"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("UInt64Scalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("Int64Waveform"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("Int64Scalar"))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").doesNotExist());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChannelNameQueryReload() throws Exception {
|
||||
ChannelsRequest request = new ChannelsRequest();
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.CHANNELS)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[23]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").doesNotExist())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[2]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").doesNotExist());
|
||||
|
||||
// each reload add another channel
|
||||
request.setReload(true);
|
||||
|
||||
content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.CHANNELS)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[25]").doesNotExist())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[4]").doesNotExist());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -166,7 +272,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
"Origin, Authorization, Accept, Content-Type"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPulseRangeQuery() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
@ -174,7 +279,42 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPulseRangeQueryBackends() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
11),
|
||||
new ChannelName(TEST_CHANNEL_01, Backend.databuffer),
|
||||
new ChannelName(TEST_CHANNEL_02, Backend.archiverappliance));
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -203,6 +343,58 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPulseRangeQueries() throws Exception {
|
||||
String testChannel3 = "testChannel3";
|
||||
DAQQueries request = new DAQQueries(
|
||||
new DAQQueryElement(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES),
|
||||
new DAQQueryElement(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
11),
|
||||
testChannel3));
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.QUERIES)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][0].channel").value(TEST_CHANNEL_01))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[0].pulseId").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[0].globalMillis").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[1].pulseId").value(11))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[1].globalMillis").value(110))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][1].channel").value(TEST_CHANNEL_02))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[0].pulseId").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[0].globalMillis").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[1].pulseId").value(11))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[1].globalMillis").value(110))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").isArray())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1][0]").exists())
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1][0].channel").value(testChannel3))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[0].pulseId").value(10))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[0].globalMillis").value(100))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[1].pulseId").value(11))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[1].globalMillis").value(110));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeRangeQuery() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
@ -210,7 +402,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
100,
|
||||
110),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
@ -247,7 +438,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
startDate,
|
||||
endDate),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -284,11 +474,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
new RequestRangePulseId(
|
||||
100,
|
||||
101),
|
||||
false,
|
||||
Ordering.asc,
|
||||
AggregationType.extrema,
|
||||
TEST_CHANNEL_NAMES[0]);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
@ -332,7 +520,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
endDate),
|
||||
TEST_CHANNEL_01);
|
||||
request.setNrOfBins(2);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -360,17 +547,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
|
||||
@Test
|
||||
public void testDateRangeQueryBinSizeAggregate() throws Exception {
|
||||
long startTime = 1000;
|
||||
long endTime = 1999;
|
||||
String startDate = RequestRangeDate.format(startTime);
|
||||
String endDate = RequestRangeDate.format(endTime);
|
||||
long startTime = 1000;
|
||||
long endTime = 1999;
|
||||
String startDate = RequestRangeDate.format(startTime);
|
||||
String endDate = RequestRangeDate.format(endTime);
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangeDate(
|
||||
startDate,
|
||||
endDate),
|
||||
TEST_CHANNEL_01);
|
||||
request.setBinSize(100);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
@ -419,9 +605,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].globalMillis").value(1900))
|
||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGzipCompression() throws Exception {
|
||||
public void testGzipFileSuffixHeader() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
@ -432,27 +618,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
String content = mapper.writeValueAsString(request);
|
||||
System.out.println(content);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.QUERY)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(content))
|
||||
|
||||
.andDo(MockMvcResultHandlers.print())
|
||||
.andExpect(MockMvcResultMatchers.status().isOk());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGzipFileSuffixHeader() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
new RequestRangePulseId(
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.GZIP);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
this.mockMvc
|
||||
.perform(MockMvcRequestBuilders
|
||||
.post(QueryRestController.QUERY)
|
||||
@ -463,7 +628,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testJsonFileSuffixHeader() throws Exception {
|
||||
DAQQuery request = new DAQQuery(
|
||||
@ -471,7 +636,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
10,
|
||||
11),
|
||||
TEST_CHANNEL_NAMES);
|
||||
request.setCompression(Compression.NONE);
|
||||
|
||||
String content = mapper.writeValueAsString(request);
|
||||
|
||||
@ -485,5 +649,4 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
|
||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.json"));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package ch.psi.daq.test.queryrest.query;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
@ -11,20 +12,30 @@ import com.google.common.collect.Lists;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||
import ch.psi.daq.domain.reader.Backend;
|
||||
import ch.psi.daq.domain.reader.DataReader;
|
||||
|
||||
public class DummyDataReader implements DataReader {
|
||||
public class DummyArchiverApplianceReader implements DataReader {
|
||||
public static final String ARCHIVER_TEST_CHANNEL = "ArchiverTestChannel_";
|
||||
|
||||
public static final String TEST_CHANNEL_1 = "testChannel1";
|
||||
public static final String TEST_CHANNEL_2 = "testChannel2";
|
||||
public static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
|
||||
public static final String TEST_CHANNEL_1 = "ArchiverChannel_1";
|
||||
public static final String TEST_CHANNEL_2 = "ArchiverChannel_2";
|
||||
private List<String> channels = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
|
||||
|
||||
private final Random random = new Random(0);
|
||||
private AtomicLong channelNameCallCounter = new AtomicLong();
|
||||
|
||||
|
||||
@Override
|
||||
public Backend getBackend() {
|
||||
return Backend.archiverappliance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<String> getChannelStream(String regex) {
|
||||
Stream<String> channelStream = TEST_CHANNEL_NAMES.stream();
|
||||
channels.add(ARCHIVER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet());
|
||||
|
||||
Stream<String> channelStream = channels.stream();
|
||||
if (regex != null) {
|
||||
Pattern pattern = Pattern.compile(regex);
|
||||
channelStream = channelStream.filter(channel -> pattern.matcher(channel).find());
|
@ -7,6 +7,7 @@ import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
@ -29,23 +30,25 @@ import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
|
||||
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
|
||||
import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent;
|
||||
import ch.psi.daq.domain.cassandra.querying.EventQuery;
|
||||
import ch.psi.daq.domain.reader.Backend;
|
||||
|
||||
public class DummyCassandraReader implements CassandraReader {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
|
||||
public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_";
|
||||
|
||||
private static final int KEYSPACE = 1;
|
||||
private CassandraDataGen dataGen;
|
||||
private String[] channels;
|
||||
private List<String> channels;
|
||||
private Random random = new Random(0);
|
||||
private AtomicLong channelNameCallCounter = new AtomicLong();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public DummyCassandraReader() {
|
||||
this.dataGen = new CassandraDataGen();
|
||||
this.channels = new String[] {
|
||||
"testChannel1",
|
||||
"testChannel2",
|
||||
|
||||
this.channels = Lists.newArrayList(
|
||||
"BoolScalar",
|
||||
"BoolWaveform",
|
||||
"Int8Scalar",
|
||||
@ -68,7 +71,12 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
"Float32Waveform",
|
||||
"Float64Scalar",
|
||||
"Float64Waveform",
|
||||
"StringScalar"};
|
||||
"StringScalar");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Backend getBackend() {
|
||||
return Backend.databuffer;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -76,10 +84,12 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
*/
|
||||
@Override
|
||||
public Stream<String> getChannelStream(String regex) {
|
||||
Stream<String> channelStream = Stream.of(channels);
|
||||
channels.add(DATABUFFER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet());
|
||||
|
||||
Stream<String> channelStream = channels.stream();
|
||||
if (regex != null) {
|
||||
Pattern pattern = Pattern.compile(regex.toLowerCase());
|
||||
channelStream = Stream.of(channels).filter(channel -> pattern.matcher(channel.toLowerCase()).find());
|
||||
channelStream = channelStream.filter(channel -> pattern.matcher(channel.toLowerCase()).find());
|
||||
}
|
||||
return channelStream;
|
||||
}
|
||||
|
Reference in New Issue
Block a user