ATEST-472
Readme.md (26 changed lines)
@@ -126,8 +126,7 @@ The following attributes can be specified:

- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using a number of pulses and a number of milliseconds keeps this binning strategy consistent between channels with different update frequencies).
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Aggregation.java) for possible values). These values will be added to the *data* array response.
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
- **responseFormat**: Specifies the format of the response of the requested data (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/ResponseFormat.java) for possible values).
- **compression**: Defines how the response should be compressed (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Compression.java) for possible values).
- **response**: Specifies the format of the response of the requested data (see [here](Readme.md#response_format)). If this value is not set, it defaults to JSON. Several of these attributes are combined in the sketch below.
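
For orientation, a sketch of a query body that combines several of these attributes (the channel names, the range, and the aggregation names are illustrative; the linked enums are authoritative):

```json
{
  "channels":["channel1","channel2"],
  "range":{
    "startPulseId":0,
    "endPulseId":100
  },
  "binSize":10,
  "aggregations":["min","max","mean"],
  "response":{
    "format":"json",
    "compression":"gzip"
  }
}
```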
<a name="define_channel_names"/>

@@ -204,16 +203,21 @@ Queries are applied to a range. The following types of ranges are supported.

- **endSeconds**: The end time of the range in seconds.
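
For example, a time range in seconds might look as follows (the `startSeconds` field is assumed by symmetry with `endSeconds`; the exact numeric representation follows the conventions used elsewhere in this Readme):

```json
"range":{
  "startSeconds":0.0,
  "endSeconds":10.0
}
```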
<a name="response_format"/>
### Response Format

The format of the response can be defined through the field `responseFormat` (values: **json**|csv). Please note that CSV does not support `index` and `extrema` aggregations.
It is possible to specify the response format the queried data should have.

```json
"response":{
  "format":"json",
  "compression":"none"
}
```

### Response Compression

Responses can be compressed when transferred from the server by setting the field `compression` (values: **none**|gzip|deflate).

If compression is enabled, you have to tell `curl` that the data is compressed by defining the attribute `--compressed` so that it decompresses the data automatically.

- **format**: The format of the response (values: **json**|csv). Please note that `csv` does not support `index` and `extrema` aggregations.
- **compression**: Responses can be compressed when transferred from the server (values: **none**|gzip). If compression is enabled, you have to tell `curl` that the data is compressed by defining the attribute `--compressed` so that it decompresses the data automatically (see the sketch below).
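
A sketch of such a request (the endpoint and channel names are the placeholders already used in this Readme):

```bash
curl --compressed -H "Content-Type: application/json" -X POST \
  -d '{"response":{"format":"json","compression":"gzip"},"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"]}' \
  http://data-api.psi.ch/sf/query
```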
### Example Queries

@@ -478,7 +482,9 @@ curl -H "Content-Type: application/json" -X POST -d '{"fields":["pulseId","valu

```json
{
  "responseFormat":"csv",
  "response":{
    "format":"csv"
  },
  "range":{
    "startPulseId":0,
    "endPulseId":4
```

@@ -504,7 +510,7 @@ It is possible to request the time in seconds (since January 1, 1970 (the UNIX e

##### Command

```bash
curl -H "Content-Type: application/json" -X POST -d '{"responseFormat":"csv","range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
curl -H "Content-Type: application/json" -X POST -d '{"response":{"format":"csv"},"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"fields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' http://data-api.psi.ch/sf/query
```

##### Response

@@ -23,23 +23,27 @@ import org.springframework.util.StringUtils;
import org.springframework.validation.Validator;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import ch.psi.daq.common.statistic.StorelessStatistics;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.analyzer.QueryAnalyzerImpl;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.queryrest.controller.validator.QueryValidator;
import ch.psi.daq.queryrest.model.PropertyFilterMixin;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.query.QueryManagerImpl;
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
@Import(value=QueryRestConfigCORS.class)
@PropertySource(value = {"classpath:queryrest.properties"})
@@ -83,6 +87,8 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
    objectMapper.addMixIn(DataEvent.class, PropertyFilterMixin.class);
    objectMapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
    objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);

    objectMapper.addMixIn(Response.class, PolymorphicResponseMixIn.class);
  }

@@ -120,6 +126,11 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
  public CSVResponseStreamWriter csvResponseStreamWriter() {
    return new CSVResponseStreamWriter();
  }

  @Bean
  public QueryManager queryManager(){
    return new QueryManagerImpl();
  }

  @Bean(name = BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
  public Set<QueryField> defaultResponseFields() {
@@ -1,25 +1,13 @@
package ch.psi.daq.queryrest.controller;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@@ -35,13 +23,12 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import ch.psi.daq.cassandra.config.CassandraConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;

import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.FieldNames;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
@@ -50,21 +37,15 @@ import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.request.validate.RequestProviderValidator;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.query.processor.ChannelNameCache;
import ch.psi.daq.query.processor.QueryProcessor;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONResponse;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;

@RestController
public class QueryRestController {

@@ -74,6 +55,9 @@ public class QueryRestController {
  public static final String PATH_QUERY = DomainConfig.PATH_QUERY;
  public static final String PATH_QUERIES = DomainConfig.PATH_QUERIES;

  @Resource
  private ApplicationContext appContext;

  @Resource
  private Validator queryValidator;
  private Validator requestProviderValidator = new RequestProviderValidator();
@@ -85,65 +69,12 @@ public class QueryRestController {
  private CSVResponseStreamWriter csvResponseStreamWriter;

  @Resource
  private ApplicationContext appContext;

  private QueryManager queryManager;

  @Resource
  private ObjectMapper objectMapper;

  @Resource
  private Function<Query, QueryAnalyzer> queryAnalizerFactory;

  private Map<Backend, QueryProcessor> queryProcessors = new LinkedHashMap<>();
  private ChannelNameCache channelNameCache;

  @PostConstruct
  public void afterPropertiesSet() {
    List<Exception> exceptions = new ArrayList<>();

    try {
      QueryProcessor queryProcessor =
          appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class);
      queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
    } catch (Exception e) {
      exceptions.add(e);
      LOGGER.warn("");
      LOGGER.warn("##########");
      LOGGER.warn("Could not load query processor for cassandra.");
      LOGGER.warn("##########");
      LOGGER.warn("");
    }

    try {
      QueryProcessor queryProcessor =
          appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class);
      queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
    } catch (Exception e) {
      exceptions.add(e);
      LOGGER.warn("");
      LOGGER.warn("##########");
      LOGGER.warn("Could not load query processor for archiverappliance.");
      LOGGER.warn("##########");
      LOGGER.warn("");
    }

    if (queryProcessors.isEmpty()) {
      LOGGER.error("No query processor could be loaded! Exceptions were: ");
      for (Exception exception : exceptions) {
        LOGGER.error("", exception);
      }

      throw new RuntimeException("No Backends available!");
    }

    channelNameCache =
        new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT,
            Integer.class).longValue());
  }

  @PreDestroy
  public void destroy() {
    channelNameCache.destroy();
  }

  private Response defaultResponse = new JSONResponse();

  @InitBinder
  protected void initBinder(WebDataBinder binder) {
@@ -161,12 +92,7 @@ public class QueryRestController {
      produces = {MediaType.APPLICATION_JSON_VALUE})
  public @ResponseBody List<ChannelsResponse> getChannels(@RequestBody(required = false) ChannelsRequest request)
      throws Throwable {
    // in case not specified use defaults (e.g. GET)
    if (request == null) {
      request = new ChannelsRequest();
    }

    return channelNameCache.getChannels(request);
    return queryManager.getChannels(request);
  }

  /**
@@ -252,17 +178,7 @@ public class QueryRestController {
    try {
      LOGGER.debug("Executing queries '{}'", queries);

      if (ResponseFormat.JSON.equals(queries.getResponseFormat())) {
        // write the response back to the client using java 8 streams
        jsonResponseStreamWriter.respond(executeQueries(queries), queries, res);
      } else if (ResponseFormat.CSV.equals(queries.getResponseFormat())) {
        // it's a CSV request
        executeQueriesCsv(queries, res);
      } else {
        String message = String.format("Unsupported response format '%s'", queries.getResponseFormat().name());
        LOGGER.error(message);
        throw new RuntimeException(message);
      }
      queries.getResponseOrDefault(defaultResponse).respond(appContext, queries, res);

    } catch (Exception e) {
      LOGGER.error("Failed to execute query '{}'.", queries, e);
@@ -270,86 +186,6 @@ public class QueryRestController {
    }
  }

  /**
   * Returns data in CSV format to the client.
   */
  private void executeQueriesCsv(DAQQueries queries, HttpServletResponse res) throws Exception {

    for (DAQQueryElement query : queries) {
      if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
        // We allow only no aggregation or value aggregation as
        // extrema: nested structure and not clear how to map it to one line
        // index: value is an array of Statistics whose size is not clear at initialization time
        String message = "CSV export does not support '" + query.getAggregationType() + "'";
        LOGGER.warn(message);
        throw new IllegalArgumentException(message);
      }

      if (!ArrayUtils.contains(query.getColumns(), FieldNames.FIELD_GLOBAL_TIME)) {
        query.addField(QueryField.globalMillis);
      }
    }

    try {
      LOGGER.debug("Executing query '{}'", queries);
      // write the response back to the client using java 8 streams
      csvResponseStreamWriter.respond(executeQueries(queries), queries, res);
    } catch (Exception e) {
      LOGGER.error("Failed to execute query '{}'.", queries, e);
      throw e;
    }

  }

  public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) {
    // set backends if not defined yet
    channelNameCache.setBackends(queries);

    List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results =
        new ArrayList<>(queries.getQueries().size());

    for (DAQQueryElement queryElement : queries) {
      Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
          BackendQuery
              .getBackendQueries(queryElement)
              .stream()
              .filter(query -> {
                QueryProcessor processor = queryProcessors.get(query.getBackend());
                if (processor != null) {
                  return true;
                } else {
                  LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend());
                  return false;
                }
              })
              .flatMap(query -> {
                QueryProcessor processor = queryProcessors.get(query.getBackend());
                QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);

                // all the magic happens here
                Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
                    processor.process(queryAnalizer);
                // do post-process
                Stream<Entry<ChannelName, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);

                return channelToData.map(entry -> {
                  return Triple.of(query, entry.getKey(), entry.getValue());
                });
              });

      // Now we have a stream that loads elements sequentially, BackendQuery by BackendQuery.
      // By materializing the outer Stream the elements of all BackendQuery are loaded async
      // (speeds things up but requires also more memory - i.e. it relies on Backends not loading
      // all elements into memory at once)
      resultStreams = resultStreams.collect(Collectors.toList()).stream();

      results.add(Pair.of(queryElement, resultStreams));
    }

    return results;
  }

  /**
   * Returns the current list of {@link Ordering}s available.
   *
src/main/java/ch/psi/daq/queryrest/query/QueryManager.java (new file, 22 lines)
@@ -0,0 +1,22 @@
package ch.psi.daq.queryrest.query;

import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Triple;

import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.query.model.impl.BackendQuery;

public interface QueryManager {

  List<ChannelsResponse> getChannels(ChannelsRequest request) throws Exception;

  List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries)
      throws Exception;
}
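
// Illustrative sketch (not part of this commit): how a caller might consume
// QueryManager. The Spring context lookup and the prebuilt DAQQueries instance
// are assumptions; the nested Entry/Stream/Triple shape mirrors the interface above.
package ch.psi.daq.queryrest.query;

import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Triple;
import org.springframework.context.ApplicationContext;

import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.query.model.impl.BackendQuery;

public class QueryManagerUsageSketch {

  public static void dump(ApplicationContext context, DAQQueries queries) throws Exception {
    QueryManager manager = context.getBean(QueryManager.class);

    // One list entry per query element; each stream yields one Triple per channel.
    List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results =
        manager.executeQueries(queries);

    for (Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> entry : results) {
      // middle = channel name, right = the (possibly aggregated) data of that channel
      entry.getValue().forEach(triple ->
          System.out.println(triple.getMiddle() + " -> " + triple.getRight()));
    }
  }
}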

src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java (new file, 157 lines)
@@ -0,0 +1,157 @@
package ch.psi.daq.queryrest.query;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

import ch.psi.daq.cassandra.config.CassandraConfig;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.query.processor.ChannelNameCache;
import ch.psi.daq.query.processor.QueryProcessor;

public class QueryManagerImpl implements QueryManager {
  private static final Logger LOGGER = LoggerFactory.getLogger(QueryManagerImpl.class);

  @Resource
  private ApplicationContext appContext;

  @Resource
  private Function<Query, QueryAnalyzer> queryAnalizerFactory;

  private Map<Backend, QueryProcessor> queryProcessors = new LinkedHashMap<>();
  private ChannelNameCache channelNameCache;

  @PostConstruct
  public void afterPropertiesSet() {
    List<Exception> exceptions = new ArrayList<>();

    try {
      QueryProcessor queryProcessor =
          appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class);
      queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
    } catch (Exception e) {
      exceptions.add(e);
      LOGGER.warn("");
      LOGGER.warn("##########");
      LOGGER.warn("Could not load query processor for cassandra.");
      LOGGER.warn("##########");
      LOGGER.warn("");
    }

    try {
      QueryProcessor queryProcessor =
          appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class);
      queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
    } catch (Exception e) {
      exceptions.add(e);
      LOGGER.warn("");
      LOGGER.warn("##########");
      LOGGER.warn("Could not load query processor for archiverappliance.");
      LOGGER.warn("##########");
      LOGGER.warn("");
    }

    if (queryProcessors.isEmpty()) {
      LOGGER.error("No query processor could be loaded! Exceptions were: ");
      for (Exception exception : exceptions) {
        LOGGER.error("", exception);
      }

      throw new RuntimeException("No Backends available!");
    }

    channelNameCache =
        new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT,
            Integer.class).longValue());
  }

  @PreDestroy
  public void destroy() {
    channelNameCache.destroy();
  }

  @Override
  public List<ChannelsResponse> getChannels(ChannelsRequest request) {
    // in case not specified use defaults (e.g. GET)
    if (request == null) {
      request = new ChannelsRequest();
    }

    return channelNameCache.getChannels(request);
  }

  @Override
  public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) {
    // set backends if not defined yet
    channelNameCache.setBackends(queries);

    List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results =
        new ArrayList<>(queries.getQueries().size());

    for (DAQQueryElement queryElement : queries) {
      Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
          BackendQuery
              .getBackendQueries(queryElement)
              .stream()
              .filter(query -> {
                QueryProcessor processor = queryProcessors.get(query.getBackend());
                if (processor != null) {
                  return true;
                } else {
                  LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend());
                  return false;
                }
              })
              .flatMap(query -> {
                QueryProcessor processor = queryProcessors.get(query.getBackend());
                QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);

                // all the magic happens here
                Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
                    processor.process(queryAnalizer);
                // do post-process
                Stream<Entry<ChannelName, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);

                return channelToData.map(entry -> {
                  return Triple.of(query, entry.getKey(), entry.getValue());
                });
              });

      // Now we have a stream that loads elements sequentially, BackendQuery by BackendQuery.
      // By materializing the outer Stream the elements of all BackendQuery are loaded async
      // (speeds things up but requires also more memory - i.e. it relies on Backends not loading
      // all elements into memory at once)
      resultStreams = resultStreams.collect(Collectors.toList()).stream();

      results.add(Pair.of(queryElement, resultStreams));
    }

    return results;
  }

}
@@ -0,0 +1,60 @@
package ch.psi.daq.queryrest.response;

import java.io.OutputStream;

import javax.servlet.http.HttpServletResponse;

import org.springframework.context.ApplicationContext;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonEncoding;

import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.operation.ResponseImpl;

public abstract class AbstractResponse extends ResponseImpl {

  public AbstractResponse(ResponseFormat format) {
    super(format);
  }

  @JsonIgnore
  @Override
  public abstract void respond(ApplicationContext context, DAQQueries queries, Object response) throws Exception;

  /**
   * Configures the output stream and headers according to whether compression is wanted or not.
   * <p>
   * In order not to lose the information of the underlying type of data being transferred, the
   * Content-Type header stays the same but, if compressed, the Content-Encoding header will be set
   * accordingly.
   *
   * see http://tools.ietf.org/html/rfc2616#section-14.11 and
   * see http://tools.ietf.org/html/rfc2616#section-3.5
   *
   * @param httpResponse The HttpServletResponse
   * @param contentType The content type
   * @return OutputStream The OutputStream
   * @throws Exception if something goes wrong
   */
  @JsonIgnore
  protected OutputStream handleCompressionAndResponseHeaders(HttpServletResponse httpResponse,
      String contentType) throws Exception {
    OutputStream out = httpResponse.getOutputStream();

    httpResponse.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
    httpResponse.setContentType(contentType);

    httpResponse.addHeader("Content-Type", contentType);
    String filename = "data." + this.getFileSuffix();
    httpResponse.addHeader("Content-Disposition", "attachment; filename=" + filename);

    if (this.isCompressed()) {
      httpResponse.addHeader("Content-Encoding", this.getCompression().toString());
      out = this.getCompression().wrapStream(out);
    }

    return out;
  }
}
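
// Illustrative (not part of this commit): for a gzip-compressed CSV response the
// headers produced above would look roughly like
//
//   Content-Type: text/csv
//   Content-Disposition: attachment; filename=data.<fileSuffix>
//   Content-Encoding: gzip
//
// i.e. Content-Type keeps describing the payload and only Content-Encoding signals
// the compression (RFC 2616, sections 14.11 and 3.5).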

@@ -1,54 +0,0 @@
/**
 *
 */
package ch.psi.daq.queryrest.response;

import java.io.OutputStream;

import javax.servlet.http.HttpServletResponse;

import org.springframework.http.MediaType;

import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.operation.ResponseOptions;

public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter {

  public static final String CONTENT_TYPE_CSV = "text/csv";
  protected static final String CONTENT_TYPE_JSON = MediaType.APPLICATION_JSON_VALUE;

  /**
   * Configures the output stream and headers according to whether compression is wanted or not.
   * <p>
   * In order not to lose the information of the underlying type of data being transferred, the
   * Content-Type header stays the same but, if compressed, the Content-Encoding header will be set
   * accordingly.
   *
   * see http://tools.ietf.org/html/rfc2616#section-14.11 and
   * see http://tools.ietf.org/html/rfc2616#section-3.5
   *
   * @param options The options for the response
   * @param response The HttpServletResponse
   * @param contentType The content type
   * @return OutputStream The OutputStream
   * @throws Exception if something goes wrong
   */
  protected OutputStream handleCompressionAndResponseHeaders(ResponseOptions options, HttpServletResponse response,
      String contentType) throws Exception {
    OutputStream out = response.getOutputStream();

    response.addHeader("Content-Type", contentType);
    if (options.isCompressed()) {
      String filename = "data." + options.getCompression().getFileSuffix();
      response.addHeader("Content-Disposition", "attachment; filename=" + filename);
      response.addHeader("Content-Encoding", options.getCompression().toString());
      out = options.getCompression().wrapStream(out);
    } else {
      String filename = "data." + (options.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json");
      response.addHeader("Content-Disposition", "attachment; filename=" + filename);
    }

    return out;
  }

}
@@ -0,0 +1,20 @@
package ch.psi.daq.queryrest.response;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import ch.psi.daq.queryrest.response.csv.CSVResponse;
import ch.psi.daq.queryrest.response.json.JSONResponse;

@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.EXISTING_PROPERTY,
    property = "format")
@JsonSubTypes({
    @Type(value = JSONResponse.class, name = JSONResponse.FORMAT),
    @Type(value = CSVResponse.class, name = CSVResponse.FORMAT)
})
// see: http://stackoverflow.com/questions/24631923/alternative-to-jackson-jsonsubtypes
public abstract class PolymorphicResponseMixIn {
}
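
// Illustrative sketch (not part of this commit): the polymorphic round trip this
// mix-in enables once it is registered on the ObjectMapper (the addMixIn call in
// QueryRestConfig above); it mirrors ResponseQueryTest at the end of this commit.
package ch.psi.daq.queryrest.response;

import com.fasterxml.jackson.databind.ObjectMapper;

import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.queryrest.response.csv.CSVResponse;

public class PolymorphicResponseSketch {

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // same registration as QueryRestConfig
    mapper.addMixIn(Response.class, PolymorphicResponseMixIn.class);

    // "format" doubles as the type discriminator (As.EXISTING_PROPERTY), so it is
    // written exactly once and selects the concrete subtype when reading back.
    String json = mapper.writeValueAsString(new CSVResponse());
    Response back = mapper.readValue(json, Response.class);
    System.out.println(json + " -> " + back.getClass().getSimpleName());
  }
}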

@@ -1,17 +1,16 @@
package ch.psi.daq.queryrest.response;

import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;

import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.tuple.Triple;

import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.ResponseOptions;
import ch.psi.daq.query.model.impl.BackendQuery;

public interface ResponseStreamWriter {

@@ -21,10 +20,8 @@ public interface ResponseStreamWriter {
   * {@link ServletResponse}.
   *
   * @param results The results
   * @param options The options for the response
   * @param response {@link ServletResponse} instance given by the current HTTP request
   * @param out The OutputStream
   * @throws Exception thrown if writing to the output stream fails
   */
  public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, ResponseOptions options,
      HttpServletResponse response) throws Exception;
  public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, OutputStream out) throws Exception;
}
@@ -0,0 +1,94 @@
package ch.psi.daq.queryrest.response.csv;

import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;

import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

import com.hazelcast.util.collection.ArrayUtils;

import ch.psi.daq.domain.FieldNames;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractResponse;

public class CSVResponse extends AbstractResponse {
  private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponse.class);

  public static final String FORMAT = "csv";
  public static final String CONTENT_TYPE = "text/csv";

  public CSVResponse() {
    super(ResponseFormat.CSV);
  }

  public CSVResponse(Compression compression) {
    this();
    setCompression(compression);
  }

  @Override
  public void respond(ApplicationContext context, DAQQueries queries, Object response) throws Exception {
    OutputStream out;
    if (response instanceof HttpServletResponse) {
      out = super.handleCompressionAndResponseHeaders((HttpServletResponse) response, CONTENT_TYPE);
    } else {
      String message =
          String.format("'%s' does not support response Object of type '%s'", getFormat().getKey(), response
              .getClass().getName());
      LOGGER.error(message);
      throw new IllegalArgumentException(message);
    }

    // do csv specific validations
    validateQueries(queries);

    try {
      LOGGER.debug("Executing query '{}'", queries);

      QueryManager queryManager = context.getBean(QueryManager.class);
      CSVResponseStreamWriter streamWriter = context.getBean(CSVResponseStreamWriter.class);

      // execute query
      List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
          queryManager.executeQueries(queries);
      // write the response back to the client using java 8 streams
      streamWriter.respond(result, out);
    } catch (Exception e) {
      LOGGER.error("Failed to execute query '{}'.", queries, e);
      throw e;
    }
  }

  protected void validateQueries(DAQQueries queries) {
    for (DAQQueryElement query : queries) {
      if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
        // We allow only no aggregation or value aggregation as
        // extrema: nested structure and not clear how to map it to one line
        // index: value is an array of Statistics whose size is not clear at initialization time
        String message = "CSV export does not support '" + query.getAggregationType() + "'";
        LOGGER.warn(message);
        throw new IllegalArgumentException(message);
      }

      if (!ArrayUtils.contains(query.getColumns(), FieldNames.FIELD_GLOBAL_TIME)) {
        query.addField(QueryField.globalMillis);
      }
    }
  }
}
@@ -21,7 +21,6 @@ import java.util.stream.Stream;

import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
@@ -30,8 +29,6 @@ import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonEncoding;

import ch.psi.daq.common.stream.StreamIterable;
import ch.psi.daq.common.stream.StreamMatcher;
import ch.psi.daq.domain.DataEvent;
@@ -39,18 +36,18 @@ import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.ResponseOptions;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;

/**
 * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
 * of the current request.
 */
public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {

public class CSVResponseStreamWriter implements ResponseStreamWriter {
  private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);

  public static final char DELIMITER_CVS = ';';
  public static final String DELIMITER_ARRAY = ",";
  public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
@@ -61,27 +58,13 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
  private static final ToLongFunction<Pair<ChannelName, DataEvent>> MATCHER_PROVIDER = (pair) -> pair.getValue()
      .getGlobalMillis() / 10L;

  private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);

  @Resource
  private Function<Query, QueryAnalyzer> queryAnalizerFactory;

  @Override
  public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
      ResponseOptions options,
      HttpServletResponse response) throws Exception {
    response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
    response.setContentType(CONTENT_TYPE_CSV);

    respondInternal(results, options, response);
  }

  private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
      ResponseOptions options, HttpServletResponse response) throws Exception {
  public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, final OutputStream out) throws Exception {
    AtomicReference<Exception> exception = new AtomicReference<>();

    final OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV);

    final Map<ChannelName, Stream<Pair<ChannelName, DataEvent>>> streams = new LinkedHashMap<>(results.size());
    final List<String> header = new ArrayList<>();
    final Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors = new ArrayList<>();
@@ -0,0 +1,69 @@
package ch.psi.daq.queryrest.response.json;

import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;

import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;

import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractResponse;

public class JSONResponse extends AbstractResponse {
  private static final Logger LOGGER = LoggerFactory.getLogger(JSONResponse.class);

  public static final String FORMAT = "json";
  public static final String CONTENT_TYPE = MediaType.APPLICATION_JSON_VALUE;

  public JSONResponse() {
    super(ResponseFormat.JSON);
  }

  public JSONResponse(Compression compression) {
    this();
    setCompression(compression);
  }

  @Override
  public void respond(ApplicationContext context, DAQQueries queries, Object response) throws Exception {
    OutputStream out;
    if (response instanceof HttpServletResponse) {
      out = super.handleCompressionAndResponseHeaders((HttpServletResponse) response, CONTENT_TYPE);
    } else {
      String message =
          String.format("'%s' does not support response Object of type '%s'", getFormat().getKey(), response.getClass()
              .getName());
      LOGGER.error(message);
      throw new IllegalArgumentException(message);
    }

    try {
      LOGGER.debug("Executing query '{}'", queries);

      QueryManager queryManager = context.getBean(QueryManager.class);
      JSONResponseStreamWriter streamWriter = context.getBean(JSONResponseStreamWriter.class);

      // execute query
      List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result = queryManager.executeQueries(queries);
      // write the response back to the client using java 8 streams
      streamWriter.respond(result, out);
    } catch (Exception e) {
      LOGGER.error("Failed to execute query '{}'.", queries, e);
      throw e;
    }
  }

}
@@ -10,12 +10,10 @@ import java.util.stream.Stream;

import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
@@ -29,15 +27,14 @@ import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.ResponseOptions;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;

/**
 * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
 * of the current request.
 */
public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
public class JSONResponseStreamWriter implements ResponseStreamWriter {

  private static final String DATA_RESP_FIELD = "data";

@@ -49,20 +46,9 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
  @Resource
  private ObjectMapper mapper;

  @Override
  public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
      ResponseOptions options, HttpServletResponse response) throws Exception {
    response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
    response.setContentType(MediaType.APPLICATION_JSON_VALUE);

    respondInternal(results, options, response);
  }

  private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
      ResponseOptions options, HttpServletResponse response) throws Exception {
  public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, OutputStream out) throws Exception {
    AtomicReference<Exception> exception = new AtomicReference<>();
    OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_JSON);
    JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);

    try {
@@ -32,12 +32,12 @@ import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.request.range.RequestRangeDate;
import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.domain.request.range.RequestRangeTime;
import ch.psi.daq.domain.test.TestTimeUtils;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.response.csv.CSVResponse;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;

@@ -62,7 +62,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            0,
            1),
        channels);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -155,7 +155,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            -1,
            -1),
        channels);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -245,7 +245,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            -1,
            -1),
        channels);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -332,7 +332,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            0,
            1),
        testChannel3));
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());
    channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02, testChannel3);

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
@@ -418,7 +418,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            0,
            1),
        channels);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -502,7 +502,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            TimeUtils.getTimeFromMillis(0, 0),
            TimeUtils.getTimeFromMillis(10, 0)),
        channels);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -587,7 +587,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            startDate,
            endDate),
        channels);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -675,7 +675,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
        Ordering.asc,
        AggregationType.extrema,
        TEST_CHANNEL_NAMES[0]);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    String content = mapper.writeValueAsString(request);

@@ -705,7 +705,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
        Ordering.asc,
        AggregationType.index,
        TEST_CHANNEL_NAMES[0]);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    String content = mapper.writeValueAsString(request);

@@ -739,7 +739,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            endDate),
        channels);
    request.setNrOfBins(2);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -839,7 +839,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            endDate),
        channels);
    request.setBinSize(100);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -934,7 +934,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            0,
            1),
        channels);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
    queryFields.add(QueryField.channel);
@@ -995,8 +995,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            10,
            11),
        TEST_CHANNEL_NAMES);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setCompression(Compression.GZIP);
    request.setResponse(new CSVResponse(Compression.GZIP));

    String content = mapper.writeValueAsString(request);

@@ -1018,7 +1017,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
            10,
            11),
        TEST_CHANNEL_NAMES);
    request.setResponseFormat(ResponseFormat.CSV);
    request.setResponse(new CSVResponse());

    String content = mapper.writeValueAsString(request);

@@ -24,6 +24,7 @@ import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.domain.request.range.RequestRangeTime;
import ch.psi.daq.domain.test.TestTimeUtils;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.response.json.JSONResponse;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;

/**
@@ -664,7 +665,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
            10,
            11),
        TEST_CHANNEL_NAMES);
    request.setCompression(Compression.GZIP);
    request.setResponse(new JSONResponse(Compression.GZIP));

    String content = mapper.writeValueAsString(request);
    System.out.println(content);

@@ -0,0 +1,106 @@
package ch.psi.daq.test.queryrest.response;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;

import javax.annotation.Resource;

import org.junit.Test;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.queryrest.response.csv.CSVResponse;
import ch.psi.daq.queryrest.response.json.JSONResponse;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;

public class ResponseQueryTest extends AbstractDaqRestTest {

  @Resource
  private ObjectMapper mapper;

  @Test
  public void test_JSON_01() throws JsonParseException, JsonMappingException, IOException {
    Response respose = new CSVResponse();

    String value = mapper.writeValueAsString(respose);

    Response deserial = mapper.readValue(value, Response.class);

    assertEquals(respose.getClass(), deserial.getClass());
    assertEquals(respose.getFormat(), deserial.getFormat());
    assertEquals(respose.getCompression(), deserial.getCompression());
  }

  @Test
  public void test_JSON_02() throws JsonParseException, JsonMappingException, IOException {
    DAQQuery query = new DAQQuery(
        new RequestRangePulseId(
            0,
            100),
        "TestChannel_01");
    query.setResponse(new CSVResponse(Compression.GZIP));

    String value = mapper.writeValueAsString(query);

    DAQQuery deserial = mapper.readValue(value, DAQQuery.class);

    assertNotNull(deserial.getResponse());
    assertEquals(query.getResponse().getClass(), deserial.getResponse().getClass());
    assertEquals(query.getResponse().getFormat(), deserial.getResponse().getFormat());
    assertEquals(query.getResponse().getCompression(), deserial.getResponse().getCompression());

    assertEquals(query.getResponse().getCompression().getFileSuffix(), deserial.getResponse().getFileSuffix());
  }

  @Test
  public void test_JSON_03() throws JsonParseException, JsonMappingException, IOException {
    DAQQuery query = new DAQQuery(
        new RequestRangePulseId(
            0,
            100),
        "TestChannel_01");
    query.setResponse(new JSONResponse(Compression.NONE));

    String value = mapper.writeValueAsString(query);

    int index = value.indexOf("format");
    assertTrue(index >= 0);
    index = value.indexOf("format", index + 1);
    // ensure string contains identifier only once
    assertEquals(-1, index);

    DAQQuery deserial = mapper.readValue(value, DAQQuery.class);

    assertNotNull(deserial.getResponse());
    assertEquals(query.getResponse().getClass(), deserial.getResponse().getClass());
    assertEquals(query.getResponse().getFormat(), deserial.getResponse().getFormat());
    assertEquals(query.getResponse().getCompression(), deserial.getResponse().getCompression());

    assertEquals(query.getResponse().getFormat().getFileSuffix(), deserial.getResponse().getFileSuffix());
  }

  @Test
  public void test_JSON_04() throws JsonParseException, JsonMappingException, IOException {
    DAQQuery query = new DAQQuery(
        new RequestRangePulseId(
            0,
            100),
        "TestChannel_01");

    String value = mapper.writeValueAsString(query);

    DAQQuery deserial = mapper.readValue(value, DAQQuery.class);

    assertNull(deserial.getResponse());
  }
}