From 8ce83706010d3ad29ff7a03c9d7e4cfc58e9443a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Wed, 29 Jul 2015 11:32:36 +0200 Subject: [PATCH] ATEST-122 --- Readme.md | 2 +- .../daq/queryrest/config/QueryRestConfig.java | 52 ++++++++- .../controller/QueryRestController.java | 37 +++++-- .../response/JsonStreamSerializer.java | 24 ++++ .../response/ResponseStreamWriter.java | 39 +++---- src/main/resources/queryrest.properties | 3 +- .../daq/test/queryrest/DaqWebMvcConfig.java | 11 +- .../controller/DaqRestControllerTest.java | 104 ++++++++++++++++-- .../test/queryrest/query/DummyDataReader.java | 58 ++++++++++ .../queryrest/query/DummyQueryProcessor.java | 78 ------------- 10 files changed, 278 insertions(+), 130 deletions(-) create mode 100644 src/main/java/ch/psi/daq/queryrest/response/JsonStreamSerializer.java create mode 100644 src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java delete mode 100644 src/test/java/ch/psi/daq/test/queryrest/query/DummyQueryProcessor.java diff --git a/Readme.md b/Readme.md index 59e87ef..543e57d 100644 --- a/Readme.md +++ b/Readme.md @@ -139,7 +139,7 @@ There exist following fields: - **fields**: The requested fields (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values). - **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be devided into. - **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries. -- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationEnum.java) for possible values). These values will be added to the *data* array response. +- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response. - **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]). - **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**) diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index c03ce72..1e09fac 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -1,7 +1,9 @@ package ch.psi.daq.queryrest.config; +import java.util.EnumMap; import java.util.LinkedHashSet; import java.util.Set; +import java.util.function.Function; import javax.annotation.Resource; @@ -23,11 +25,16 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; import ch.psi.daq.common.statistic.StorelessStatistics; -import ch.psi.daq.domain.cassandra.ChannelEvent; +import ch.psi.daq.domain.cassandra.DataEvent; +import ch.psi.daq.query.analyzer.CassandraQueryAnalyzer; +import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.model.AbstractQuery; +import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.queryrest.model.PropertyFilterMixin; +import ch.psi.daq.queryrest.response.JsonStreamSerializer; import ch.psi.daq.queryrest.response.ResponseStreamWriter; @Configuration @@ -35,6 +42,10 @@ import ch.psi.daq.queryrest.response.ResponseStreamWriter; @PropertySource(value = {"file:${user.home}/.config/daq/queryrest.properties"}, ignoreResourceNotFound = true) public class QueryRestConfig { + private static final String QUERYREST_DEFAULT_RESPONSE_AGGREGATIONS = "queryrest.default.response.aggregations"; + + private static final String QUERYREST_DEFAULT_RESPONSE_FIELDS = "queryrest.default.response.fields"; + // a nested configuration // this guarantees that the ordering of the properties file is as expected // see: @@ -47,6 +58,7 @@ public class QueryRestConfig { private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestConfig.class); public static final String BEAN_NAME_DEFAULT_RESPONSE_FIELDS = "defaultResponseFields"; + public static final String BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS = "defaultResponseAggregations"; @Resource private Environment env; @@ -67,8 +79,14 @@ public class QueryRestConfig { mapper.setSerializationInclusion(Include.NON_NULL); // Mixin which is used dynamically to filter out which properties get serialised and which // won't. This way, the user can specify which columns are to be received. - mapper.addMixIn(ChannelEvent.class, PropertyFilterMixin.class); + mapper.addMixIn(DataEvent.class, PropertyFilterMixin.class); mapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class); + mapper.addMixIn(EnumMap.class, PropertyFilterMixin.class); + + // defines how to writer inner Streams (i.e. Stream>> toSerialize) + module = new SimpleModule("Streams API", Version.unknownVersion()); + module.addSerializer(new JsonStreamSerializer()); + mapper.registerModule(module); return mapper; } @@ -78,6 +96,11 @@ public class QueryRestConfig { return new JsonFactory(); } + @Bean + public Function queryAnalizerFactory() { + return (query) -> new CassandraQueryAnalyzer(query); + } + @Bean public ResponseStreamWriter responseStreamWriter() { return new ResponseStreamWriter(); @@ -86,21 +109,38 @@ public class QueryRestConfig { @Bean(name = BEAN_NAME_DEFAULT_RESPONSE_FIELDS) public Set defaultResponseFields() { String[] responseFields = - StringUtils.commaDelimitedListToStringArray(env.getProperty("queryrest.default.response.fields")); + StringUtils.commaDelimitedListToStringArray(env.getProperty(QUERYREST_DEFAULT_RESPONSE_FIELDS)); // preserve order LinkedHashSet defaultResponseFields = new LinkedHashSet<>(responseFields.length); for (String field : responseFields) { try { defaultResponseFields.add(QueryField.valueOf(field)); } catch (Exception e) { - LOGGER.error("Field '{}' in queryrest.default.response.fields is invalid.", field, e); + LOGGER.error("Field '{}' in '{}' is invalid.", field, QUERYREST_DEFAULT_RESPONSE_FIELDS, e); } } return defaultResponseFields; } - - + + @Bean(name = BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS) + public Set defaultResponseAggregations() { + String[] responseAggregations = + StringUtils.commaDelimitedListToStringArray(env.getProperty(QUERYREST_DEFAULT_RESPONSE_AGGREGATIONS)); + // preserve order + LinkedHashSet defaultResponseAggregations = new LinkedHashSet<>(responseAggregations.length); + for (String aggregation : responseAggregations) { + try { + defaultResponseAggregations.add(Aggregation.valueOf(aggregation)); + } catch (Exception e) { + LOGGER.error("Aggregation '{}' in '{}' is invalid.", aggregation, QUERYREST_DEFAULT_RESPONSE_AGGREGATIONS, + e); + } + } + + return defaultResponseAggregations; + } + // ========================================================================================== // TODO: This is simply for initial / rudimentary testing - remove once further evolved @Bean diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 2e6da3f..b243427 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -3,8 +3,10 @@ package ch.psi.daq.queryrest.controller; import java.io.IOException; import java.util.Collection; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.Map.Entry; import java.util.Set; +import java.util.function.Function; import java.util.stream.Stream; import javax.annotation.Resource; @@ -13,7 +15,6 @@ import javax.servlet.http.HttpServletResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -22,7 +23,10 @@ import org.springframework.web.bind.annotation.RestController; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.domain.cassandra.DataEvent; +import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.model.AbstractQuery; +import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.queryrest.config.QueryRestConfig; @@ -43,8 +47,14 @@ public class QueryRestController { @Resource private QueryProcessor queryProcessor; + @Resource + private Function queryAnalizerFactory; + @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS) private Set defaultResponseFields; + + @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS) + private Set defaultResponseAggregations; @RequestMapping( value = CHANNELS, @@ -61,9 +71,10 @@ public class QueryRestController { @RequestMapping( value = CHANNELS_REGEX, - method = RequestMethod.GET, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody Collection getChannels(@PathVariable String regex) throws Throwable { + public @ResponseBody Collection getChannels(@RequestBody String regex) throws Throwable { try { return queryProcessor.getChannels(regex); } catch (Throwable t) { @@ -74,31 +85,39 @@ public class QueryRestController { @RequestMapping( value = QUERY, - method = RequestMethod.GET, + method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE}, produces = {MediaType.APPLICATION_JSON_VALUE}) public void executeQuery(@RequestBody AbstractQuery query, HttpServletResponse res) throws IOException { try { LOGGER.debug("Execute query '{}'", query.getClass().getSimpleName()); - validateQuery(query); + QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); + queryAnalizer.validate(); + + extendQuery(query); // all the magic happens here - Stream>> process = queryProcessor.process(query); + Stream>> channelToDataEvents = queryProcessor.process(queryAnalizer); + + // do post-process + Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); // write the response back to the client using java 8 streams - responseStreamWriter.respond(process, query, res); + responseStreamWriter.respond(channelToData, query, res); } catch (Throwable t) { LOGGER.error("Failed execute query '{}'.", query, t); throw t; } } - // TODO: can we do this with built-in validators? - private void validateQuery(AbstractQuery query) { + private void extendQuery(AbstractQuery query) { if (query.getFields() == null || query.getFields().isEmpty()) { query.setFields(new LinkedHashSet<>(defaultResponseFields)); } + if(query.getAggregations() == null || query.getAggregations().isEmpty()){ + query.setAggregations(new LinkedList<>(defaultResponseAggregations)); + } } // ========================================================================================== diff --git a/src/main/java/ch/psi/daq/queryrest/response/JsonStreamSerializer.java b/src/main/java/ch/psi/daq/queryrest/response/JsonStreamSerializer.java new file mode 100644 index 0000000..6333d69 --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/response/JsonStreamSerializer.java @@ -0,0 +1,24 @@ +package ch.psi.daq.queryrest.response; + +import java.io.IOException; +import java.util.Iterator; +import java.util.stream.Stream; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +public class JsonStreamSerializer extends StdSerializer>{ + private static final long serialVersionUID = 4695859735299703478L; + + public JsonStreamSerializer() { + super(Stream.class, true); + } + + @Override + public void serialize(Stream stream, JsonGenerator jgen, SerializerProvider provider) throws IOException, + JsonGenerationException { + provider.findValueSerializer(Iterator.class, null).serialize(stream.iterator(), jgen, provider); + } +} diff --git a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java index 1b1a9c7..8918b7b 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java @@ -21,9 +21,8 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; -import ch.psi.daq.domain.cassandra.DataEvent; import ch.psi.daq.query.model.AbstractQuery; -import ch.psi.daq.query.model.AggregationEnum; +import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.QueryField; /** @@ -44,16 +43,16 @@ public class ResponseStreamWriter { * Responding with the the contents of the stream by writing into the output stream of the * {@link ServletResponse}. * - * @param stream {@link Stream} instance of {@link DataEvent}s + * @param stream Mapping from channel name to data * @param query concrete instance of {@link AbstractQuery} * @param response {@link ServletResponse} instance given by the current HTTP request * @throws IOException thrown if writing to the output stream fails */ - public void respond(Stream>> stream, AbstractQuery query, + public void respond(Stream> stream, AbstractQuery query, ServletResponse response) throws IOException { Set queryFields = query.getFields(); - List aggregations = query.getAggregations(); + List aggregations = query.getAggregations(); Set includedFields = new LinkedHashSet(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); @@ -62,7 +61,7 @@ public class ResponseStreamWriter { includedFields.add(field.name()); } if (aggregations != null) { - for (AggregationEnum aggregation : query.getAggregations()) { + for (Aggregation aggregation : query.getAggregations()) { includedFields.add(aggregation.name()); } } @@ -88,43 +87,33 @@ public class ResponseStreamWriter { ObjectWriter writer = mapper.writer(propertyFilter); return writer; } - + /** - * Writes the Java stream into the output stream. + * Writes the outer Java stream into the output stream. * - * @param stream {@link Stream} instance of {@link DataEvent}s + * @param stream Mapping from channel name to data * @param response {@link ServletResponse} instance given by the current HTTP request * @param writer configured writer that includes the fields the end user wants to see * @throws IOException thrown if writing to the output stream fails */ - private void respondInternal(Stream>> stream, ServletResponse response, + private void respondInternal(Stream> stream, ServletResponse response, ObjectWriter writer) throws IOException { JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8); generator.writeStartArray(); stream - /* ensure elements are sequentially written to the stream */ + /* ensure elements are sequentially written */ .sequential() .forEach( entry -> { try { generator.writeStartObject(); generator.writeStringField(QueryField.channel.name(), entry.getKey()); - generator.writeArrayFieldStart("data"); - entry.getValue() - /* ensure elements are sequentially written to the stream */ - .sequential() - .forEach( - dataEvent -> { - try { - writer.writeValue(generator, dataEvent); - } catch (Exception e) { - logger.error("Could not write event with pulse-id '{}' of channel '{}'", - dataEvent.getPulseId(), entry.getKey(), e); - } - }); - generator.writeEndArray(); + + generator.writeFieldName("data"); + writer.writeValue(generator, entry.getValue()); + generator.writeEndObject(); } catch (Exception e) { logger.error("Could not write channel name of channel '{}'", entry.getKey(), e); diff --git a/src/main/resources/queryrest.properties b/src/main/resources/queryrest.properties index 2e55ecf..1050546 100644 --- a/src/main/resources/queryrest.properties +++ b/src/main/resources/queryrest.properties @@ -3,4 +3,5 @@ server.port=8080 # defines the fields that are included in the response # if no fields have been specified by the user -queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,value \ No newline at end of file +queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,value +queryrest.default.response.aggregations=min,max,sum \ No newline at end of file diff --git a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java index 2b463fd..d6112b7 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java @@ -8,9 +8,11 @@ import org.springframework.context.annotation.PropertySources; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; +import ch.psi.daq.cassandra.reader.DataReader; import ch.psi.daq.query.processor.QueryProcessor; +import ch.psi.daq.query.processor.cassandra.CassandraQueryProcessorLocal; import ch.psi.daq.test.query.config.LocalQueryTestConfig; -import ch.psi.daq.test.queryrest.query.DummyQueryProcessor; +import ch.psi.daq.test.queryrest.query.DummyDataReader; @Configuration @PropertySources(value = { @@ -27,6 +29,11 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { @Bean public QueryProcessor queryProcessor() { - return new DummyQueryProcessor(); + return new CassandraQueryProcessorLocal(); + } + + @Bean + public DataReader dataReader() { + return new DummyDataReader(); } } \ No newline at end of file diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/DaqRestControllerTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/DaqRestControllerTest.java index 4dcb7c9..ac061eb 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/DaqRestControllerTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/DaqRestControllerTest.java @@ -1,33 +1,64 @@ package ch.psi.daq.test.queryrest.controller; +import javax.annotation.Resource; + +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.springframework.http.MediaType; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import org.springframework.test.web.servlet.result.MockMvcResultHandlers; import org.springframework.test.web.servlet.result.MockMvcResultMatchers; +import ch.psi.daq.cassandra.util.test.CassandraDataGen; +import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.query.model.AggregationType; import ch.psi.daq.query.model.PulseRangeQuery; import ch.psi.daq.query.model.TimeRangeQuery; import ch.psi.daq.query.model.TimeRangeQueryDate; +import ch.psi.daq.queryrest.controller.QueryRestController; +import ch.psi.daq.test.cassandra.admin.CassandraAdmin; import ch.psi.daq.test.queryrest.AbstractDaqRestTest; -import ch.psi.daq.test.queryrest.query.DummyQueryProcessor; /** * Tests the {@link DaqController} implementation. */ public class DaqRestControllerTest extends AbstractDaqRestTest { + + @Resource + private CassandraAdmin cassandraAdmin; + + @Resource + private CassandraDataGen dataGen; + + private static final boolean initialized = false; + + private static final int DATA_REPLICATION = 1; + public static final String[] TEST_CHANNEL_NAMES = new String[]{"testChannel1", "testChannel2"}; + + @Before + public void setUp() throws Exception { + if (!initialized) { + cassandraAdmin.truncateAll(); + + dataGen.writeData(DATA_REPLICATION, 100, 2, TEST_CHANNEL_NAMES); + } + } + + @After + public void tearDown() throws Exception {} @Test public void testPulseRangeQuery() throws Exception { PulseRangeQuery request = new PulseRangeQuery( 100, 101, - DummyQueryProcessor.TEST_CHANNEL_NAMES); + TEST_CHANNEL_NAMES); String content = mapper.writeValueAsString(request); this.mockMvc - .perform(MockMvcRequestBuilders.post("/query") + .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -51,12 +82,12 @@ public class DaqRestControllerTest extends AbstractDaqRestTest { TimeRangeQuery request = new TimeRangeQuery( 100, 101, - DummyQueryProcessor.TEST_CHANNEL_NAMES); + TEST_CHANNEL_NAMES); String content = mapper.writeValueAsString(request); this.mockMvc - .perform(MockMvcRequestBuilders.post("/query") + .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -76,18 +107,18 @@ public class DaqRestControllerTest extends AbstractDaqRestTest { } @Test - public void testMapper() throws Exception { + public void testDateRangeQuery() throws Exception { String startDate = TimeRangeQueryDate.format(100); String endDate = TimeRangeQueryDate.format(101); TimeRangeQueryDate request = new TimeRangeQueryDate( startDate, endDate, - DummyQueryProcessor.TEST_CHANNEL_NAMES); + TEST_CHANNEL_NAMES); String content = mapper.writeValueAsString(request); this.mockMvc - .perform(MockMvcRequestBuilders.post("/query") + .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY) .contentType(MediaType.APPLICATION_JSON) .content(content)) @@ -105,4 +136,61 @@ public class DaqRestControllerTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(101)); } + + @Test + public void testExtremaAggregation() throws Exception { + PulseRangeQuery request = new PulseRangeQuery( + 100, + 101, + false, + Ordering.asc, + AggregationType.extrema, + TEST_CHANNEL_NAMES[0]); + + String content = mapper.writeValueAsString(request); + + this.mockMvc + .perform(MockMvcRequestBuilders.post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + + .andExpect(MockMvcResultMatchers.jsonPath("$").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.value").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.value").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.channel").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.channel").value(TEST_CHANNEL_NAMES[0])) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.pulseId").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.min.event.pulseId").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$.maxima").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.value").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.value").value(101)) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.channel").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.channel").value(TEST_CHANNEL_NAMES[0])) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.pulseId").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$.minima.max.event.pulseId").value(101)); + } + + @Test + public void testChannelNameQuery() throws Exception { + this.mockMvc + .perform(MockMvcRequestBuilders.get(QueryRestController.CHANNELS) + .contentType(MediaType.APPLICATION_JSON)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("testChannel1")) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("testChannel2")) + .andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist()); + } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java new file mode 100644 index 0000000..d54bd5b --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java @@ -0,0 +1,58 @@ +package ch.psi.daq.test.queryrest.query; + +import java.util.List; +import java.util.UUID; +import java.util.regex.Pattern; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import com.google.common.collect.Lists; + +import ch.psi.daq.cassandra.reader.DataReader; +import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.domain.cassandra.ChannelEvent; +import ch.psi.daq.domain.cassandra.DataEvent; + +public class DummyDataReader implements DataReader { + + public static final String TEST_CHANNEL_1 = "testChannel1"; + public static final String TEST_CHANNEL_2 = "testChannel2"; + public static final List TEST_CHANNEL_NAMES = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2); + + @Override + public Stream getChannelStream(String regex) { + Stream channelStream = TEST_CHANNEL_NAMES.stream(); + + if (regex != null) { + Pattern pattern = Pattern.compile(regex); + channelStream = channelStream.filter(channel -> pattern.matcher(channel).find()); + } + + return channelStream; + } + + @Override + public Stream getEventStream(String channel, long startPulseId, long endPulseId, + Ordering ordering, String... columns) { + return getElements(channel, startPulseId, endPulseId); + } + + @Override + public Stream getEventStream(String channel, long startMillis, long startNanos, long endMillis, + long endNanos, Ordering ordering, String... columns) { + return getElements(channel, startMillis, endMillis); + } + + protected Stream getElements(String channel, long start, long end) { + return LongStream.rangeClosed(start, end).mapToObj(i -> { + return new ChannelEvent( + channel, + i, + 0, + i, + i, + 0, + "data_" + UUID.randomUUID().toString()); + }); + } +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyQueryProcessor.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyQueryProcessor.java deleted file mode 100644 index 88a9a5a..0000000 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyQueryProcessor.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * - */ -package ch.psi.daq.test.queryrest.query; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import ch.psi.daq.domain.cassandra.ChannelEvent; -import ch.psi.daq.domain.cassandra.DataEvent; -import ch.psi.daq.query.model.Query; -import ch.psi.daq.query.processor.QueryProcessor; - -/** - * @author zellweger_c - * - */ -public class DummyQueryProcessor implements QueryProcessor { - - public static final List TEST_CHANNEL_NAMES = Lists.newArrayList("testChannel1", "testChannel2"); - - @Override - public Collection getChannels() { - return TEST_CHANNEL_NAMES; - } - - @Override - public Collection getChannels(String regex) { - if (regex != null) { - Pattern pattern = Pattern.compile(regex); - return TEST_CHANNEL_NAMES.stream().filter(channel -> pattern.matcher(channel).find()) - .collect(Collectors.toList()); - } else { - return getChannels(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Stream>> process(Query query) { - - int nrOfEntries = 2; - int startingPulseId = 100; - - Map> result = Maps.newHashMap(); - - TEST_CHANNEL_NAMES.forEach(chName -> { - - List entities = IntStream.rangeClosed(1, nrOfEntries).boxed().map(i -> { - long pulseId = startingPulseId + i - 1; - return new ChannelEvent( - chName, - pulseId, - 0, - pulseId, - pulseId, - 0, - "data_" + UUID.randomUUID().toString()); - }).collect(Collectors.toList()); - - result.put(chName, entities.stream()); - }); - - return result.entrySet().stream(); - } -}