Merge pull request #8 in ST/ch.psi.daq.queryrest from ATEST-122 to master
# By Fabian Märki
# Via Fabian Märki
* commit '8ce83706010d3ad29ff7a03c9d7e4cfc58e9443a':
  ATEST-122
@@ -139,7 +139,7 @@ There exist following fields:
 - **fields**: The requested fields (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values).
 - **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be divided into.
 - **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries.
-- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationEnum.java) for possible values). These values will be added to the *data* array response.
+- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response.
 - **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
 - **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**)
 
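Illustrative only, not part of the commit: a sketch of a pulse-range query body combining the fields documented above. The `fields` and `aggregations` values are taken from the defaults in queryrest.properties below; the property names `channels`, `startPulseId`, and `endPulseId` are hypothetical, inferred from the `PulseRangeQuery(100, 101, ...)` constructor used in the tests, and the exact shape is defined by the `AbstractQuery` subclasses.

```json
{
  "channels": ["testChannel1"],
  "startPulseId": 100,
  "endPulseId": 101,
  "fields": ["channel", "pulseId", "value"],
  "nrOfBins": 2,
  "aggregations": ["min", "max", "sum"]
}
```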
@@ -1,7 +1,9 @@
 package ch.psi.daq.queryrest.config;
 
+import java.util.EnumMap;
 import java.util.LinkedHashSet;
 import java.util.Set;
+import java.util.function.Function;
 
 import javax.annotation.Resource;
 
@@ -23,11 +25,16 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import ch.psi.daq.cassandra.util.test.CassandraDataGen;
 import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
 import ch.psi.daq.common.statistic.StorelessStatistics;
-import ch.psi.daq.domain.cassandra.ChannelEvent;
+import ch.psi.daq.domain.cassandra.DataEvent;
+import ch.psi.daq.query.analyzer.CassandraQueryAnalyzer;
+import ch.psi.daq.query.analyzer.QueryAnalyzer;
 import ch.psi.daq.query.config.QueryConfig;
 import ch.psi.daq.query.model.AbstractQuery;
+import ch.psi.daq.query.model.Aggregation;
+import ch.psi.daq.query.model.Query;
 import ch.psi.daq.query.model.QueryField;
 import ch.psi.daq.queryrest.model.PropertyFilterMixin;
+import ch.psi.daq.queryrest.response.JsonStreamSerializer;
 import ch.psi.daq.queryrest.response.ResponseStreamWriter;
 
 @Configuration
@@ -35,6 +42,10 @@ import ch.psi.daq.queryrest.response.ResponseStreamWriter;
 @PropertySource(value = {"file:${user.home}/.config/daq/queryrest.properties"}, ignoreResourceNotFound = true)
 public class QueryRestConfig {
 
+    private static final String QUERYREST_DEFAULT_RESPONSE_AGGREGATIONS = "queryrest.default.response.aggregations";
+
+    private static final String QUERYREST_DEFAULT_RESPONSE_FIELDS = "queryrest.default.response.fields";
+
     // a nested configuration
     // this guarantees that the ordering of the properties file is as expected
     // see:
@@ -47,6 +58,7 @@ public class QueryRestConfig {
     private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestConfig.class);
 
     public static final String BEAN_NAME_DEFAULT_RESPONSE_FIELDS = "defaultResponseFields";
+    public static final String BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS = "defaultResponseAggregations";
 
     @Resource
     private Environment env;
@@ -67,8 +79,14 @@ public class QueryRestConfig {
         mapper.setSerializationInclusion(Include.NON_NULL);
         // Mixin which is used dynamically to filter out which properties get serialised and which
         // won't. This way, the user can specify which columns are to be received.
-        mapper.addMixIn(ChannelEvent.class, PropertyFilterMixin.class);
+        mapper.addMixIn(DataEvent.class, PropertyFilterMixin.class);
         mapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
+        mapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);
+
+        // defines how to write inner Streams (i.e. Stream<Entry<String, Stream<?>>> toSerialize)
+        module = new SimpleModule("Streams API", Version.unknownVersion());
+        module.addSerializer(new JsonStreamSerializer());
+        mapper.registerModule(module);
 
         return mapper;
     }
@@ -78,6 +96,11 @@ public class QueryRestConfig {
         return new JsonFactory();
     }
 
+    @Bean
+    public Function<Query, QueryAnalyzer> queryAnalizerFactory() {
+        return (query) -> new CassandraQueryAnalyzer(query);
+    }
+
     @Bean
     public ResponseStreamWriter responseStreamWriter() {
         return new ResponseStreamWriter();
@@ -86,20 +109,37 @@ public class QueryRestConfig {
     @Bean(name = BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
     public Set<QueryField> defaultResponseFields() {
         String[] responseFields =
-                StringUtils.commaDelimitedListToStringArray(env.getProperty("queryrest.default.response.fields"));
+                StringUtils.commaDelimitedListToStringArray(env.getProperty(QUERYREST_DEFAULT_RESPONSE_FIELDS));
         // preserve order
         LinkedHashSet<QueryField> defaultResponseFields = new LinkedHashSet<>(responseFields.length);
         for (String field : responseFields) {
             try {
                 defaultResponseFields.add(QueryField.valueOf(field));
             } catch (Exception e) {
-                LOGGER.error("Field '{}' in queryrest.default.response.fields is invalid.", field, e);
+                LOGGER.error("Field '{}' in '{}' is invalid.", field, QUERYREST_DEFAULT_RESPONSE_FIELDS, e);
             }
         }
 
         return defaultResponseFields;
     }
 
+    @Bean(name = BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
+    public Set<Aggregation> defaultResponseAggregations() {
+        String[] responseAggregations =
+                StringUtils.commaDelimitedListToStringArray(env.getProperty(QUERYREST_DEFAULT_RESPONSE_AGGREGATIONS));
+        // preserve order
+        LinkedHashSet<Aggregation> defaultResponseAggregations = new LinkedHashSet<>(responseAggregations.length);
+        for (String aggregation : responseAggregations) {
+            try {
+                defaultResponseAggregations.add(Aggregation.valueOf(aggregation));
+            } catch (Exception e) {
+                LOGGER.error("Aggregation '{}' in '{}' is invalid.", aggregation, QUERYREST_DEFAULT_RESPONSE_AGGREGATIONS,
+                        e);
+            }
+        }
+
+        return defaultResponseAggregations;
+    }
+
     // ==========================================================================================
     // TODO: This is simply for initial / rudimentary testing - remove once further evolved
@@ -3,8 +3,10 @@ package ch.psi.daq.queryrest.controller;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import javax.annotation.Resource;
@@ -13,7 +15,6 @@ import javax.servlet.http.HttpServletResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
@@ -22,7 +23,10 @@ import org.springframework.web.bind.annotation.RestController;
 
 import ch.psi.daq.cassandra.util.test.CassandraDataGen;
 import ch.psi.daq.domain.cassandra.DataEvent;
+import ch.psi.daq.query.analyzer.QueryAnalyzer;
 import ch.psi.daq.query.model.AbstractQuery;
+import ch.psi.daq.query.model.Aggregation;
+import ch.psi.daq.query.model.Query;
 import ch.psi.daq.query.model.QueryField;
 import ch.psi.daq.query.processor.QueryProcessor;
 import ch.psi.daq.queryrest.config.QueryRestConfig;
@@ -43,9 +47,15 @@ public class QueryRestController {
     @Resource
     private QueryProcessor queryProcessor;
 
+    @Resource
+    private Function<Query, QueryAnalyzer> queryAnalizerFactory;
+
     @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
     private Set<QueryField> defaultResponseFields;
 
+    @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
+    private Set<Aggregation> defaultResponseAggregations;
+
     @RequestMapping(
             value = CHANNELS,
             method = RequestMethod.GET,
@@ -61,9 +71,10 @@ public class QueryRestController {
 
     @RequestMapping(
             value = CHANNELS_REGEX,
-            method = RequestMethod.GET,
+            method = RequestMethod.POST,
+            consumes = {MediaType.APPLICATION_JSON_VALUE},
             produces = {MediaType.APPLICATION_JSON_VALUE})
-    public @ResponseBody Collection<String> getChannels(@PathVariable String regex) throws Throwable {
+    public @ResponseBody Collection<String> getChannels(@RequestBody String regex) throws Throwable {
         try {
             return queryProcessor.getChannels(regex);
         } catch (Throwable t) {
@@ -74,31 +85,39 @@ public class QueryRestController {
 
     @RequestMapping(
             value = QUERY,
-            method = RequestMethod.GET,
+            method = RequestMethod.POST,
             consumes = {MediaType.APPLICATION_JSON_VALUE},
             produces = {MediaType.APPLICATION_JSON_VALUE})
     public void executeQuery(@RequestBody AbstractQuery query, HttpServletResponse res) throws IOException {
         try {
             LOGGER.debug("Execute query '{}'", query.getClass().getSimpleName());
 
-            validateQuery(query);
+            QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
+            queryAnalizer.validate();
+
+            extendQuery(query);
+
             // all the magic happens here
-            Stream<Entry<String, Stream<? extends DataEvent>>> process = queryProcessor.process(query);
+            Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = queryProcessor.process(queryAnalizer);
+
+            // do post-process
+            Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
+
             // write the response back to the client using java 8 streams
-            responseStreamWriter.respond(process, query, res);
+            responseStreamWriter.respond(channelToData, query, res);
         } catch (Throwable t) {
             LOGGER.error("Failed execute query '{}'.", query, t);
             throw t;
         }
     }
 
-    // TODO: can we do this with built-in validators?
-    private void validateQuery(AbstractQuery query) {
+    private void extendQuery(AbstractQuery query) {
         if (query.getFields() == null || query.getFields().isEmpty()) {
             query.setFields(new LinkedHashSet<>(defaultResponseFields));
         }
+        if (query.getAggregations() == null || query.getAggregations().isEmpty()) {
+            query.setAggregations(new LinkedList<>(defaultResponseAggregations));
+        }
     }
 
     // ==========================================================================================
@@ -0,0 +1,24 @@
+package ch.psi.daq.queryrest.response;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class JsonStreamSerializer extends StdSerializer<Stream<?>> {
+    private static final long serialVersionUID = 4695859735299703478L;
+
+    public JsonStreamSerializer() {
+        super(Stream.class, true);
+    }
+
+    @Override
+    public void serialize(Stream<?> stream, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+            JsonGenerationException {
+        provider.findValueSerializer(Iterator.class, null).serialize(stream.iterator(), jgen, provider);
+    }
+}
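A minimal usage sketch of the new serializer (the `StreamSerializerDemo` harness and standalone wiring are illustrative, not part of the commit; in the commit the module is registered by `QueryRestConfig` above): it delegates to Jackson's standard `Iterator` serializer, so any `Stream<?>` value is drained and written as a JSON array.

```java
import java.util.stream.Stream;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

import ch.psi.daq.queryrest.response.JsonStreamSerializer;

// Illustrative harness, not part of the commit.
public class StreamSerializerDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Same module wiring as the QueryRestConfig change above.
        SimpleModule module = new SimpleModule("Streams API", Version.unknownVersion());
        module.addSerializer(new JsonStreamSerializer());
        mapper.registerModule(module);

        // The stream is consumed via its iterator and rendered as a JSON array.
        System.out.println(mapper.writeValueAsString(Stream.of(1, 2, 3))); // prints [1,2,3]
    }
}
```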
@@ -21,9 +21,8 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
 import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
 
-import ch.psi.daq.domain.cassandra.DataEvent;
 import ch.psi.daq.query.model.AbstractQuery;
-import ch.psi.daq.query.model.AggregationEnum;
+import ch.psi.daq.query.model.Aggregation;
 import ch.psi.daq.query.model.QueryField;
 
 /**
@@ -44,16 +43,16 @@ public class ResponseStreamWriter {
      * Responding with the contents of the stream by writing into the output stream of the
      * {@link ServletResponse}.
      *
-     * @param stream {@link Stream} instance of {@link DataEvent}s
+     * @param stream Mapping from channel name to data
      * @param query concrete instance of {@link AbstractQuery}
      * @param response {@link ServletResponse} instance given by the current HTTP request
      * @throws IOException thrown if writing to the output stream fails
      */
-    public void respond(Stream<Entry<String, Stream<? extends DataEvent>>> stream, AbstractQuery query,
+    public void respond(Stream<Entry<String, ?>> stream, AbstractQuery query,
             ServletResponse response) throws IOException {
 
         Set<QueryField> queryFields = query.getFields();
-        List<AggregationEnum> aggregations = query.getAggregations();
+        List<Aggregation> aggregations = query.getAggregations();
 
         Set<String> includedFields =
                 new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
@@ -62,7 +61,7 @@ public class ResponseStreamWriter {
             includedFields.add(field.name());
         }
         if (aggregations != null) {
-            for (AggregationEnum aggregation : query.getAggregations()) {
+            for (Aggregation aggregation : query.getAggregations()) {
                 includedFields.add(aggregation.name());
             }
         }
@@ -90,41 +89,31 @@ public class ResponseStreamWriter {
     }
 
     /**
-     * Writes the Java stream into the output stream.
+     * Writes the outer Java stream into the output stream.
      *
-     * @param stream {@link Stream} instance of {@link DataEvent}s
+     * @param stream Mapping from channel name to data
      * @param response {@link ServletResponse} instance given by the current HTTP request
      * @param writer configured writer that includes the fields the end user wants to see
      * @throws IOException thrown if writing to the output stream fails
      */
-    private void respondInternal(Stream<Entry<String, Stream<? extends DataEvent>>> stream, ServletResponse response,
+    private void respondInternal(Stream<Entry<String, ?>> stream, ServletResponse response,
             ObjectWriter writer)
             throws IOException {
 
         JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
         generator.writeStartArray();
         stream
-                /* ensure elements are sequentially written to the stream */
+                /* ensure elements are sequentially written */
                 .sequential()
                 .forEach(
                         entry -> {
                             try {
                                 generator.writeStartObject();
                                 generator.writeStringField(QueryField.channel.name(), entry.getKey());
-                                generator.writeArrayFieldStart("data");
-                                entry.getValue()
-                                        /* ensure elements are sequentially written to the stream */
-                                        .sequential()
-                                        .forEach(
-                                                dataEvent -> {
-                                                    try {
-                                                        writer.writeValue(generator, dataEvent);
-                                                    } catch (Exception e) {
-                                                        logger.error("Could not write event with pulse-id '{}' of channel '{}'",
-                                                                dataEvent.getPulseId(), entry.getKey(), e);
-                                                    }
-                                                });
-                                generator.writeEndArray();
+                                generator.writeFieldName("data");
+                                writer.writeValue(generator, entry.getValue());
                                 generator.writeEndObject();
                             } catch (Exception e) {
                                 logger.error("Could not write channel name of channel '{}'", entry.getKey(), e);
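For orientation, a sketch of the response shape this writer now produces: a top-level JSON array with one object per channel, whose whole `data` payload is written through a single `writer.writeValue` call. The sketch is inferred from the test assertions below (`$[1].data[0].pulseId`); other requested fields are omitted for brevity.

```json
[
  {"channel": "testChannel1", "data": [{"pulseId": 100}, {"pulseId": 101}]},
  {"channel": "testChannel2", "data": [{"pulseId": 100}, {"pulseId": 101}]}
]
```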
@@ -4,3 +4,4 @@ server.port=8080
 # defines the fields that are included in the response
 # if no fields have been specified by the user
 queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,value
+queryrest.default.response.aggregations=min,max,sum
@@ -8,9 +8,11 @@ import org.springframework.context.annotation.PropertySources;
 import org.springframework.web.servlet.config.annotation.EnableWebMvc;
 import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
 
+import ch.psi.daq.cassandra.reader.DataReader;
 import ch.psi.daq.query.processor.QueryProcessor;
+import ch.psi.daq.query.processor.cassandra.CassandraQueryProcessorLocal;
 import ch.psi.daq.test.query.config.LocalQueryTestConfig;
-import ch.psi.daq.test.queryrest.query.DummyQueryProcessor;
+import ch.psi.daq.test.queryrest.query.DummyDataReader;
 
 @Configuration
 @PropertySources(value = {
@@ -27,6 +29,11 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
 
     @Bean
     public QueryProcessor queryProcessor() {
-        return new DummyQueryProcessor();
+        return new CassandraQueryProcessorLocal();
+    }
+
+    @Bean
+    public DataReader dataReader() {
+        return new DummyDataReader();
     }
 }
@@ -1,33 +1,64 @@
 package ch.psi.daq.test.queryrest.controller;
 
+import javax.annotation.Resource;
+
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.springframework.http.MediaType;
 import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
 import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
 import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
 
+import ch.psi.daq.cassandra.util.test.CassandraDataGen;
+import ch.psi.daq.common.ordering.Ordering;
+import ch.psi.daq.query.model.AggregationType;
 import ch.psi.daq.query.model.PulseRangeQuery;
 import ch.psi.daq.query.model.TimeRangeQuery;
 import ch.psi.daq.query.model.TimeRangeQueryDate;
+import ch.psi.daq.queryrest.controller.QueryRestController;
+import ch.psi.daq.test.cassandra.admin.CassandraAdmin;
 import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
-import ch.psi.daq.test.queryrest.query.DummyQueryProcessor;
 
 /**
  * Tests the {@link DaqController} implementation.
  */
 public class DaqRestControllerTest extends AbstractDaqRestTest {
 
+    @Resource
+    private CassandraAdmin cassandraAdmin;
+
+    @Resource
+    private CassandraDataGen dataGen;
+
+    private static final boolean initialized = false;
+
+    private static final int DATA_REPLICATION = 1;
+    public static final String[] TEST_CHANNEL_NAMES = new String[]{"testChannel1", "testChannel2"};
+
+    @Before
+    public void setUp() throws Exception {
+        if (!initialized) {
+            cassandraAdmin.truncateAll();
+
+            dataGen.writeData(DATA_REPLICATION, 100, 2, TEST_CHANNEL_NAMES);
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {}
+
     @Test
     public void testPulseRangeQuery() throws Exception {
         PulseRangeQuery request = new PulseRangeQuery(
                 100,
                 101,
-                DummyQueryProcessor.TEST_CHANNEL_NAMES);
+                TEST_CHANNEL_NAMES);
 
         String content = mapper.writeValueAsString(request);
 
         this.mockMvc
-                .perform(MockMvcRequestBuilders.post("/query")
+                .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY)
                         .contentType(MediaType.APPLICATION_JSON)
                         .content(content))
 
@@ -51,12 +82,12 @@ public class DaqRestControllerTest extends AbstractDaqRestTest {
         TimeRangeQuery request = new TimeRangeQuery(
                 100,
                 101,
-                DummyQueryProcessor.TEST_CHANNEL_NAMES);
+                TEST_CHANNEL_NAMES);
 
         String content = mapper.writeValueAsString(request);
 
         this.mockMvc
-                .perform(MockMvcRequestBuilders.post("/query")
+                .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY)
                         .contentType(MediaType.APPLICATION_JSON)
                         .content(content))
 
@@ -76,18 +107,18 @@ public class DaqRestControllerTest extends AbstractDaqRestTest {
     }
 
     @Test
-    public void testMapper() throws Exception {
+    public void testDateRangeQuery() throws Exception {
         String startDate = TimeRangeQueryDate.format(100);
         String endDate = TimeRangeQueryDate.format(101);
         TimeRangeQueryDate request = new TimeRangeQueryDate(
                 startDate,
                 endDate,
-                DummyQueryProcessor.TEST_CHANNEL_NAMES);
+                TEST_CHANNEL_NAMES);
 
         String content = mapper.writeValueAsString(request);
 
         this.mockMvc
-                .perform(MockMvcRequestBuilders.post("/query")
+                .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY)
                         .contentType(MediaType.APPLICATION_JSON)
                         .content(content))
 
@@ -105,4 +136,61 @@ public class DaqRestControllerTest extends AbstractDaqRestTest {
                 .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100))
                 .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(101));
     }
+
+    @Test
+    public void testExtremaAggregation() throws Exception {
+        PulseRangeQuery request = new PulseRangeQuery(
+                100,
+                101,
+                false,
+                Ordering.asc,
+                AggregationType.extrema,
+                TEST_CHANNEL_NAMES[0]);
+
+        String content = mapper.writeValueAsString(request);
+
+        this.mockMvc
+                .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY)
+                        .contentType(MediaType.APPLICATION_JSON)
+                        .content(content))
+
+                .andDo(MockMvcResultHandlers.print())
+                .andExpect(MockMvcResultMatchers.status().isOk())
+
+                .andExpect(MockMvcResultMatchers.jsonPath("$").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.value").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.value").value(100))
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.channel").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.channel").value(TEST_CHANNEL_NAMES[0]))
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.pulseId").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.pulseId").value(100))
+                .andExpect(MockMvcResultMatchers.jsonPath("$.maxima").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.value").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.value").value(101))
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.channel").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.channel").value(TEST_CHANNEL_NAMES[0]))
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.pulseId").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.pulseId").value(101));
+    }
+
+    @Test
+    public void testChannelNameQuery() throws Exception {
+        this.mockMvc
+                .perform(MockMvcRequestBuilders.get(QueryRestController.CHANNELS)
+                        .contentType(MediaType.APPLICATION_JSON))
+
+                .andDo(MockMvcResultHandlers.print())
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+                .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("testChannel1"))
+                .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+                .andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("testChannel2"))
+                .andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist());
+    }
 }
@@ -0,0 +1,58 @@
+package ch.psi.daq.test.queryrest.query;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Lists;
+
+import ch.psi.daq.cassandra.reader.DataReader;
+import ch.psi.daq.common.ordering.Ordering;
+import ch.psi.daq.domain.cassandra.ChannelEvent;
+import ch.psi.daq.domain.cassandra.DataEvent;
+
+public class DummyDataReader implements DataReader {
+
+    public static final String TEST_CHANNEL_1 = "testChannel1";
+    public static final String TEST_CHANNEL_2 = "testChannel2";
+    public static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
+
+    @Override
+    public Stream<String> getChannelStream(String regex) {
+        Stream<String> channelStream = TEST_CHANNEL_NAMES.stream();
+
+        if (regex != null) {
+            Pattern pattern = Pattern.compile(regex);
+            channelStream = channelStream.filter(channel -> pattern.matcher(channel).find());
+        }
+
+        return channelStream;
+    }
+
+    @Override
+    public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId,
+            Ordering ordering, String... columns) {
+        return getElements(channel, startPulseId, endPulseId);
+    }
+
+    @Override
+    public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
+            long endNanos, Ordering ordering, String... columns) {
+        return getElements(channel, startMillis, endMillis);
+    }
+
+    protected Stream<? extends DataEvent> getElements(String channel, long start, long end) {
+        return LongStream.rangeClosed(start, end).mapToObj(i -> {
+            return new ChannelEvent(
+                    channel,
+                    i,
+                    0,
+                    i,
+                    i,
+                    0,
+                    "data_" + UUID.randomUUID().toString());
+        });
+    }
+}
@@ -1,78 +0,0 @@
-/**
- *
- */
-package ch.psi.daq.test.queryrest.query;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import ch.psi.daq.domain.cassandra.ChannelEvent;
-import ch.psi.daq.domain.cassandra.DataEvent;
-import ch.psi.daq.query.model.Query;
-import ch.psi.daq.query.processor.QueryProcessor;
-
-/**
- * @author zellweger_c
- *
- */
-public class DummyQueryProcessor implements QueryProcessor {
-
-    public static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList("testChannel1", "testChannel2");
-
-    @Override
-    public Collection<String> getChannels() {
-        return TEST_CHANNEL_NAMES;
-    }
-
-    @Override
-    public Collection<String> getChannels(String regex) {
-        if (regex != null) {
-            Pattern pattern = Pattern.compile(regex);
-            return TEST_CHANNEL_NAMES.stream().filter(channel -> pattern.matcher(channel).find())
-                    .collect(Collectors.toList());
-        } else {
-            return getChannels();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Stream<Entry<String, Stream<? extends DataEvent>>> process(Query query) {
-
-        int nrOfEntries = 2;
-        int startingPulseId = 100;
-
-        Map<String, Stream<? extends DataEvent>> result = Maps.newHashMap();
-
-        TEST_CHANNEL_NAMES.forEach(chName -> {
-
-            List<DataEvent> entities = IntStream.rangeClosed(1, nrOfEntries).boxed().map(i -> {
-                long pulseId = startingPulseId + i - 1;
-                return new ChannelEvent(
-                        chName,
-                        pulseId,
-                        0,
-                        pulseId,
-                        pulseId,
-                        0,
-                        "data_" + UUID.randomUUID().toString());
-            }).collect(Collectors.toList());
-
-            result.put(chName, entities.stream());
-        });
-
-        return result.entrySet().stream();
-    }
-}