Async read, query REST etc.
This commit is contained in:
@ -1,5 +1,5 @@
|
|||||||
#
|
#
|
||||||
#Mon Jul 20 15:36:19 CEST 2015
|
#Fri Jul 24 14:48:03 CEST 2015
|
||||||
org.eclipse.jdt.core.compiler.debug.localVariable=generate
|
org.eclipse.jdt.core.compiler.debug.localVariable=generate
|
||||||
org.eclipse.jdt.core.compiler.compliance=1.8
|
org.eclipse.jdt.core.compiler.compliance=1.8
|
||||||
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
|
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
|
||||||
|
@ -3,7 +3,10 @@ package ch.psi.daq.queryrest.config;
|
|||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
@ -13,12 +16,17 @@ import org.springframework.util.StringUtils;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||||
import com.fasterxml.jackson.core.JsonFactory;
|
import com.fasterxml.jackson.core.JsonFactory;
|
||||||
|
import com.fasterxml.jackson.core.Version;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||||
|
|
||||||
|
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||||
|
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
||||||
import ch.psi.daq.common.statistic.StorelessStatistics;
|
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||||
import ch.psi.daq.query.config.QueryClientConfig;
|
|
||||||
import ch.psi.daq.query.config.QueryConfig;
|
import ch.psi.daq.query.config.QueryConfig;
|
||||||
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
|
import ch.psi.daq.query.model.QueryField;
|
||||||
import ch.psi.daq.queryrest.model.PropertyFilterMixin;
|
import ch.psi.daq.queryrest.model.PropertyFilterMixin;
|
||||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||||
|
|
||||||
@ -27,18 +35,44 @@ import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
|||||||
@PropertySource(value = {"file:${user.home}/.config/daq/queryrest.properties"}, ignoreResourceNotFound = true)
|
@PropertySource(value = {"file:${user.home}/.config/daq/queryrest.properties"}, ignoreResourceNotFound = true)
|
||||||
public class QueryRestConfig {
|
public class QueryRestConfig {
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private Environment env;
|
|
||||||
|
|
||||||
// a nested configuration
|
// a nested configuration
|
||||||
// this guarantees that the ordering of the properties file is as expected
|
// this guarantees that the ordering of the properties file is as expected
|
||||||
// see:
|
// see:
|
||||||
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
|
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
|
||||||
@Configuration
|
@Configuration
|
||||||
@Import({QueryConfig.class, QueryClientConfig.class})
|
@Import({QueryConfig.class})
|
||||||
static class InnerConfiguration {
|
static class InnerConfiguration {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestConfig.class);
|
||||||
|
|
||||||
|
public static final String BEAN_NAME_DEFAULT_RESPONSE_FIELDS = "defaultResponseFields";
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private Environment env;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ObjectMapper objectMapper() {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
String packageName = AbstractQuery.class.getPackage().getName();
|
||||||
|
Class<AbstractQuery> abstractQueryClass = AbstractQuery.class;
|
||||||
|
AttributeBasedDeserializer<AbstractQuery> abstractQueryDeserializer =
|
||||||
|
new AttributeBasedDeserializer<AbstractQuery>(abstractQueryClass).register(packageName);
|
||||||
|
SimpleModule module = new SimpleModule("PolymorphicAbstractQuery", Version.unknownVersion());
|
||||||
|
module.addDeserializer(abstractQueryClass, abstractQueryDeserializer);
|
||||||
|
mapper.registerModule(module);
|
||||||
|
|
||||||
|
// only include non-null values
|
||||||
|
mapper.setSerializationInclusion(Include.NON_NULL);
|
||||||
|
// Mixin which is used dynamically to filter out which properties get serialised and which
|
||||||
|
// won't. This way, the user can specify which columns are to be received.
|
||||||
|
mapper.addMixIn(ChannelEvent.class, PropertyFilterMixin.class);
|
||||||
|
mapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
|
||||||
|
|
||||||
|
return mapper;
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public JsonFactory jsonFactory() {
|
public JsonFactory jsonFactory() {
|
||||||
return new JsonFactory();
|
return new JsonFactory();
|
||||||
@ -49,27 +83,28 @@ public class QueryRestConfig {
|
|||||||
return new ResponseStreamWriter();
|
return new ResponseStreamWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean(name = BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
|
||||||
public ObjectMapper objectMapper() {
|
public Set<QueryField> defaultResponseFields() {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
String[] responseFields =
|
||||||
// only include non-null values
|
StringUtils.commaDelimitedListToStringArray(env.getProperty("queryrest.default.response.fields"));
|
||||||
mapper.setSerializationInclusion(Include.NON_NULL);
|
|
||||||
// Mixin which is used dynamically to filter out which properties get serialised and which
|
|
||||||
// won't. This way, the user can specify which columns are to be received.
|
|
||||||
mapper.addMixIn(ChannelEvent.class, PropertyFilterMixin.class);
|
|
||||||
mapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
|
|
||||||
return mapper;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Set<String> defaultResponseFields() {
|
|
||||||
String[] responseFields = StringUtils.commaDelimitedListToStringArray(env.getProperty("queryrest.default.response.fields"));
|
|
||||||
// preserve order
|
// preserve order
|
||||||
LinkedHashSet<String> defaultResponseFields = new LinkedHashSet<>(responseFields.length);
|
LinkedHashSet<QueryField> defaultResponseFields = new LinkedHashSet<>(responseFields.length);
|
||||||
for (String field : defaultResponseFields) {
|
for (String field : responseFields) {
|
||||||
defaultResponseFields.add(field);
|
try {
|
||||||
|
defaultResponseFields.add(QueryField.valueOf(field));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("Field '{}' in queryrest.default.response.fields is invalid.", field, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return defaultResponseFields;
|
return defaultResponseFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ==========================================================================================
|
||||||
|
// TODO: This is simply for initial / rudimentary testing - remove once further evolved
|
||||||
|
@Bean
|
||||||
|
public CassandraDataGen cassandraDataGen() {
|
||||||
|
return new CassandraDataGen();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,145 +1,114 @@
|
|||||||
package ch.psi.daq.queryrest.controller;
|
package ch.psi.daq.queryrest.controller;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.Map;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.function.LongFunction;
|
|
||||||
import java.util.function.LongUnaryOperator;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.LongStream;
|
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMethod;
|
||||||
|
import org.springframework.web.bind.annotation.ResponseBody;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import ch.psi.daq.cassandra.writer.CassandraWriter;
|
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||||
import ch.psi.daq.domain.DataType;
|
|
||||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
|
||||||
import ch.psi.daq.domain.cassandra.DataEvent;
|
import ch.psi.daq.domain.cassandra.DataEvent;
|
||||||
import ch.psi.daq.query.model.AbstractQuery;
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
import ch.psi.daq.query.model.PulseRangeQuery;
|
import ch.psi.daq.query.model.QueryField;
|
||||||
import ch.psi.daq.query.model.TimeRangeQuery;
|
|
||||||
import ch.psi.daq.query.processor.QueryProcessor;
|
import ch.psi.daq.query.processor.QueryProcessor;
|
||||||
|
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
public class QueryRestController {
|
public class QueryRestController {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(QueryRestController.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
|
||||||
|
|
||||||
@Autowired
|
public static final String CHANNELS = "channels";
|
||||||
private CassandraWriter cassandraWriter;
|
public static final String CHANNELS_REGEX = CHANNELS + "/{regex}";
|
||||||
|
public static final String QUERY = "query";
|
||||||
|
|
||||||
@Autowired
|
@Resource
|
||||||
private ResponseStreamWriter responseStreamWriter;
|
private ResponseStreamWriter responseStreamWriter;
|
||||||
|
|
||||||
@Autowired
|
@Resource
|
||||||
private QueryProcessor queryProcessor;
|
private QueryProcessor queryProcessor;
|
||||||
|
|
||||||
|
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
|
||||||
|
private Set<QueryField> defaultResponseFields;
|
||||||
|
|
||||||
@RequestMapping(value = "/pulserange")
|
@RequestMapping(
|
||||||
public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException {
|
value = CHANNELS,
|
||||||
|
method = RequestMethod.GET,
|
||||||
logger.debug("PulseRangeQuery received: {}", query);
|
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
|
public @ResponseBody Collection<String> getChannels() throws Throwable {
|
||||||
executeQuery(query, res);
|
try {
|
||||||
|
return queryProcessor.getChannels();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOGGER.error("Failed to query channel names.", t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@RequestMapping(
|
||||||
@RequestMapping(value = "/timerange")
|
value = CHANNELS_REGEX,
|
||||||
public void timeRange(@RequestBody TimeRangeQuery query, HttpServletResponse res) throws IOException {
|
method = RequestMethod.GET,
|
||||||
|
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
logger.debug("TimeRangeQuery received: {}", query);
|
public @ResponseBody Collection<String> getChannels(@PathVariable String regex) throws Throwable {
|
||||||
|
try {
|
||||||
executeQuery(query, res);
|
return queryProcessor.getChannels(regex);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOGGER.error("Failed to query channel names with regex '{}'.", regex, t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@RequestMapping(
|
||||||
|
value = QUERY,
|
||||||
|
method = RequestMethod.GET,
|
||||||
|
consumes = {MediaType.APPLICATION_JSON_VALUE},
|
||||||
|
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
|
public void executeQuery(@RequestBody AbstractQuery query, HttpServletResponse res) throws IOException {
|
||||||
|
try {
|
||||||
|
LOGGER.debug("Execute query '{}'", query.getClass().getSimpleName());
|
||||||
|
|
||||||
private void executeQuery(AbstractQuery query, HttpServletResponse res) throws IOException {
|
validateQuery(query);
|
||||||
|
|
||||||
// all the magic happens here
|
// all the magic happens here
|
||||||
Map<String, Stream<? extends DataEvent>> process = queryProcessor.process(query);
|
Stream<Entry<String, Stream<? extends DataEvent>>> process = queryProcessor.process(query);
|
||||||
|
|
||||||
Stream<DataEvent> flatStreams = process.values().stream().flatMap(s -> {
|
// write the response back to the client using java 8 streams
|
||||||
return s;
|
responseStreamWriter.respond(process, query, res);
|
||||||
});
|
} catch (Throwable t) {
|
||||||
|
LOGGER.error("Failed execute query '{}'.", query, t);
|
||||||
|
throw t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// write the response back to the client using java 8 streams
|
// TODO: can we do this with built-in validators?
|
||||||
responseStreamWriter.respond(flatStreams, query, res);
|
private void validateQuery(AbstractQuery query) {
|
||||||
|
if (query.getFields() == null || query.getFields().isEmpty()) {
|
||||||
|
query.setFields(new LinkedHashSet<>(defaultResponseFields));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==========================================================================================
|
// ==========================================================================================
|
||||||
// TODO this is simply a method for initial / rudimentary testing - remove once further evolved
|
// TODO: This is simply for initial / rudimentary testing - remove once further evolved
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CassandraDataGen cassandraDataGen;
|
||||||
|
|
||||||
@RequestMapping(value = "/write")
|
@RequestMapping(value = "/write")
|
||||||
public long writeDummyEntry() {
|
public void writeDummyEntry() {
|
||||||
|
cassandraDataGen.writeData(3, 0, 100, "channel1", "channel2");
|
||||||
long startIndex = System.currentTimeMillis();
|
|
||||||
|
|
||||||
writeData(3,startIndex, 100, new String[]{"channel1", "channel2"});
|
|
||||||
|
|
||||||
return startIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void writeData(int dataReplication, long startIndex, long nrOfElements,
|
|
||||||
String... channelNames) {
|
|
||||||
writeData(dataReplication, startIndex, nrOfElements,
|
|
||||||
i -> 2 * i,
|
|
||||||
i -> 2 * i,
|
|
||||||
i -> 2 * i,
|
|
||||||
i -> 2 * i,
|
|
||||||
i -> 2 * i,
|
|
||||||
i -> Long.valueOf(2 * i),
|
|
||||||
channelNames);
|
|
||||||
}
|
|
||||||
|
|
||||||
private <T> void writeData(int dataReplication, long startIndex, long nrOfElements,
|
|
||||||
LongUnaryOperator iocMillis, LongUnaryOperator iocNanos, LongUnaryOperator pulseIds,
|
|
||||||
LongUnaryOperator globalMillis, LongUnaryOperator globalNanos, LongFunction<T> valueFunction,
|
|
||||||
String... channelNames) {
|
|
||||||
|
|
||||||
Assert.notNull(channelNames);
|
|
||||||
|
|
||||||
CompletableFuture<Void> future;
|
|
||||||
|
|
||||||
List<ChannelEvent> events =
|
|
||||||
Arrays.stream(channelNames)
|
|
||||||
.parallel()
|
|
||||||
.flatMap(
|
|
||||||
channelName -> {
|
|
||||||
Stream<ChannelEvent> stream =
|
|
||||||
LongStream
|
|
||||||
.range(startIndex, startIndex + nrOfElements)
|
|
||||||
.parallel()
|
|
||||||
.mapToObj(
|
|
||||||
i -> {
|
|
||||||
Object value = valueFunction.apply(i);
|
|
||||||
return new ChannelEvent(channelName,
|
|
||||||
iocMillis.applyAsLong(i),
|
|
||||||
iocNanos.applyAsLong(i),
|
|
||||||
pulseIds.applyAsLong(i),
|
|
||||||
globalMillis.applyAsLong(i),
|
|
||||||
globalNanos.applyAsLong(i),
|
|
||||||
value,
|
|
||||||
DataType.getTypeName(value.getClass())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
return stream;
|
|
||||||
})
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
future = cassandraWriter.writeAsync(dataReplication, (int) TimeUnit.HOURS.toSeconds(1), events);
|
|
||||||
future.join();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@ package ch.psi.daq.queryrest.model;
|
|||||||
import com.fasterxml.jackson.annotation.JsonFilter;
|
import com.fasterxml.jackson.annotation.JsonFilter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
* Kind of marker for ObjectMapper MixIn
|
||||||
*/
|
*/
|
||||||
@JsonFilter("namedPropertyFilter")
|
@JsonFilter("namedPropertyFilter")
|
||||||
public class PropertyFilterMixin {
|
public class PropertyFilterMixin {
|
||||||
|
@ -2,14 +2,16 @@ package ch.psi.daq.queryrest.response;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
import javax.servlet.ServletResponse;
|
import javax.servlet.ServletResponse;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonEncoding;
|
import com.fasterxml.jackson.core.JsonEncoding;
|
||||||
import com.fasterxml.jackson.core.JsonFactory;
|
import com.fasterxml.jackson.core.JsonFactory;
|
||||||
@ -22,6 +24,7 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
|
|||||||
import ch.psi.daq.domain.cassandra.DataEvent;
|
import ch.psi.daq.domain.cassandra.DataEvent;
|
||||||
import ch.psi.daq.query.model.AbstractQuery;
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
import ch.psi.daq.query.model.AggregationEnum;
|
import ch.psi.daq.query.model.AggregationEnum;
|
||||||
|
import ch.psi.daq.query.model.QueryField;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||||
@ -31,15 +34,12 @@ public class ResponseStreamWriter {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ResponseStreamWriter.class);
|
private static final Logger logger = LoggerFactory.getLogger(ResponseStreamWriter.class);
|
||||||
|
|
||||||
@Autowired
|
@Resource
|
||||||
private JsonFactory jsonFactory;
|
private JsonFactory jsonFactory;
|
||||||
|
|
||||||
@Autowired
|
@Resource
|
||||||
private ObjectMapper mapper;
|
private ObjectMapper mapper;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private Set<String> defaultResponseFields;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Responding with the the contents of the stream by writing into the output stream of the
|
* Responding with the the contents of the stream by writing into the output stream of the
|
||||||
* {@link ServletResponse}.
|
* {@link ServletResponse}.
|
||||||
@ -49,16 +49,26 @@ public class ResponseStreamWriter {
|
|||||||
* @param response {@link ServletResponse} instance given by the current HTTP request
|
* @param response {@link ServletResponse} instance given by the current HTTP request
|
||||||
* @throws IOException thrown if writing to the output stream fails
|
* @throws IOException thrown if writing to the output stream fails
|
||||||
*/
|
*/
|
||||||
public void respond(Stream<DataEvent> stream, AbstractQuery query, ServletResponse response) throws IOException {
|
public void respond(Stream<Entry<String, Stream<? extends DataEvent>>> stream, AbstractQuery query,
|
||||||
|
ServletResponse response) throws IOException {
|
||||||
|
|
||||||
Set<String> includedFields = query.getFieldsOrDefault(defaultResponseFields);
|
Set<QueryField> queryFields = query.getFields();
|
||||||
|
List<AggregationEnum> aggregations = query.getAggregations();
|
||||||
|
|
||||||
if (query.getAggregations() != null) {
|
Set<String> includedFields =
|
||||||
includedFields = new LinkedHashSet<String>(includedFields);
|
new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
|
||||||
|
|
||||||
|
for (QueryField field : queryFields) {
|
||||||
|
includedFields.add(field.name());
|
||||||
|
}
|
||||||
|
if (aggregations != null) {
|
||||||
for (AggregationEnum aggregation : query.getAggregations()) {
|
for (AggregationEnum aggregation : query.getAggregations()) {
|
||||||
includedFields.add(aggregation.name());
|
includedFields.add(aggregation.name());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// do not write channel since it is already provided as key in mapping
|
||||||
|
includedFields.remove(QueryField.channel.name());
|
||||||
|
|
||||||
ObjectWriter writer = configureWriter(includedFields);
|
ObjectWriter writer = configureWriter(includedFields);
|
||||||
respondInternal(stream, response, writer);
|
respondInternal(stream, response, writer);
|
||||||
}
|
}
|
||||||
@ -87,20 +97,39 @@ public class ResponseStreamWriter {
|
|||||||
* @param writer configured writer that includes the fields the end user wants to see
|
* @param writer configured writer that includes the fields the end user wants to see
|
||||||
* @throws IOException thrown if writing to the output stream fails
|
* @throws IOException thrown if writing to the output stream fails
|
||||||
*/
|
*/
|
||||||
private void respondInternal(Stream<DataEvent> stream, ServletResponse response, ObjectWriter writer)
|
private void respondInternal(Stream<Entry<String, Stream<? extends DataEvent>>> stream, ServletResponse response,
|
||||||
|
ObjectWriter writer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
|
JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
|
||||||
generator.writeStartArray();
|
generator.writeStartArray();
|
||||||
stream.forEach(ds -> {
|
stream
|
||||||
try {
|
/* ensure elements are sequentially written to the stream */
|
||||||
logger.trace("Writing value for: {}", ds);
|
.sequential()
|
||||||
// use the writer created just before
|
.forEach(
|
||||||
writer.writeValue(generator, ds);
|
entry -> {
|
||||||
} catch (Exception e) {
|
try {
|
||||||
logger.error("", e);
|
generator.writeStartObject();
|
||||||
}
|
generator.writeStringField(QueryField.channel.name(), entry.getKey());
|
||||||
});
|
generator.writeArrayFieldStart("values");
|
||||||
|
entry.getValue()
|
||||||
|
/* ensure elements are sequentially written to the stream */
|
||||||
|
.sequential()
|
||||||
|
.forEach(
|
||||||
|
dataEvent -> {
|
||||||
|
try {
|
||||||
|
writer.writeValue(generator, dataEvent);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Could not write event with pulse-id '{}' of channel '{}'",
|
||||||
|
dataEvent.getPulseId(), entry.getKey(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
generator.writeEndArray();
|
||||||
|
generator.writeEndObject();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Could not write channel name of channel '{}'", entry.getKey(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
generator.writeEndArray();
|
generator.writeEndArray();
|
||||||
generator.flush();
|
generator.flush();
|
||||||
generator.close();
|
generator.close();
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
package ch.psi.daq.test.queryrest;
|
package ch.psi.daq.test.queryrest;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.mockito.MockitoAnnotations;
|
import org.mockito.MockitoAnnotations;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.SpringApplicationConfiguration;
|
|
||||||
import org.springframework.test.annotation.DirtiesContext;
|
import org.springframework.test.annotation.DirtiesContext;
|
||||||
import org.springframework.test.annotation.DirtiesContext.ClassMode;
|
import org.springframework.test.annotation.DirtiesContext.ClassMode;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
import org.springframework.test.context.TestExecutionListeners;
|
import org.springframework.test.context.TestExecutionListeners;
|
||||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
|
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
|
||||||
@ -17,26 +18,26 @@ import org.springframework.web.context.WebApplicationContext;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import ch.psi.daq.queryrest.QueryRestApplication;
|
|
||||||
import ch.psi.daq.test.cassandra.CassandraDaqUnitDependencyInjectionTestExecutionListener;
|
import ch.psi.daq.test.cassandra.CassandraDaqUnitDependencyInjectionTestExecutionListener;
|
||||||
|
|
||||||
|
|
||||||
@TestExecutionListeners({
|
@TestExecutionListeners({
|
||||||
CassandraDaqUnitDependencyInjectionTestExecutionListener.class,
|
CassandraDaqUnitDependencyInjectionTestExecutionListener.class,
|
||||||
DependencyInjectionTestExecutionListener.class})
|
DependencyInjectionTestExecutionListener.class})
|
||||||
@SpringApplicationConfiguration(classes = {QueryRestApplication.class, DaqWebMvcConfig.class})
|
//@SpringApplicationConfiguration(classes = {QueryRestApplication.class, DaqWebMvcConfig.class})
|
||||||
//@EmbeddedCassandra
|
//@EmbeddedCassandra
|
||||||
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
|
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
|
||||||
@WebAppConfiguration
|
@WebAppConfiguration
|
||||||
|
@ContextConfiguration(classes = DaqWebMvcConfig.class)
|
||||||
@RunWith(SpringJUnit4ClassRunner.class)
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
public abstract class AbstractDaqRestTest {
|
public abstract class AbstractDaqRestTest {
|
||||||
|
|
||||||
@Autowired
|
@Resource
|
||||||
protected WebApplicationContext webApplicationContext;
|
protected WebApplicationContext webApplicationContext;
|
||||||
|
|
||||||
protected MockMvc mockMvc;
|
protected MockMvc mockMvc;
|
||||||
|
|
||||||
@Autowired
|
@Resource
|
||||||
protected ObjectMapper mapper;
|
protected ObjectMapper mapper;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -3,20 +3,27 @@ package ch.psi.daq.test.queryrest;
|
|||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
|
import org.springframework.context.annotation.PropertySource;
|
||||||
|
import org.springframework.context.annotation.PropertySources;
|
||||||
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
||||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
||||||
|
|
||||||
import ch.psi.daq.query.processor.QueryProcessor;
|
import ch.psi.daq.query.processor.QueryProcessor;
|
||||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
|
||||||
import ch.psi.daq.test.cassandra.config.LocalCassandraTestConfig;
|
|
||||||
import ch.psi.daq.test.queryrest.query.DummyQueryProcessor;
|
import ch.psi.daq.test.queryrest.query.DummyQueryProcessor;
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@Import(value = {LocalCassandraTestConfig.class, QueryRestConfig.class})
|
@PropertySources(value = {
|
||||||
|
@PropertySource(value = {"classpath:queryrest-test.properties"})
|
||||||
|
})
|
||||||
@EnableWebMvc
|
@EnableWebMvc
|
||||||
public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
||||||
|
|
||||||
// add test-specific beans and configurations here
|
// ensure that properties in dispatcher.properties are loaded first and then overwritten by the
|
||||||
|
// properties in dispatcher-test.properties
|
||||||
|
@Import(value = {LocalQueryTestConfig.class})
|
||||||
|
static class InnerConfiguration {
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public QueryProcessor queryProcessor() {
|
public QueryProcessor queryProcessor() {
|
||||||
|
@ -1,23 +1,14 @@
|
|||||||
package ch.psi.daq.test.queryrest.controller;
|
package ch.psi.daq.test.queryrest.controller;
|
||||||
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
||||||
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
|
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
|
||||||
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
|
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
|
|
||||||
import ch.psi.daq.common.ordering.Ordering;
|
|
||||||
import ch.psi.daq.query.model.AggregationType;
|
|
||||||
import ch.psi.daq.query.model.PulseRangeQuery;
|
import ch.psi.daq.query.model.PulseRangeQuery;
|
||||||
import ch.psi.daq.query.model.TimeRangeQuery;
|
import ch.psi.daq.query.model.TimeRangeQuery;
|
||||||
|
import ch.psi.daq.query.model.TimeRangeQueryDate;
|
||||||
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -25,24 +16,17 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
|||||||
*/
|
*/
|
||||||
public class DaqRestControllerTest extends AbstractDaqRestTest {
|
public class DaqRestControllerTest extends AbstractDaqRestTest {
|
||||||
|
|
||||||
private static final List<String> DEFAULT_PROPERTIES = Lists.newArrayList("channel", "pulseId");
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPulseRangeQuery() throws Exception {
|
public void testPulseRangeQuery() throws Exception {
|
||||||
PulseRangeQuery request = new PulseRangeQuery(
|
PulseRangeQuery request = new PulseRangeQuery(
|
||||||
100,
|
100,
|
||||||
101
|
101
|
||||||
);
|
);
|
||||||
request.setOrdering(Ordering.desc);
|
|
||||||
request.setFields(Sets.newLinkedHashSet(DEFAULT_PROPERTIES));
|
|
||||||
request.setNrOfBins(100);
|
|
||||||
request.setAggregateChannels(false);
|
|
||||||
request.setAggregationType(AggregationType.index);
|
|
||||||
|
|
||||||
String content = mapper.writeValueAsString(request);
|
String content = mapper.writeValueAsString(request);
|
||||||
|
|
||||||
this.mockMvc
|
this.mockMvc
|
||||||
.perform(MockMvcRequestBuilders.post("/pulserange")
|
.perform(MockMvcRequestBuilders.post("/query")
|
||||||
.contentType(MediaType.APPLICATION_JSON)
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
.content(content))
|
.content(content))
|
||||||
|
|
||||||
@ -50,30 +34,27 @@ public class DaqRestControllerTest extends AbstractDaqRestTest {
|
|||||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].pulseId").value(100))
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"))
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values[0].pulseId").value(100))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values[1].pulseId").value(101))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value("testChannel2"))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values[0].pulseId").value(100))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values[1].pulseId").value(101));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTimeRangeQuery() throws Exception {
|
public void testTimeRangeQuery() throws Exception {
|
||||||
long startTime = new Date().getTime();
|
|
||||||
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
|
|
||||||
|
|
||||||
TimeRangeQuery request = new TimeRangeQuery(
|
TimeRangeQuery request = new TimeRangeQuery(
|
||||||
startTime,
|
100,
|
||||||
endTime,
|
101);
|
||||||
"test");
|
|
||||||
request.setOrdering(Ordering.asc);
|
|
||||||
request.setFields(Sets.newLinkedHashSet(DEFAULT_PROPERTIES));
|
|
||||||
request.setNrOfBins(100);
|
|
||||||
request.setAggregateChannels(false);
|
|
||||||
request.setAggregationType(AggregationType.index);
|
|
||||||
|
|
||||||
String content = mapper.writeValueAsString(request);
|
String content = mapper.writeValueAsString(request);
|
||||||
System.out.println(content);
|
|
||||||
|
|
||||||
this.mockMvc
|
this.mockMvc
|
||||||
.perform(MockMvcRequestBuilders.post("/timerange")
|
.perform(MockMvcRequestBuilders.post("/query")
|
||||||
.contentType(MediaType.APPLICATION_JSON)
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
.content(content))
|
.content(content))
|
||||||
|
|
||||||
@ -81,24 +62,44 @@ public class DaqRestControllerTest extends AbstractDaqRestTest {
|
|||||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values[0].pulseId").value(100))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values[1].pulseId").value(101))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value("testChannel2"))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values[0].pulseId").value(100))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values[1].pulseId").value(101));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMapper() throws JsonProcessingException {
|
public void testMapper() throws Exception {
|
||||||
long startTime = new Date().getTime();
|
String startDate = TimeRangeQueryDate.format(100);
|
||||||
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
|
String endDate = TimeRangeQueryDate.format(101);
|
||||||
TimeRangeQuery request = new TimeRangeQuery(
|
TimeRangeQueryDate request = new TimeRangeQueryDate(
|
||||||
startTime,
|
startDate,
|
||||||
endTime,
|
endDate);
|
||||||
"test");
|
|
||||||
request.setOrdering(Ordering.asc);
|
|
||||||
request.setFields(Sets.newLinkedHashSet(DEFAULT_PROPERTIES));
|
|
||||||
request.setNrOfBins(100);
|
|
||||||
request.setAggregateChannels(false);
|
|
||||||
request.setAggregationType(AggregationType.index);
|
|
||||||
|
|
||||||
String content = mapper.writeValueAsString(request);
|
String content = mapper.writeValueAsString(request);
|
||||||
System.out.println(content);
|
|
||||||
|
this.mockMvc
|
||||||
|
.perform(MockMvcRequestBuilders.post("/query")
|
||||||
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
|
.content(content))
|
||||||
|
|
||||||
|
.andDo(MockMvcResultHandlers.print())
|
||||||
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values[0].pulseId").value(100))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].values[1].pulseId").value(101))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value("testChannel2"))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values[0].pulseId").value(100))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[1].values[1].pulseId").value(101));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,9 +3,12 @@
|
|||||||
*/
|
*/
|
||||||
package ch.psi.daq.test.queryrest.query;
|
package ch.psi.daq.test.queryrest.query;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
@ -24,19 +27,36 @@ import ch.psi.daq.query.processor.QueryProcessor;
|
|||||||
*/
|
*/
|
||||||
public class DummyQueryProcessor implements QueryProcessor {
|
public class DummyQueryProcessor implements QueryProcessor {
|
||||||
|
|
||||||
private static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList("testChannel1", "testChannel2");
|
public static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList("testChannel1", "testChannel2");
|
||||||
|
|
||||||
/** {@inheritDoc}
|
@Override
|
||||||
|
public Collection<String> getChannels() {
|
||||||
|
return TEST_CHANNEL_NAMES;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<String> getChannels(String regex) {
|
||||||
|
if (regex != null) {
|
||||||
|
Pattern pattern = Pattern.compile(regex);
|
||||||
|
return TEST_CHANNEL_NAMES.stream().filter(channel -> pattern.matcher(channel).find())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
} else {
|
||||||
|
return getChannels();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Stream<? extends DataEvent>> process(Query query) {
|
public Stream<Entry<String, Stream<? extends DataEvent>>> process(Query query) {
|
||||||
|
|
||||||
int nrOfEntries = 2;
|
int nrOfEntries = 2;
|
||||||
int startingPulseId = 100;
|
int startingPulseId = 100;
|
||||||
|
|
||||||
Map<String, Stream<? extends DataEvent>> result = Maps.newHashMap();
|
Map<String, Stream<? extends DataEvent>> result = Maps.newHashMap();
|
||||||
|
|
||||||
TEST_CHANNEL_NAMES.forEach(chName ->{
|
TEST_CHANNEL_NAMES.forEach(chName -> {
|
||||||
|
|
||||||
List<DataEvent> entities = IntStream.rangeClosed(1, nrOfEntries).boxed().map(i -> {
|
List<DataEvent> entities = IntStream.rangeClosed(1, nrOfEntries).boxed().map(i -> {
|
||||||
long pulseId = startingPulseId + i - 1;
|
long pulseId = startingPulseId + i - 1;
|
||||||
@ -53,7 +73,6 @@ public class DummyQueryProcessor implements QueryProcessor {
|
|||||||
result.put(chName, entities.stream());
|
result.put(chName, entities.stream());
|
||||||
});
|
});
|
||||||
|
|
||||||
return result;
|
return result.entrySet().stream();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
0
src/test/resources/queryrest-test.properties
Normal file
0
src/test/resources/queryrest-test.properties
Normal file
Reference in New Issue
Block a user