ATEST-79:

- implementing first draft of timerange and pulserange queries
- adding tests, again based on an embedded local cassandra instance
This commit is contained in:
Zellweger Christof Ralf
2015-06-19 15:14:54 +02:00
parent aaaf5a72fe
commit c1098a398f
17 changed files with 589 additions and 167 deletions

View File

@ -1,10 +1,45 @@
version = '1.0.0'
buildscript {
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.2.4.RELEASE")
}
repositories {
jcenter()
}
}
apply plugin: 'spring-boot'
repositories {
jcenter()
}
springBoot {
// when using spring loaded turn on noverify
noverify = true
}
applicationDefaultJvmArgs = [
"-Dfile.encoding=UTF-8",
// if you need to debug java agents:
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
]
//configurations {
// compile.exclude group: "com.fasterxml.jackson.core"
//}
dependencies {
compile project(':ch.psi.daq.cassandra')
compile project(':ch.psi.daq.hazelcast.common')
compile (project(':ch.psi.daq.cassandra'))
compile (project(':ch.psi.daq.hazelcast.common'))
compile 'org.springframework.boot:spring-boot-starter-web:1.2.4.RELEASE'
compile 'com.google.code.gson:gson:2+'
compile 'org.apache.commons:commons-lang3:3.4'
testCompile 'org.springframework.boot:spring-boot-starter-test:1.2.4.RELEASE'
testCompile 'org.skyscreamer:jsonassert:1.2.3'
testCompile 'com.jayway.jsonpath:json-path:2.0.0'
}
uploadArchives {

View File

@ -1,42 +0,0 @@
package ch.psi.daq.rest;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import ch.psi.daq.cassandra.writer.CassandraWriter;
import ch.psi.daq.domain.cassandra.ChannelEvent;
import ch.psi.daq.rest.queries.PulseRangeQuery;
@RestController
public class DaqController {
private static final Logger logger = LoggerFactory.getLogger(DaqController.class);
@Autowired
private CassandraWriter writer;
@RequestMapping(value = "/test")
public void queryIndices() {
logger.info("TEST endpoint invoked");
long pulseId = System.currentTimeMillis();
String value = "data_" + UUID.randomUUID().toString();
writer.writeAsync(1, 0, new ChannelEvent("dummyChannel", pulseId, 0, pulseId, pulseId, 0, value));
}
@RequestMapping(value = "/pulserange")
public void pulseRange(@RequestBody PulseRangeQuery query) {
logger.info("PulseRangeQuery received: {}", query);
}
}

View File

@ -5,9 +5,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.web.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import ch.psi.daq.cassandra.config.CassandraConfig;
/**
* Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController
@ -30,7 +27,7 @@ import ch.psi.daq.cassandra.config.CassandraConfig;
* beans and inject them accordingly.
*/
@SpringBootApplication
//@Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan
// @Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan
// comment below
@ComponentScan(basePackages = {
"ch.psi.daq.rest",
@ -51,11 +48,4 @@ public class DaqRestApplication extends SpringBootServletInitializer {
return application.sources(DaqRestApplication.class);
}
// a nested configuration
// this guarantees that the ordering of the properties file is as expected
// see:
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
// @Configuration
// @Import(CassandraConfig.class)
// static class InnerConfiguration { }
}

View File

@ -0,0 +1,53 @@
package ch.psi.daq.rest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import ch.psi.daq.common.statistic.StorelessStatistics;
import ch.psi.daq.domain.cassandra.ChannelEvent;
import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
import ch.psi.daq.rest.model.PropertyFilterMixin;
import ch.psi.daq.test.rest.query.DummyQueryProcessor;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class DaqRestConfiguration {
@Bean
public QueryProcessor queryProcessor() {
return new DummyQueryProcessor();
}
@Bean
public JsonFactory jsonFactory() {
return new JsonFactory();
}
@Bean
public ResponseStreamWriter responseStreamWriter() {
return new ResponseStreamWriter();
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
// only include non-null values
mapper.setSerializationInclusion(Include.NON_NULL);
// Mixin which is used dynamically to filter out which properties get serialised and which
// won't. This way, the user can specify which columns are to be received.
mapper.addMixIn(ChannelEvent.class, PropertyFilterMixin.class);
mapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
return mapper;
}
// a nested configuration
// this guarantees that the ordering of the properties file is as expected
// see:
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
// @Configuration
// @Import(CassandraConfig.class)
// static class InnerConfiguration { }
}

View File

@ -0,0 +1,80 @@
package ch.psi.daq.rest;
import java.io.IOException;
import java.util.Map;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import ch.psi.daq.domain.cassandra.DataEvent;
import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
import ch.psi.daq.rest.queries.AbstractQuery;
import ch.psi.daq.rest.queries.PulseRangeQuery;
import ch.psi.daq.rest.queries.TimeRangeQuery;
@RestController
public class DaqRestController {
private static final Logger logger = LoggerFactory.getLogger(DaqRestController.class);
@Autowired
private ResponseStreamWriter responseStreamWriter;
// TODO: just a dummy test implementation - remove when the real processor is ready
@Autowired
private QueryProcessor queryProcessor;
/**
*
* @param query
* @param res
* @throws IOException
*/
@RequestMapping(value = "/pulserange")
public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException {
logger.debug("PulseRangeQuery received: {}", query);
executeQuery(query, res);
}
/**
*
* @param query
* @param res
* @throws IOException
*/
@RequestMapping(value = "/timerange")
public void pulseRange(@RequestBody TimeRangeQuery query, HttpServletResponse res) throws IOException {
logger.debug("TimeRangeQuery received: {}", query);
executeQuery(query, res);
}
/**
*
* @param query
* @param res
* @throws IOException
*/
private void executeQuery(AbstractQuery query, HttpServletResponse res) throws IOException {
// all the magic happens here
Map<String, Stream<? extends DataEvent>> process = queryProcessor.process(query);
Stream<DataEvent> flatStreams = process.values().stream().flatMap(s -> {
return s;
});
// write the response back to the client using java 8 streams
responseStreamWriter.respond(flatStreams, query, res);
}
}

View File

@ -0,0 +1,112 @@
/**
*
*/
package ch.psi.daq.rest;
import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.servlet.ServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import ch.psi.daq.domain.cassandra.DataEvent;
import ch.psi.daq.rest.queries.AbstractQuery;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
public class ResponseStreamWriter {
private static final Logger logger = LoggerFactory.getLogger(ResponseStreamWriter.class);
@Autowired
private JsonFactory jsonFactory;
@Autowired
private ObjectMapper mapper;
/**
* Responding with the the contents of the stream by writing into the output stream of the
* {@link ServletResponse}.
*
* @param stream {@link Stream} instance of {@link DataEvent}s
* @param query concrete instance of {@link AbstractQuery}
* @param response {@link ServletResponse} instance given by the current HTTP request
* @throws IOException thrown if writing to the output stream fails
*/
public void respond(Stream<DataEvent> stream, AbstractQuery query, ServletResponse response) throws IOException {
Set<String> includedFields = query.getFields();
if (query.getAggregations() != null) {
includedFields.addAll(query.getAggregations()
.stream()
.map(a -> {
return a.getType().toString();
})
.collect(Collectors.toSet()));
}
ObjectWriter writer = configureWriter(includedFields);
respondInternal(stream, response, writer);
}
/**
* Configures the writer dynamically by including the fields which should be included in the
* response.
*
* @param includedFields set of strings which correspond to the getter method names of the
* classes registered as a mixed-in
* @return the configured writer that includes the specified fields
*/
private ObjectWriter configureWriter(Set<String> includedFields) {
SimpleFilterProvider propertyFilter = new SimpleFilterProvider();
propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields));
// only write the properties not excluded in the filter
ObjectWriter writer = mapper.writer(propertyFilter);
return writer;
}
/**
* Writes the Java stream into the output stream.
*
* @param stream {@link Stream} instance of {@link DataEvent}s
* @param response {@link ServletResponse} instance given by the current HTTP request
* @param writer configured writer that includes the fields the end user wants to see
* @throws IOException thrown if writing to the output stream fails
*/
private void respondInternal(Stream<DataEvent> stream, ServletResponse response, ObjectWriter writer)
throws IOException {
JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
generator.writeStartArray();
stream.forEach(ds -> {
try {
logger.trace("Writing value for: {}", ds);
// use the writer created just before
writer.writeValue(generator, ds);
} catch (Exception e) {
logger.error("", e);
}
});
generator.writeEndArray();
generator.flush();
generator.close();
}
}

View File

@ -1,40 +0,0 @@
package ch.psi.daq.rest.model;
import org.apache.commons.lang3.builder.ToStringBuilder;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Aggregation {
private String fieldRef;
private AggregationEnum type;
private String resultFieldName;
@JsonCreator
public Aggregation(
@JsonProperty(value = "fieldRef") String fieldRef,
@JsonProperty(value = "type") AggregationEnum type,
@JsonProperty(value = "resultFieldName") String resultFieldName) {
this.fieldRef = fieldRef;
this.type = type;
this.resultFieldName = resultFieldName;
}
public String getFieldRef() {
return fieldRef;
}
public AggregationEnum getType() {
return type;
}
public String getResultFieldName() {
return resultFieldName;
}
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}

View File

@ -1,30 +0,0 @@
package ch.psi.daq.rest.model;
public enum AggregationEnum {
/* mean value of values (of an array) */
mean,
/* min value of values (of an array) */
min,
/* max value of values (of an array) */
max,
/* sum of values (of an array) */
sum,
/* number of values (of an array) aggregated into a value */
count,
/* variance of values (of an array) */
variance,
/* standard deviation */
std_dev,
/* skewness */
skewness,
/* kurtosis */
kurtosis
}

View File

@ -0,0 +1,10 @@
package ch.psi.daq.rest.model;
import com.fasterxml.jackson.annotation.JsonFilter;
/**
*
*/
@JsonFilter("namedPropertyFilter")
public class PropertyFilterMixin {
}

View File

@ -5,19 +5,19 @@ import java.util.LinkedHashSet;
import java.util.List;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.springframework.util.Assert;
import ch.psi.daq.cassandra.reader.Ordering;
import ch.psi.daq.hazelcast.common.query.Aggregation;
import ch.psi.daq.hazelcast.common.query.Query;
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
import ch.psi.daq.hazelcast.common.query.bin.BinIntervalCalculator;
import ch.psi.daq.rest.model.Aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.sun.istack.internal.NotNull;
/**
*
@ -35,22 +35,18 @@ import com.sun.istack.internal.NotNull;
})
public abstract class AbstractQuery implements Query {
@NotNull
private boolean isOrderDescending;
@NotNull
private List<String> channels;
@NotNull
private LinkedHashSet<String> fields;
@NotNull
private List<Aggregation> aggregations;
private ValueAggregation valueAggregation;
private boolean aggregateChannels;
private Ordering ordering;
/**
*
* @param isOrderDescending whether to add a 'orderBy' clause into the database query
@ -62,20 +58,22 @@ public abstract class AbstractQuery implements Query {
public AbstractQuery(
// note that those annotations are needed for the polymorphic
// mapping to work correctly
@JsonProperty(value = "isOrderDescending") boolean isOrderDescending,
@JsonProperty(value = "ordering") Ordering ordering,
@JsonProperty(value = "channels") List<String> channelIds,
@JsonProperty(value = "fields") LinkedHashSet<String> fields,
@JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
@JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
@JsonProperty(value = "aggregations") List<Aggregation> aggregations) {
this.isOrderDescending = isOrderDescending;
this.ordering = ordering;
this.aggregateChannels = aggregateChannels;
this.valueAggregation = valueAggregation;
if (channelIds == null || fields == null) {
throw new IllegalArgumentException("sourceIds and/or fields cannot be null.");
}
Assert.notNull(channelIds, "channel name must not be null.");
Assert.notNull(fields, "field, i.e. property, names must not be null.");
this.channels = channelIds;
this.fields = fields;
@ -95,10 +93,7 @@ public abstract class AbstractQuery implements Query {
*/
@Override
public Ordering getOrdering() {
if (isOrderDescending) {
return Ordering.DESC;
}
return Ordering.ASC;
return ordering;
}
/**
@ -123,14 +118,10 @@ public abstract class AbstractQuery implements Query {
return null;
}
public boolean isAggregateChannels() {
return aggregateChannels;
}
public boolean isOrderDescending() {
return isOrderDescending;
}
public LinkedHashSet<String> getFields() {
return fields;

View File

@ -5,8 +5,9 @@ import java.util.List;
import org.apache.commons.lang.builder.ToStringBuilder;
import ch.psi.daq.cassandra.reader.Ordering;
import ch.psi.daq.hazelcast.common.query.Aggregation;
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
import ch.psi.daq.rest.model.Aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -23,7 +24,7 @@ public class PulseRangeQuery extends AbstractQuery {
public PulseRangeQuery(
// note that those annotations are needed for the polymorphic
// mapping to work correctly
@JsonProperty(value = "isOrderDescending") boolean isOrderDescending,
@JsonProperty(value = "ordering") Ordering ordering,
@JsonProperty(value = "channels") List<String> channels,
@JsonProperty(value = "fields") LinkedHashSet<String> fields,
@JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
@ -33,7 +34,7 @@ public class PulseRangeQuery extends AbstractQuery {
@JsonProperty(value = "endPulseId") long endPulseId)
{
super(isOrderDescending, channels, fields, aggregateChannels, valueAggregation, aggregations);
super(ordering, channels, fields, aggregateChannels, valueAggregation, aggregations);
this.startPulseId = startPulseId;
this.endPulseId = endPulseId;

View File

@ -6,7 +6,6 @@ import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
@ -14,10 +13,10 @@ import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.psi.daq.domain.cassandra.FieldName;
import ch.psi.daq.cassandra.reader.Ordering;
import ch.psi.daq.domain.utils.UUIDGen;
import ch.psi.daq.hazelcast.common.query.Aggregation;
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
import ch.psi.daq.rest.model.Aggregation;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@ -45,14 +44,10 @@ public class TimeRangeQuery extends AbstractQuery {
@JsonIgnore
private UUID endTime;
@JsonIgnore
private String[] columns;
/**
*
* @param isOrderDescending
* @param ordering
* @param channelIds all the sourceIds (channel names) we want to query
* @param fields the fields (who map to fields in the DB) we are interested in returning to the
* client, needs to be in insertion order (hence the {@link LinkedHashSet} type)
@ -65,7 +60,7 @@ public class TimeRangeQuery extends AbstractQuery {
public TimeRangeQuery(
// note that those annotations are needed for the polymorphic
// mapping to work correctly
@JsonProperty(value = "isOrderDescending") boolean isOrderDescending,
@JsonProperty(value = "ordering") Ordering ordering,
@JsonProperty(value = "channels") List<String> channels,
@JsonProperty(value = "fields") LinkedHashSet<String> fields,
@JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
@ -78,7 +73,7 @@ public class TimeRangeQuery extends AbstractQuery {
@JsonProperty(value = "endNanoOffset") long endNanoOffset,
@JsonProperty(value = "endDateTime") String endDateTime) {
super(isOrderDescending, channels, fields, aggregateChannels, valueAggregation, aggregations);
super(ordering, channels, fields, aggregateChannels, valueAggregation, aggregations);
this.start = startMillis;
this.startNanoOffset = startNanoOffset;
@ -103,13 +98,6 @@ public class TimeRangeQuery extends AbstractQuery {
this.startTime = UUIDGen.getMinTime(this.start, this.startNanoOffset);
this.endTime = UUIDGen.getMaxTime(this.end, this.endNanoOffset);
/**
* This will map to the columns in the database
*/
this.columns = fields.stream()
.map(f -> FieldName.valueOf(f))
.collect(Collectors.toList())
.toArray(new String[] {});
}
public long getStart() {
@ -152,10 +140,6 @@ public class TimeRangeQuery extends AbstractQuery {
return endTime;
}
public String[] getColumns() {
return columns;
}
/**
* {@inheritDoc}
*/

View File

@ -0,0 +1,51 @@
package ch.psi.daq.test.rest;
import org.cassandraunit.spring.EmbeddedCassandra;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
import ch.psi.daq.rest.DaqRestApplication;
import ch.psi.daq.test.cassandra.CassandraDaqUnitDependencyInjectionTestExecutionListener;
import com.fasterxml.jackson.databind.ObjectMapper;
@SpringApplicationConfiguration(classes = {DaqRestApplication.class, DaqRestApplicationConfiguration.class})
@TestExecutionListeners({
CassandraDaqUnitDependencyInjectionTestExecutionListener.class,
DependencyInjectionTestExecutionListener.class})
@EmbeddedCassandra
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
@WebAppConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public abstract class AbstractDaqRestTest {
@Autowired
protected WebApplicationContext webApplicationContext;
protected MockMvc mockMvc;
protected ObjectMapper mapper = new ObjectMapper();
@Before
public void setup() {
// Process mock annotations
MockitoAnnotations.initMocks(this);
// Setup Spring test in webapp-mode (same config as spring-boot)
this.mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
}
}

View File

@ -0,0 +1,23 @@
package ch.psi.daq.test.rest;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
import ch.psi.daq.test.cassandra.LocalCassandraTestConfig;
@Configuration
//@ComponentScan(basePackages = {
// "ch.psi.daq.rest",
// "ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig
// // configuration, or @Import it (see above)
// "ch.psi.daq.cassandra.reader",
// "ch.psi.daq.cassandra.writer"
//})
@Import(value = {LocalCassandraTestConfig.class})
@EnableWebMvc
public class DaqRestApplicationConfiguration extends WebMvcConfigurationSupport {
// add test-specific beans and configurations here
}

View File

@ -0,0 +1,91 @@
package ch.psi.daq.test.rest.controller;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import ch.psi.daq.cassandra.reader.Ordering;
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
import ch.psi.daq.rest.queries.PulseRangeQuery;
import ch.psi.daq.rest.queries.TimeRangeQuery;
import ch.psi.daq.test.rest.AbstractDaqRestTest;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
* Tests the {@link DaqController} implementation.
*/
public class DaqControllerTest extends AbstractDaqRestTest {
private static final List<String> DEFAULT_PROPERTIES = Lists.newArrayList("channel", "pulseId");
@Test
public void testPulseRangeQuery() throws Exception {
PulseRangeQuery request = new PulseRangeQuery(
Ordering.DESC,
Lists.newArrayList(), // DummyQueryProcessor simply returns a fixed list
Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
false,
ValueAggregation.index,
null,
100l,
101l
);
String content = mapper.writeValueAsString(request);
this.mockMvc
.perform(MockMvcRequestBuilders.post("/pulserange")
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
}
@Test
public void testTimeRangeQuery() throws Exception {
long startTime = new Date().getTime();
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
TimeRangeQuery request = new TimeRangeQuery(
Ordering.ASC,
Lists.newArrayList(), // DummyQueryProcessor simply returns a fixed list
Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
false,
ValueAggregation.index,
null, // aggregations
startTime, // startMillis
0,
null,
endTime,
0,
null
);
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders.post("/timerange")
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
}
}

View File

@ -0,0 +1,59 @@
/**
*
*/
package ch.psi.daq.test.rest.query;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import ch.psi.daq.domain.cassandra.ChannelEvent;
import ch.psi.daq.domain.cassandra.DataEvent;
import ch.psi.daq.hazelcast.common.query.Query;
import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* @author zellweger_c
*
*/
public class DummyQueryProcessor implements QueryProcessor {
private static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList("testChannel1", "testChannel2");
/** {@inheritDoc}
*/
@Override
public Map<String, Stream<? extends DataEvent>> process(Query query) {
int nrOfEntries = 2;
int startingPulseId = 100;
Map<String, Stream<? extends DataEvent>> result = Maps.newHashMap();
TEST_CHANNEL_NAMES.forEach(chName ->{
List<DataEvent> entities = IntStream.rangeClosed(1, nrOfEntries).boxed().map(i -> {
long pulseId = startingPulseId + i - 1;
return new ChannelEvent(
chName,
pulseId,
0,
pulseId,
pulseId,
0,
"data_" + UUID.randomUUID().toString());
}).collect(Collectors.toList());
result.put(chName, entities.stream());
});
return result;
}
}

View File

@ -0,0 +1,54 @@
{
"queryType":"pulserange",
"ordering":"ASC",
"channels": [
"test1",
"test2"
],
"fields": [
"channel", pulseId", "globalMillis", "globalNanos", "dbValueBytes"
],
"aggregateChannels":"false",
"aggregationType": "index",
"aggregations": [
{
"fieldRef" : "e_val",
"type" : "max",
"resultFieldName" : "maximum"
},{
"fieldRef" : "e_val",
"type" : "min",
"resultFieldName" : "minimum"
}
],
"startPulseId" : 100,
"endPulseId" : 200
}
curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"pulserange","ordering":"ASC","channels":["test1","test2"],"fields":["channel","pulseId"],"aggregateChannels":"false","aggregationType":"index","aggregations":[{"fieldRef":"e_val","type":"max","resultFieldName":"maximum"},{"fieldRef":"e_val","type":"min","resultFieldName":"minimum"}],"startPulseId":100,"endPulseId":200}' http://localhost:8080/pulserange
===============================================================================================================================================
{
"queryType":"timerange",
"channels":[
"test1",
"test2"
],
"fields":[
"channel",
"pulseId"
],
"aggregateChannels":false,
"aggregations":null,
"start":1434717654177,
"startNanoOffset":0,
"end":1434717655177,
"endNanoOffset":0,
"valueAggregation":"index",
"binIntervalCalculator":null,
"ordering":"DESC"
}
curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"timerange","channels":["test1","test2"],"fields":["channel","pulseId"],"aggregateChannels":false,"aggregations":null,"start":1434717654177,"startNanoOffset":0,"end":1434717655177,"endNanoOffset":0,"valueAggregation":"index","ordering":"ASC"}' http://localhost:8080/timerange