diff --git a/build.gradle b/build.gradle
index e19e115..d495adc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,9 +1,45 @@
version = '1.0.0'
+buildscript {
+ dependencies {
+ classpath("org.springframework.boot:spring-boot-gradle-plugin:1.2.4.RELEASE")
+ }
+ repositories {
+ jcenter()
+ }
+}
+apply plugin: 'spring-boot'
+
+repositories {
+ jcenter()
+}
+
+
+springBoot {
+ // when using spring loaded turn on noverify
+ noverify = true
+}
+
+applicationDefaultJvmArgs = [
+ "-Dfile.encoding=UTF-8",
+ // if you need to debug java agents:
+ "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
+]
+
+//configurations {
+// compile.exclude group: "com.fasterxml.jackson.core"
+//}
+
dependencies {
- compile project(':ch.psi.daq.cassandra')
+ compile (project(':ch.psi.daq.cassandra'))
+ compile (project(':ch.psi.daq.hazelcast.common'))
compile 'org.springframework.boot:spring-boot-starter-web:1.2.4.RELEASE'
+ compile 'com.google.code.gson:gson:2+'
compile 'org.apache.commons:commons-lang3:3.4'
+
+ testCompile 'org.springframework.boot:spring-boot-starter-test:1.2.4.RELEASE'
+ testCompile 'org.skyscreamer:jsonassert:1.2.3'
+ testCompile 'com.jayway.jsonpath:json-path:2.0.0'
}
uploadArchives {
diff --git a/src/main/java/ch/psi/daq/rest/DaqController.java b/src/main/java/ch/psi/daq/rest/DaqController.java
deleted file mode 100644
index aab01c6..0000000
--- a/src/main/java/ch/psi/daq/rest/DaqController.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package ch.psi.daq.rest;
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import ch.psi.daq.cassandra.writer.CassandraWriter;
-import ch.psi.daq.domain.cassandra.ChannelEvent;
-
-@RestController
-public class DaqController {
-
- private static final Logger logger = LoggerFactory.getLogger(DaqController.class);
-
- @Autowired
- private CassandraWriter writer;
-
- @RequestMapping(value = "/test")
- public void queryIndices() {
-
- logger.info("TEST endpoint invoked");
-
- long pulseId = System.currentTimeMillis();
- String value = "data_" + UUID.randomUUID().toString();
-
- writer.writeAsync(1, 0, new ChannelEvent("dummyChannel", pulseId, 0, pulseId, pulseId, 0, value));
- }
-}
diff --git a/src/main/java/ch/psi/daq/rest/DaqRestApplication.java b/src/main/java/ch/psi/daq/rest/DaqRestApplication.java
index e2a965f..2beaadf 100644
--- a/src/main/java/ch/psi/daq/rest/DaqRestApplication.java
+++ b/src/main/java/ch/psi/daq/rest/DaqRestApplication.java
@@ -5,58 +5,47 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.web.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Import;
-
-import ch.psi.daq.cassandra.config.CassandraConfig;
/**
- * Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController annotated classes.
+ * Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController
+ * annotated classes.
*
*
- * This acts as a @Configuration class for Spring. As such it has @ComponentScan
- * annotation that enables scanning for another Spring components in current
- * package and its subpackages.
+ * This acts as a @Configuration class for Spring. As such it has @ComponentScan annotation that
+ * enables scanning for another Spring components in current package and its subpackages.
*
- * Another annotation is @EnableAutoConfiguration which tells Spring Boot to run
- * autoconfiguration.
+ * Another annotation is @EnableAutoConfiguration which tells Spring Boot to run autoconfiguration.
*
- * It also extends SpringBootServletInitializer which will configure Spring
- * servlet for us, and overrides the configure() method to point to itself, so
- * Spring can find the main configuration.
+ * It also extends SpringBootServletInitializer which will configure Spring servlet for us, and
+ * overrides the configure() method to point to itself, so Spring can find the main configuration.
*
- * Finally, the main() method consists of single static call to
- * SpringApplication.run().
+ * Finally, the main() method consists of single static call to SpringApplication.run().
*
- * Methods annotated with @Bean are Java beans that are
- * container-managed, i.e. managed by Spring. Whenever there are @Autowire, @Inject
- * or similar annotations found in the code (which is being scanned through the @ComponentScan
- * annotation), the container then knows how to create those beans and inject
- * them accordingly.
+ * Methods annotated with @Bean are Java beans that are container-managed, i.e. managed by Spring.
+ * Whenever there are @Autowire, @Inject or similar annotations found in the code (which is being
+ * scanned through the @ComponentScan annotation), the container then knows how to create those
+ * beans and inject them accordingly.
*/
@SpringBootApplication
-//@Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan comment below
+// @Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan
+// comment below
@ComponentScan(basePackages = {
- "ch.psi.daq.rest",
- "ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig configuration, or @Import it (see above)
- "ch.psi.daq.cassandra.reader",
- "ch.psi.daq.cassandra.writer"
+ "ch.psi.daq.rest",
+ "ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig
+ // configuration, or @Import it (see above)
+ "ch.psi.daq.cassandra.reader",
+ "ch.psi.daq.cassandra.writer"
})
public class DaqRestApplication extends SpringBootServletInitializer {
-
- public static void main(final String[] args) {
- SpringApplication.run(DaqRestApplication.class, args);
- }
- @Override
- protected final SpringApplicationBuilder configure(final SpringApplicationBuilder application) {
- return application.sources(DaqRestApplication.class);
- }
-
- // a nested configuration
- // this guarantees that the ordering of the properties file is as expected
- // see: https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
-// @Configuration
-// @Import(CassandraConfig.class)
-// static class InnerConfiguration { }
+ public static void main(final String[] args) {
+ SpringApplication.run(DaqRestApplication.class, args);
+ }
+
+ @Override
+ protected final SpringApplicationBuilder configure(final SpringApplicationBuilder application) {
+ return application.sources(DaqRestApplication.class);
+ }
+
}
diff --git a/src/main/java/ch/psi/daq/rest/DaqRestConfiguration.java b/src/main/java/ch/psi/daq/rest/DaqRestConfiguration.java
new file mode 100644
index 0000000..c4998c6
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/DaqRestConfiguration.java
@@ -0,0 +1,53 @@
+package ch.psi.daq.rest;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import ch.psi.daq.common.statistic.StorelessStatistics;
+import ch.psi.daq.domain.cassandra.ChannelEvent;
+import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
+import ch.psi.daq.rest.model.PropertyFilterMixin;
+import ch.psi.daq.test.rest.query.DummyQueryProcessor;
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Configuration
+public class DaqRestConfiguration {
+
+ @Bean
+ public QueryProcessor queryProcessor() {
+ return new DummyQueryProcessor();
+ }
+
+ @Bean
+ public JsonFactory jsonFactory() {
+ return new JsonFactory();
+ }
+
+ @Bean
+ public ResponseStreamWriter responseStreamWriter() {
+ return new ResponseStreamWriter();
+ }
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ // only include non-null values
+ mapper.setSerializationInclusion(Include.NON_NULL);
+ // Mixin which is used dynamically to filter out which properties get serialised and which
+ // won't. This way, the user can specify which columns are to be received.
+ mapper.addMixIn(ChannelEvent.class, PropertyFilterMixin.class);
+ mapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
+ return mapper;
+ }
+
+ // a nested configuration
+ // this guarantees that the ordering of the properties file is as expected
+ // see:
+ // https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
+ // @Configuration
+ // @Import(CassandraConfig.class)
+ // static class InnerConfiguration { }
+}
diff --git a/src/main/java/ch/psi/daq/rest/DaqRestController.java b/src/main/java/ch/psi/daq/rest/DaqRestController.java
new file mode 100644
index 0000000..626b612
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/DaqRestController.java
@@ -0,0 +1,80 @@
+package ch.psi.daq.rest;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import ch.psi.daq.domain.cassandra.DataEvent;
+import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
+import ch.psi.daq.rest.queries.AbstractQuery;
+import ch.psi.daq.rest.queries.PulseRangeQuery;
+import ch.psi.daq.rest.queries.TimeRangeQuery;
+
+@RestController
+public class DaqRestController {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaqRestController.class);
+
+ @Autowired
+ private ResponseStreamWriter responseStreamWriter;
+
+ // TODO: just a dummy test implementation - remove when the real processor is ready
+ @Autowired
+ private QueryProcessor queryProcessor;
+
+ /**
+ *
+ * @param query
+ * @param res
+ * @throws IOException
+ */
+ @RequestMapping(value = "/pulserange")
+ public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException {
+
+ logger.debug("PulseRangeQuery received: {}", query);
+
+ executeQuery(query, res);
+ }
+
+
+ /**
+ *
+ * @param query
+ * @param res
+ * @throws IOException
+ */
+ @RequestMapping(value = "/timerange")
+ public void pulseRange(@RequestBody TimeRangeQuery query, HttpServletResponse res) throws IOException {
+
+ logger.debug("TimeRangeQuery received: {}", query);
+
+ executeQuery(query, res);
+ }
+
+ /**
+ *
+ * @param query
+ * @param res
+ * @throws IOException
+ */
+ private void executeQuery(AbstractQuery query, HttpServletResponse res) throws IOException {
+ // all the magic happens here
+ Map> process = queryProcessor.process(query);
+
+ Stream flatStreams = process.values().stream().flatMap(s -> {
+ return s;
+ });
+
+ // write the response back to the client using java 8 streams
+ responseStreamWriter.respond(flatStreams, query, res);
+ }
+}
diff --git a/src/main/java/ch/psi/daq/rest/ResponseStreamWriter.java b/src/main/java/ch/psi/daq/rest/ResponseStreamWriter.java
new file mode 100644
index 0000000..e9847c1
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/ResponseStreamWriter.java
@@ -0,0 +1,112 @@
+/**
+ *
+ */
+package ch.psi.daq.rest;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.servlet.ServletResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import ch.psi.daq.domain.cassandra.DataEvent;
+import ch.psi.daq.rest.queries.AbstractQuery;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
+
+/**
+ * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
+ * of the current request.
+ */
+public class ResponseStreamWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(ResponseStreamWriter.class);
+
+ @Autowired
+ private JsonFactory jsonFactory;
+
+ @Autowired
+ private ObjectMapper mapper;
+
+ /**
+ * Responding with the the contents of the stream by writing into the output stream of the
+ * {@link ServletResponse}.
+ *
+ * @param stream {@link Stream} instance of {@link DataEvent}s
+ * @param query concrete instance of {@link AbstractQuery}
+ * @param response {@link ServletResponse} instance given by the current HTTP request
+ * @throws IOException thrown if writing to the output stream fails
+ */
+ public void respond(Stream stream, AbstractQuery query, ServletResponse response) throws IOException {
+
+ Set includedFields = query.getFields();
+
+ if (query.getAggregations() != null) {
+ includedFields.addAll(query.getAggregations()
+ .stream()
+ .map(a -> {
+ return a.getType().toString();
+ })
+ .collect(Collectors.toSet()));
+ }
+
+ ObjectWriter writer = configureWriter(includedFields);
+
+ respondInternal(stream, response, writer);
+ }
+
+ /**
+ * Configures the writer dynamically by including the fields which should be included in the
+ * response.
+ *
+ * @param includedFields set of strings which correspond to the getter method names of the
+ * classes registered as a mixed-in
+ * @return the configured writer that includes the specified fields
+ */
+ private ObjectWriter configureWriter(Set includedFields) {
+ SimpleFilterProvider propertyFilter = new SimpleFilterProvider();
+ propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields));
+ // only write the properties not excluded in the filter
+ ObjectWriter writer = mapper.writer(propertyFilter);
+ return writer;
+ }
+
+ /**
+ * Writes the Java stream into the output stream.
+ *
+ * @param stream {@link Stream} instance of {@link DataEvent}s
+ * @param response {@link ServletResponse} instance given by the current HTTP request
+ * @param writer configured writer that includes the fields the end user wants to see
+ * @throws IOException thrown if writing to the output stream fails
+ */
+ private void respondInternal(Stream stream, ServletResponse response, ObjectWriter writer)
+ throws IOException {
+
+ JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
+ generator.writeStartArray();
+ stream.forEach(ds -> {
+ try {
+ logger.trace("Writing value for: {}", ds);
+ // use the writer created just before
+ writer.writeValue(generator, ds);
+ } catch (Exception e) {
+ logger.error("", e);
+ }
+ });
+ generator.writeEndArray();
+ generator.flush();
+ generator.close();
+ }
+
+}
diff --git a/src/main/java/ch/psi/daq/rest/model/PropertyFilterMixin.java b/src/main/java/ch/psi/daq/rest/model/PropertyFilterMixin.java
new file mode 100644
index 0000000..e6faa41
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/model/PropertyFilterMixin.java
@@ -0,0 +1,10 @@
+package ch.psi.daq.rest.model;
+
+import com.fasterxml.jackson.annotation.JsonFilter;
+
+/**
+ *
+ */
+@JsonFilter("namedPropertyFilter")
+public class PropertyFilterMixin {
+}
diff --git a/src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java b/src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java
new file mode 100644
index 0000000..396efe9
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java
@@ -0,0 +1,139 @@
+package ch.psi.daq.rest.queries;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.springframework.util.Assert;
+
+import ch.psi.daq.cassandra.reader.Ordering;
+import ch.psi.daq.hazelcast.common.query.Aggregation;
+import ch.psi.daq.hazelcast.common.query.Query;
+import ch.psi.daq.hazelcast.common.query.ValueAggregation;
+import ch.psi.daq.hazelcast.common.query.bin.BinIntervalCalculator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ *
+ * @author zellweger_c
+ *
+ */
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "queryType")
+@JsonSubTypes(
+{
+ @Type(value = PulseRangeQuery.class, name = "pulserange"),
+ @Type(value = TimeRangeQuery.class, name = "timerange"),
+})
+public abstract class AbstractQuery implements Query {
+
+ private List channels;
+
+ private LinkedHashSet fields;
+
+ private List aggregations;
+
+ private ValueAggregation valueAggregation;
+
+ private boolean aggregateChannels;
+
+ private Ordering ordering;
+
+ /**
+ *
+ * @param isOrderDescending whether to add a 'orderBy' clause into the database query
+ * @param channelIds all the channelIds (channel names) we want to query
+ * @param fields the fields (who map to fields in the DB) we are interested in returning to the
+ * client, needs to be in insertion order (hence the {@link LinkedHashSet} type)
+ */
+ @JsonCreator
+ public AbstractQuery(
+ // note that those annotations are needed for the polymorphic
+ // mapping to work correctly
+ @JsonProperty(value = "ordering") Ordering ordering,
+ @JsonProperty(value = "channels") List channelIds,
+ @JsonProperty(value = "fields") LinkedHashSet fields,
+ @JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
+ @JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
+ @JsonProperty(value = "aggregations") List aggregations) {
+
+ this.ordering = ordering;
+ this.aggregateChannels = aggregateChannels;
+ this.valueAggregation = valueAggregation;
+
+ if (channelIds == null || fields == null) {
+ throw new IllegalArgumentException("sourceIds and/or fields cannot be null.");
+ }
+ Assert.notNull(channelIds, "channel name must not be null.");
+ Assert.notNull(fields, "field, i.e. property, names must not be null.");
+
+ this.channels = channelIds;
+ this.fields = fields;
+ this.aggregations = aggregations;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List getChannels() {
+ return Collections.unmodifiableList(channels);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Ordering getOrdering() {
+ return ordering;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ValueAggregation getValueAggregation() {
+ return valueAggregation;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean aggregateChannels() {
+ return isAggregateChannels();
+ }
+
+ @Override
+ public BinIntervalCalculator getBinIntervalCalculator() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public boolean isAggregateChannels() {
+ return aggregateChannels;
+ }
+
+
+ public LinkedHashSet getFields() {
+ return fields;
+ }
+
+ public List getAggregations() {
+ return aggregations;
+ }
+
+ @Override
+ public String toString() {
+ return ReflectionToStringBuilder.reflectionToString(this);
+ }
+
+}
diff --git a/src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java b/src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java
new file mode 100644
index 0000000..134d85a
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java
@@ -0,0 +1,67 @@
+package ch.psi.daq.rest.queries;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+import ch.psi.daq.cassandra.reader.Ordering;
+import ch.psi.daq.hazelcast.common.query.Aggregation;
+import ch.psi.daq.hazelcast.common.query.ValueAggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ *
+ */
+public class PulseRangeQuery extends AbstractQuery {
+
+ private long startPulseId;
+ private long endPulseId;
+
+ @JsonCreator
+ public PulseRangeQuery(
+ // note that those annotations are needed for the polymorphic
+ // mapping to work correctly
+ @JsonProperty(value = "ordering") Ordering ordering,
+ @JsonProperty(value = "channels") List channels,
+ @JsonProperty(value = "fields") LinkedHashSet fields,
+ @JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
+ @JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
+ @JsonProperty(value = "aggregations") List aggregations,
+ @JsonProperty(value = "startPulseId") long startPulseId,
+ @JsonProperty(value = "endPulseId") long endPulseId)
+ {
+
+ super(ordering, channels, fields, aggregateChannels, valueAggregation, aggregations);
+
+ this.startPulseId = startPulseId;
+ this.endPulseId = endPulseId;
+ }
+
+ public long getStartPulseId() {
+ return startPulseId;
+ }
+
+ public void setStartPulseId(long startPulseId) {
+ this.startPulseId = startPulseId;
+ }
+
+ public long getEndPulseId() {
+ return endPulseId;
+ }
+
+ public void setEndPulseId(long endPulseId) {
+ this.endPulseId = endPulseId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+
+}
diff --git a/src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java b/src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java
new file mode 100644
index 0000000..cbac26d
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java
@@ -0,0 +1,133 @@
+package ch.psi.daq.rest.queries;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.psi.daq.cassandra.reader.Ordering;
+import ch.psi.daq.hazelcast.common.query.Aggregation;
+import ch.psi.daq.hazelcast.common.query.ValueAggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TimeRangeQuery extends AbstractQuery {
+
+
+ private static final String DATE_FORMAT_STRING = "yyyy/MM/dd hh:mm:ss.SSS";
+ private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT_STRING);
+ private static Logger logger = LoggerFactory.getLogger(TimeRangeQuery.class);
+
+ @NotNull
+ private long start;
+ @NotNull
+ private long startNanoOffset;
+
+ @NotNull
+ private long end;
+ @NotNull
+ private long endNanoOffset;
+
+
+ /**
+ *
+ * @param ordering
+ * @param channelIds all the sourceIds (channel names) we want to query
+ * @param fields the fields (who map to fields in the DB) we are interested in returning to the
+ * client, needs to be in insertion order (hence the {@link LinkedHashSet} type)
+ * @param startMillis
+ * @param startNanoOffset
+ * @param endMillis
+ * @param endNanoOffset
+ */
+ @JsonCreator
+ public TimeRangeQuery(
+ // note that those annotations are needed for the polymorphic
+ // mapping to work correctly
+ @JsonProperty(value = "ordering") Ordering ordering,
+ @JsonProperty(value = "channels") List channels,
+ @JsonProperty(value = "fields") LinkedHashSet fields,
+ @JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
+ @JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
+ @JsonProperty(value = "aggregations") List aggregations,
+ @JsonProperty(value = "start") long startMillis,
+ @JsonProperty(value = "startNanoOffset") long startNanoOffset,
+ @JsonProperty(value = "startDateTime") String startDateTime,
+ @JsonProperty(value = "end") long endMillis,
+ @JsonProperty(value = "endNanoOffset") long endNanoOffset,
+ @JsonProperty(value = "endDateTime") String endDateTime) {
+
+ super(ordering, channels, fields, aggregateChannels, valueAggregation, aggregations);
+
+ this.start = startMillis;
+ this.startNanoOffset = startNanoOffset;
+ this.end = endMillis;
+ this.endNanoOffset = endNanoOffset;
+
+
+ if (startDateTime != null && endDateTime != null) {
+ logger.info("startDateTime and endDateTime specified. This takes precedence over the start / end fields.");
+ try {
+ Date startDate = DATE_FORMAT.parse(startDateTime);
+ Date endDate = DATE_FORMAT.parse(endDateTime);
+
+ this.start = startDate.getTime();
+ this.end = endDate.getTime();
+ } catch (ParseException e) {
+ logger.error("Parsing the start- and/or endDate was unsuccessful. "
+ + "The format must be '" + DATE_FORMAT_STRING + "'", e);
+ }
+ }
+
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public long getStartNanoOffset() {
+ return startNanoOffset;
+ }
+
+ public void setStartNanoOffset(long startNanoOffset) {
+ this.startNanoOffset = startNanoOffset;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public long getEndNanoOffset() {
+ return endNanoOffset;
+ }
+
+ public void setEndNanoOffset(long endNanoOffset) {
+ this.endNanoOffset = endNanoOffset;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+
+
+}
diff --git a/src/test/java/ch/psi/daq/test/rest/AbstractDaqRestTest.java b/src/test/java/ch/psi/daq/test/rest/AbstractDaqRestTest.java
new file mode 100644
index 0000000..70838af
--- /dev/null
+++ b/src/test/java/ch/psi/daq/test/rest/AbstractDaqRestTest.java
@@ -0,0 +1,51 @@
+package ch.psi.daq.test.rest;
+
+import org.cassandraunit.spring.EmbeddedCassandra;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.MockitoAnnotations;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.SpringApplicationConfiguration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.annotation.DirtiesContext.ClassMode;
+import org.springframework.test.context.TestExecutionListeners;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
+import org.springframework.test.context.web.WebAppConfiguration;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+import org.springframework.web.context.WebApplicationContext;
+
+import ch.psi.daq.rest.DaqRestApplication;
+import ch.psi.daq.test.cassandra.CassandraDaqUnitDependencyInjectionTestExecutionListener;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+
+@SpringApplicationConfiguration(classes = {DaqRestApplication.class, DaqRestApplicationConfiguration.class})
+@TestExecutionListeners({
+ CassandraDaqUnitDependencyInjectionTestExecutionListener.class,
+ DependencyInjectionTestExecutionListener.class})
+@EmbeddedCassandra
+@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
+@WebAppConfiguration
+@RunWith(SpringJUnit4ClassRunner.class)
+public abstract class AbstractDaqRestTest {
+
+ @Autowired
+ protected WebApplicationContext webApplicationContext;
+
+ protected MockMvc mockMvc;
+
+ protected ObjectMapper mapper = new ObjectMapper();
+
+ @Before
+ public void setup() {
+ // Process mock annotations
+ MockitoAnnotations.initMocks(this);
+
+ // Setup Spring test in webapp-mode (same config as spring-boot)
+ this.mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
+ }
+
+}
diff --git a/src/test/java/ch/psi/daq/test/rest/DaqRestApplicationConfiguration.java b/src/test/java/ch/psi/daq/test/rest/DaqRestApplicationConfiguration.java
new file mode 100644
index 0000000..b9def01
--- /dev/null
+++ b/src/test/java/ch/psi/daq/test/rest/DaqRestApplicationConfiguration.java
@@ -0,0 +1,23 @@
+package ch.psi.daq.test.rest;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.web.servlet.config.annotation.EnableWebMvc;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
+
+import ch.psi.daq.test.cassandra.LocalCassandraTestConfig;
+
+@Configuration
+//@ComponentScan(basePackages = {
+// "ch.psi.daq.rest",
+// "ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig
+// // configuration, or @Import it (see above)
+// "ch.psi.daq.cassandra.reader",
+// "ch.psi.daq.cassandra.writer"
+//})
+@Import(value = {LocalCassandraTestConfig.class})
+@EnableWebMvc
+public class DaqRestApplicationConfiguration extends WebMvcConfigurationSupport {
+
+ // add test-specific beans and configurations here
+}
\ No newline at end of file
diff --git a/src/test/java/ch/psi/daq/test/rest/controller/DaqControllerTest.java b/src/test/java/ch/psi/daq/test/rest/controller/DaqControllerTest.java
new file mode 100644
index 0000000..96bc9f7
--- /dev/null
+++ b/src/test/java/ch/psi/daq/test/rest/controller/DaqControllerTest.java
@@ -0,0 +1,91 @@
+package ch.psi.daq.test.rest.controller;
+
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
+
+import ch.psi.daq.cassandra.reader.Ordering;
+import ch.psi.daq.hazelcast.common.query.ValueAggregation;
+import ch.psi.daq.rest.queries.PulseRangeQuery;
+import ch.psi.daq.rest.queries.TimeRangeQuery;
+import ch.psi.daq.test.rest.AbstractDaqRestTest;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests the {@link DaqController} implementation.
+ */
+public class DaqControllerTest extends AbstractDaqRestTest {
+
+ private static final List DEFAULT_PROPERTIES = Lists.newArrayList("channel", "pulseId");
+
+ @Test
+ public void testPulseRangeQuery() throws Exception {
+ PulseRangeQuery request = new PulseRangeQuery(
+ Ordering.DESC,
+ Lists.newArrayList(), // DummyQueryProcessor simply returns a fixed list
+ Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
+ false,
+ ValueAggregation.index,
+ null,
+ 100l,
+ 101l
+ );
+
+ String content = mapper.writeValueAsString(request);
+
+ this.mockMvc
+ .perform(MockMvcRequestBuilders.post("/pulserange")
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].pulseId").value(100))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
+ }
+
+ @Test
+ public void testTimeRangeQuery() throws Exception {
+
+ long startTime = new Date().getTime();
+ long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
+ TimeRangeQuery request = new TimeRangeQuery(
+ Ordering.ASC,
+ Lists.newArrayList(), // DummyQueryProcessor simply returns a fixed list
+ Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
+ false,
+ ValueAggregation.index,
+ null, // aggregations
+ startTime, // startMillis
+ 0,
+ null,
+ endTime,
+ 0,
+ null
+ );
+
+ String content = mapper.writeValueAsString(request);
+ System.out.println(content);
+
+ this.mockMvc
+ .perform(MockMvcRequestBuilders.post("/timerange")
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
+ }
+}
diff --git a/src/test/java/ch/psi/daq/test/rest/query/DummyQueryProcessor.java b/src/test/java/ch/psi/daq/test/rest/query/DummyQueryProcessor.java
new file mode 100644
index 0000000..724ca96
--- /dev/null
+++ b/src/test/java/ch/psi/daq/test/rest/query/DummyQueryProcessor.java
@@ -0,0 +1,59 @@
+/**
+ *
+ */
+package ch.psi.daq.test.rest.query;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import ch.psi.daq.domain.cassandra.ChannelEvent;
+import ch.psi.daq.domain.cassandra.DataEvent;
+import ch.psi.daq.hazelcast.common.query.Query;
+import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * @author zellweger_c
+ *
+ */
+public class DummyQueryProcessor implements QueryProcessor {
+
+ private static final List TEST_CHANNEL_NAMES = Lists.newArrayList("testChannel1", "testChannel2");
+
+ /** {@inheritDoc}
+ */
+ @Override
+ public Map> process(Query query) {
+
+ int nrOfEntries = 2;
+ int startingPulseId = 100;
+
+ Map> result = Maps.newHashMap();
+
+ TEST_CHANNEL_NAMES.forEach(chName ->{
+
+ List entities = IntStream.rangeClosed(1, nrOfEntries).boxed().map(i -> {
+ long pulseId = startingPulseId + i - 1;
+ return new ChannelEvent(
+ chName,
+ pulseId,
+ 0,
+ pulseId,
+ pulseId,
+ 0,
+ "data_" + UUID.randomUUID().toString());
+ }).collect(Collectors.toList());
+
+ result.put(chName, entities.stream());
+ });
+
+ return result;
+ }
+
+}
diff --git a/src/test/resources/test-requests.txt b/src/test/resources/test-requests.txt
new file mode 100644
index 0000000..b5af79d
--- /dev/null
+++ b/src/test/resources/test-requests.txt
@@ -0,0 +1,54 @@
+{
+ "queryType":"pulserange",
+ "ordering":"ASC",
+ "channels": [
+ "test1",
+ "test2"
+ ],
+ "fields": [
+ "channel", pulseId", "globalMillis", "globalNanos", "dbValueBytes"
+ ],
+ "aggregateChannels":"false",
+ "aggregationType": "index",
+ "aggregations": [
+ {
+ "fieldRef" : "e_val",
+ "type" : "max",
+ "resultFieldName" : "maximum"
+ },{
+ "fieldRef" : "e_val",
+ "type" : "min",
+ "resultFieldName" : "minimum"
+ }
+ ],
+ "startPulseId" : 100,
+ "endPulseId" : 200
+}
+
+curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"pulserange","ordering":"ASC","channels":["test1","test2"],"fields":["channel","pulseId"],"aggregateChannels":"false","aggregationType":"index","aggregations":[{"fieldRef":"e_val","type":"max","resultFieldName":"maximum"},{"fieldRef":"e_val","type":"min","resultFieldName":"minimum"}],"startPulseId":100,"endPulseId":200}' http://localhost:8080/pulserange
+
+
+===============================================================================================================================================
+
+{
+ "queryType":"timerange",
+ "channels":[
+ "test1",
+ "test2"
+ ],
+ "fields":[
+ "channel",
+ "pulseId"
+ ],
+ "aggregateChannels":false,
+ "aggregations":null,
+ "start":1434717654177,
+ "startNanoOffset":0,
+ "end":1434717655177,
+ "endNanoOffset":0,
+ "valueAggregation":"index",
+ "binIntervalCalculator":null,
+ "ordering":"DESC"
+}
+
+curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"timerange","channels":["test1","test2"],"fields":["channel","pulseId"],"aggregateChannels":false,"aggregations":null,"start":1434717654177,"startNanoOffset":0,"end":1434717655177,"endNanoOffset":0,"valueAggregation":"index","ordering":"ASC"}' http://localhost:8080/timerange
\ No newline at end of file