Merge pull request #3 in ST/ch.psi.daq.rest from atest79 to master
# By Zellweger Christof Ralf # Via Zellweger Christof Ralf * commit '51d219f6867c7a4ac76404416a751c1dbe5d13e0': ATEST-79: - removing code that uses UUIDGen ATEST-79: - implementing first draft of timerange and pulserange queries - adding tests, again based on an embedded local cassandra instance ATEST-79: - adding 1st draft of the queries
This commit is contained in:
38
build.gradle
38
build.gradle
@ -1,9 +1,45 @@
|
|||||||
version = '1.0.0'
|
version = '1.0.0'
|
||||||
|
|
||||||
|
buildscript {
|
||||||
|
dependencies {
|
||||||
|
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.2.4.RELEASE")
|
||||||
|
}
|
||||||
|
repositories {
|
||||||
|
jcenter()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
apply plugin: 'spring-boot'
|
||||||
|
|
||||||
|
repositories {
|
||||||
|
jcenter()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
springBoot {
|
||||||
|
// when using spring loaded turn on noverify
|
||||||
|
noverify = true
|
||||||
|
}
|
||||||
|
|
||||||
|
applicationDefaultJvmArgs = [
|
||||||
|
"-Dfile.encoding=UTF-8",
|
||||||
|
// if you need to debug java agents:
|
||||||
|
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
|
||||||
|
]
|
||||||
|
|
||||||
|
//configurations {
|
||||||
|
// compile.exclude group: "com.fasterxml.jackson.core"
|
||||||
|
//}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
compile project(':ch.psi.daq.cassandra')
|
compile (project(':ch.psi.daq.cassandra'))
|
||||||
|
compile (project(':ch.psi.daq.hazelcast.common'))
|
||||||
compile 'org.springframework.boot:spring-boot-starter-web:1.2.4.RELEASE'
|
compile 'org.springframework.boot:spring-boot-starter-web:1.2.4.RELEASE'
|
||||||
|
compile 'com.google.code.gson:gson:2+'
|
||||||
compile 'org.apache.commons:commons-lang3:3.4'
|
compile 'org.apache.commons:commons-lang3:3.4'
|
||||||
|
|
||||||
|
testCompile 'org.springframework.boot:spring-boot-starter-test:1.2.4.RELEASE'
|
||||||
|
testCompile 'org.skyscreamer:jsonassert:1.2.3'
|
||||||
|
testCompile 'com.jayway.jsonpath:json-path:2.0.0'
|
||||||
}
|
}
|
||||||
|
|
||||||
uploadArchives {
|
uploadArchives {
|
||||||
|
@ -1,32 +0,0 @@
|
|||||||
package ch.psi.daq.rest;
|
|
||||||
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
|
||||||
|
|
||||||
import ch.psi.daq.cassandra.writer.CassandraWriter;
|
|
||||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
|
||||||
|
|
||||||
@RestController
|
|
||||||
public class DaqController {
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DaqController.class);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private CassandraWriter writer;
|
|
||||||
|
|
||||||
@RequestMapping(value = "/test")
|
|
||||||
public void queryIndices() {
|
|
||||||
|
|
||||||
logger.info("TEST endpoint invoked");
|
|
||||||
|
|
||||||
long pulseId = System.currentTimeMillis();
|
|
||||||
String value = "data_" + UUID.randomUUID().toString();
|
|
||||||
|
|
||||||
writer.writeAsync(1, 0, new ChannelEvent("dummyChannel", pulseId, 0, pulseId, pulseId, 0, value));
|
|
||||||
}
|
|
||||||
}
|
|
@ -5,39 +5,34 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||||
import org.springframework.boot.context.web.SpringBootServletInitializer;
|
import org.springframework.boot.context.web.SpringBootServletInitializer;
|
||||||
import org.springframework.context.annotation.ComponentScan;
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
import org.springframework.context.annotation.Import;
|
|
||||||
|
|
||||||
import ch.psi.daq.cassandra.config.CassandraConfig;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController annotated classes.
|
* Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController
|
||||||
|
* annotated classes.
|
||||||
* <p>
|
* <p>
|
||||||
*
|
*
|
||||||
* This acts as a @Configuration class for Spring. As such it has @ComponentScan
|
* This acts as a @Configuration class for Spring. As such it has @ComponentScan annotation that
|
||||||
* annotation that enables scanning for another Spring components in current
|
* enables scanning for another Spring components in current package and its subpackages.
|
||||||
* package and its subpackages.
|
|
||||||
* <p>
|
* <p>
|
||||||
* Another annotation is @EnableAutoConfiguration which tells Spring Boot to run
|
* Another annotation is @EnableAutoConfiguration which tells Spring Boot to run autoconfiguration.
|
||||||
* autoconfiguration.
|
|
||||||
* <p>
|
* <p>
|
||||||
* It also extends SpringBootServletInitializer which will configure Spring
|
* It also extends SpringBootServletInitializer which will configure Spring servlet for us, and
|
||||||
* servlet for us, and overrides the configure() method to point to itself, so
|
* overrides the configure() method to point to itself, so Spring can find the main configuration.
|
||||||
* Spring can find the main configuration.
|
|
||||||
* <p>
|
* <p>
|
||||||
* Finally, the main() method consists of single static call to
|
* Finally, the main() method consists of single static call to SpringApplication.run().
|
||||||
* SpringApplication.run().
|
|
||||||
* <p>
|
* <p>
|
||||||
* Methods annotated with @Bean are Java beans that are
|
* Methods annotated with @Bean are Java beans that are container-managed, i.e. managed by Spring.
|
||||||
* container-managed, i.e. managed by Spring. Whenever there are @Autowire, @Inject
|
* Whenever there are @Autowire, @Inject or similar annotations found in the code (which is being
|
||||||
* or similar annotations found in the code (which is being scanned through the @ComponentScan
|
* scanned through the @ComponentScan annotation), the container then knows how to create those
|
||||||
* annotation), the container then knows how to create those beans and inject
|
* beans and inject them accordingly.
|
||||||
* them accordingly.
|
|
||||||
*/
|
*/
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
//@Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan comment below
|
// @Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan
|
||||||
|
// comment below
|
||||||
@ComponentScan(basePackages = {
|
@ComponentScan(basePackages = {
|
||||||
"ch.psi.daq.rest",
|
"ch.psi.daq.rest",
|
||||||
"ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig configuration, or @Import it (see above)
|
"ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig
|
||||||
|
// configuration, or @Import it (see above)
|
||||||
"ch.psi.daq.cassandra.reader",
|
"ch.psi.daq.cassandra.reader",
|
||||||
"ch.psi.daq.cassandra.writer"
|
"ch.psi.daq.cassandra.writer"
|
||||||
})
|
})
|
||||||
@ -53,10 +48,4 @@ public class DaqRestApplication extends SpringBootServletInitializer {
|
|||||||
return application.sources(DaqRestApplication.class);
|
return application.sources(DaqRestApplication.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
// a nested configuration
|
|
||||||
// this guarantees that the ordering of the properties file is as expected
|
|
||||||
// see: https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
|
|
||||||
// @Configuration
|
|
||||||
// @Import(CassandraConfig.class)
|
|
||||||
// static class InnerConfiguration { }
|
|
||||||
}
|
}
|
||||||
|
53
src/main/java/ch/psi/daq/rest/DaqRestConfiguration.java
Normal file
53
src/main/java/ch/psi/daq/rest/DaqRestConfiguration.java
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package ch.psi.daq.rest;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||||
|
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
|
||||||
|
import ch.psi.daq.rest.model.PropertyFilterMixin;
|
||||||
|
import ch.psi.daq.test.rest.query.DummyQueryProcessor;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||||
|
import com.fasterxml.jackson.core.JsonFactory;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class DaqRestConfiguration {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public QueryProcessor queryProcessor() {
|
||||||
|
return new DummyQueryProcessor();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public JsonFactory jsonFactory() {
|
||||||
|
return new JsonFactory();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ResponseStreamWriter responseStreamWriter() {
|
||||||
|
return new ResponseStreamWriter();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ObjectMapper objectMapper() {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
// only include non-null values
|
||||||
|
mapper.setSerializationInclusion(Include.NON_NULL);
|
||||||
|
// Mixin which is used dynamically to filter out which properties get serialised and which
|
||||||
|
// won't. This way, the user can specify which columns are to be received.
|
||||||
|
mapper.addMixIn(ChannelEvent.class, PropertyFilterMixin.class);
|
||||||
|
mapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class);
|
||||||
|
return mapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
// a nested configuration
|
||||||
|
// this guarantees that the ordering of the properties file is as expected
|
||||||
|
// see:
|
||||||
|
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
|
||||||
|
// @Configuration
|
||||||
|
// @Import(CassandraConfig.class)
|
||||||
|
// static class InnerConfiguration { }
|
||||||
|
}
|
80
src/main/java/ch/psi/daq/rest/DaqRestController.java
Normal file
80
src/main/java/ch/psi/daq/rest/DaqRestController.java
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
package ch.psi.daq.rest;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import ch.psi.daq.domain.cassandra.DataEvent;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
|
||||||
|
import ch.psi.daq.rest.queries.AbstractQuery;
|
||||||
|
import ch.psi.daq.rest.queries.PulseRangeQuery;
|
||||||
|
import ch.psi.daq.rest.queries.TimeRangeQuery;
|
||||||
|
|
||||||
|
@RestController
|
||||||
|
public class DaqRestController {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(DaqRestController.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ResponseStreamWriter responseStreamWriter;
|
||||||
|
|
||||||
|
// TODO: just a dummy test implementation - remove when the real processor is ready
|
||||||
|
@Autowired
|
||||||
|
private QueryProcessor queryProcessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param query
|
||||||
|
* @param res
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@RequestMapping(value = "/pulserange")
|
||||||
|
public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException {
|
||||||
|
|
||||||
|
logger.debug("PulseRangeQuery received: {}", query);
|
||||||
|
|
||||||
|
executeQuery(query, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param query
|
||||||
|
* @param res
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@RequestMapping(value = "/timerange")
|
||||||
|
public void pulseRange(@RequestBody TimeRangeQuery query, HttpServletResponse res) throws IOException {
|
||||||
|
|
||||||
|
logger.debug("TimeRangeQuery received: {}", query);
|
||||||
|
|
||||||
|
executeQuery(query, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param query
|
||||||
|
* @param res
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void executeQuery(AbstractQuery query, HttpServletResponse res) throws IOException {
|
||||||
|
// all the magic happens here
|
||||||
|
Map<String, Stream<? extends DataEvent>> process = queryProcessor.process(query);
|
||||||
|
|
||||||
|
Stream<DataEvent> flatStreams = process.values().stream().flatMap(s -> {
|
||||||
|
return s;
|
||||||
|
});
|
||||||
|
|
||||||
|
// write the response back to the client using java 8 streams
|
||||||
|
responseStreamWriter.respond(flatStreams, query, res);
|
||||||
|
}
|
||||||
|
}
|
112
src/main/java/ch/psi/daq/rest/ResponseStreamWriter.java
Normal file
112
src/main/java/ch/psi/daq/rest/ResponseStreamWriter.java
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package ch.psi.daq.rest;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import javax.servlet.ServletResponse;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
import ch.psi.daq.domain.cassandra.DataEvent;
|
||||||
|
import ch.psi.daq.rest.queries.AbstractQuery;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonEncoding;
|
||||||
|
import com.fasterxml.jackson.core.JsonFactory;
|
||||||
|
import com.fasterxml.jackson.core.JsonGenerator;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||||
|
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
|
||||||
|
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||||
|
* of the current request.
|
||||||
|
*/
|
||||||
|
public class ResponseStreamWriter {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ResponseStreamWriter.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JsonFactory jsonFactory;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ObjectMapper mapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Responding with the the contents of the stream by writing into the output stream of the
|
||||||
|
* {@link ServletResponse}.
|
||||||
|
*
|
||||||
|
* @param stream {@link Stream} instance of {@link DataEvent}s
|
||||||
|
* @param query concrete instance of {@link AbstractQuery}
|
||||||
|
* @param response {@link ServletResponse} instance given by the current HTTP request
|
||||||
|
* @throws IOException thrown if writing to the output stream fails
|
||||||
|
*/
|
||||||
|
public void respond(Stream<DataEvent> stream, AbstractQuery query, ServletResponse response) throws IOException {
|
||||||
|
|
||||||
|
Set<String> includedFields = query.getFields();
|
||||||
|
|
||||||
|
if (query.getAggregations() != null) {
|
||||||
|
includedFields.addAll(query.getAggregations()
|
||||||
|
.stream()
|
||||||
|
.map(a -> {
|
||||||
|
return a.getType().toString();
|
||||||
|
})
|
||||||
|
.collect(Collectors.toSet()));
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectWriter writer = configureWriter(includedFields);
|
||||||
|
|
||||||
|
respondInternal(stream, response, writer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configures the writer dynamically by including the fields which should be included in the
|
||||||
|
* response.
|
||||||
|
*
|
||||||
|
* @param includedFields set of strings which correspond to the getter method names of the
|
||||||
|
* classes registered as a mixed-in
|
||||||
|
* @return the configured writer that includes the specified fields
|
||||||
|
*/
|
||||||
|
private ObjectWriter configureWriter(Set<String> includedFields) {
|
||||||
|
SimpleFilterProvider propertyFilter = new SimpleFilterProvider();
|
||||||
|
propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields));
|
||||||
|
// only write the properties not excluded in the filter
|
||||||
|
ObjectWriter writer = mapper.writer(propertyFilter);
|
||||||
|
return writer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes the Java stream into the output stream.
|
||||||
|
*
|
||||||
|
* @param stream {@link Stream} instance of {@link DataEvent}s
|
||||||
|
* @param response {@link ServletResponse} instance given by the current HTTP request
|
||||||
|
* @param writer configured writer that includes the fields the end user wants to see
|
||||||
|
* @throws IOException thrown if writing to the output stream fails
|
||||||
|
*/
|
||||||
|
private void respondInternal(Stream<DataEvent> stream, ServletResponse response, ObjectWriter writer)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
JsonGenerator generator = jsonFactory.createGenerator(response.getOutputStream(), JsonEncoding.UTF8);
|
||||||
|
generator.writeStartArray();
|
||||||
|
stream.forEach(ds -> {
|
||||||
|
try {
|
||||||
|
logger.trace("Writing value for: {}", ds);
|
||||||
|
// use the writer created just before
|
||||||
|
writer.writeValue(generator, ds);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
generator.writeEndArray();
|
||||||
|
generator.flush();
|
||||||
|
generator.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
10
src/main/java/ch/psi/daq/rest/model/PropertyFilterMixin.java
Normal file
10
src/main/java/ch/psi/daq/rest/model/PropertyFilterMixin.java
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
package ch.psi.daq.rest.model;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonFilter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@JsonFilter("namedPropertyFilter")
|
||||||
|
public class PropertyFilterMixin {
|
||||||
|
}
|
139
src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java
Normal file
139
src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
package ch.psi.daq.rest.queries;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
import ch.psi.daq.cassandra.reader.Ordering;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.Aggregation;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.Query;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.bin.BinIntervalCalculator;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author zellweger_c
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@JsonTypeInfo(
|
||||||
|
use = JsonTypeInfo.Id.NAME,
|
||||||
|
include = JsonTypeInfo.As.PROPERTY,
|
||||||
|
property = "queryType")
|
||||||
|
@JsonSubTypes(
|
||||||
|
{
|
||||||
|
@Type(value = PulseRangeQuery.class, name = "pulserange"),
|
||||||
|
@Type(value = TimeRangeQuery.class, name = "timerange"),
|
||||||
|
})
|
||||||
|
public abstract class AbstractQuery implements Query {
|
||||||
|
|
||||||
|
private List<String> channels;
|
||||||
|
|
||||||
|
private LinkedHashSet<String> fields;
|
||||||
|
|
||||||
|
private List<Aggregation> aggregations;
|
||||||
|
|
||||||
|
private ValueAggregation valueAggregation;
|
||||||
|
|
||||||
|
private boolean aggregateChannels;
|
||||||
|
|
||||||
|
private Ordering ordering;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param isOrderDescending whether to add a 'orderBy' clause into the database query
|
||||||
|
* @param channelIds all the channelIds (channel names) we want to query
|
||||||
|
* @param fields the fields (who map to fields in the DB) we are interested in returning to the
|
||||||
|
* client, needs to be in insertion order (hence the {@link LinkedHashSet} type)
|
||||||
|
*/
|
||||||
|
@JsonCreator
|
||||||
|
public AbstractQuery(
|
||||||
|
// note that those annotations are needed for the polymorphic
|
||||||
|
// mapping to work correctly
|
||||||
|
@JsonProperty(value = "ordering") Ordering ordering,
|
||||||
|
@JsonProperty(value = "channels") List<String> channelIds,
|
||||||
|
@JsonProperty(value = "fields") LinkedHashSet<String> fields,
|
||||||
|
@JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
|
||||||
|
@JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
|
||||||
|
@JsonProperty(value = "aggregations") List<Aggregation> aggregations) {
|
||||||
|
|
||||||
|
this.ordering = ordering;
|
||||||
|
this.aggregateChannels = aggregateChannels;
|
||||||
|
this.valueAggregation = valueAggregation;
|
||||||
|
|
||||||
|
if (channelIds == null || fields == null) {
|
||||||
|
throw new IllegalArgumentException("sourceIds and/or fields cannot be null.");
|
||||||
|
}
|
||||||
|
Assert.notNull(channelIds, "channel name must not be null.");
|
||||||
|
Assert.notNull(fields, "field, i.e. property, names must not be null.");
|
||||||
|
|
||||||
|
this.channels = channelIds;
|
||||||
|
this.fields = fields;
|
||||||
|
this.aggregations = aggregations;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<String> getChannels() {
|
||||||
|
return Collections.unmodifiableList(channels);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Ordering getOrdering() {
|
||||||
|
return ordering;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public ValueAggregation getValueAggregation() {
|
||||||
|
return valueAggregation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean aggregateChannels() {
|
||||||
|
return isAggregateChannels();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BinIntervalCalculator getBinIntervalCalculator() {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAggregateChannels() {
|
||||||
|
return aggregateChannels;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public LinkedHashSet<String> getFields() {
|
||||||
|
return fields;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Aggregation> getAggregations() {
|
||||||
|
return aggregations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return ReflectionToStringBuilder.reflectionToString(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
67
src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java
Normal file
67
src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package ch.psi.daq.rest.queries;
|
||||||
|
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.builder.ToStringBuilder;
|
||||||
|
|
||||||
|
import ch.psi.daq.cassandra.reader.Ordering;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.Aggregation;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class PulseRangeQuery extends AbstractQuery {
|
||||||
|
|
||||||
|
private long startPulseId;
|
||||||
|
private long endPulseId;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public PulseRangeQuery(
|
||||||
|
// note that those annotations are needed for the polymorphic
|
||||||
|
// mapping to work correctly
|
||||||
|
@JsonProperty(value = "ordering") Ordering ordering,
|
||||||
|
@JsonProperty(value = "channels") List<String> channels,
|
||||||
|
@JsonProperty(value = "fields") LinkedHashSet<String> fields,
|
||||||
|
@JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
|
||||||
|
@JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
|
||||||
|
@JsonProperty(value = "aggregations") List<Aggregation> aggregations,
|
||||||
|
@JsonProperty(value = "startPulseId") long startPulseId,
|
||||||
|
@JsonProperty(value = "endPulseId") long endPulseId)
|
||||||
|
{
|
||||||
|
|
||||||
|
super(ordering, channels, fields, aggregateChannels, valueAggregation, aggregations);
|
||||||
|
|
||||||
|
this.startPulseId = startPulseId;
|
||||||
|
this.endPulseId = endPulseId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getStartPulseId() {
|
||||||
|
return startPulseId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStartPulseId(long startPulseId) {
|
||||||
|
this.startPulseId = startPulseId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEndPulseId() {
|
||||||
|
return endPulseId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEndPulseId(long endPulseId) {
|
||||||
|
this.endPulseId = endPulseId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return ToStringBuilder.reflectionToString(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
133
src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java
Normal file
133
src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
package ch.psi.daq.rest.queries;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import javax.validation.constraints.NotNull;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.builder.ToStringBuilder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import ch.psi.daq.cassandra.reader.Ordering;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.Aggregation;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
public class TimeRangeQuery extends AbstractQuery {
|
||||||
|
|
||||||
|
|
||||||
|
private static final String DATE_FORMAT_STRING = "yyyy/MM/dd hh:mm:ss.SSS";
|
||||||
|
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT_STRING);
|
||||||
|
private static Logger logger = LoggerFactory.getLogger(TimeRangeQuery.class);
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
private long start;
|
||||||
|
@NotNull
|
||||||
|
private long startNanoOffset;
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
private long end;
|
||||||
|
@NotNull
|
||||||
|
private long endNanoOffset;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param ordering
|
||||||
|
* @param channelIds all the sourceIds (channel names) we want to query
|
||||||
|
* @param fields the fields (who map to fields in the DB) we are interested in returning to the
|
||||||
|
* client, needs to be in insertion order (hence the {@link LinkedHashSet} type)
|
||||||
|
* @param startMillis
|
||||||
|
* @param startNanoOffset
|
||||||
|
* @param endMillis
|
||||||
|
* @param endNanoOffset
|
||||||
|
*/
|
||||||
|
@JsonCreator
|
||||||
|
public TimeRangeQuery(
|
||||||
|
// note that those annotations are needed for the polymorphic
|
||||||
|
// mapping to work correctly
|
||||||
|
@JsonProperty(value = "ordering") Ordering ordering,
|
||||||
|
@JsonProperty(value = "channels") List<String> channels,
|
||||||
|
@JsonProperty(value = "fields") LinkedHashSet<String> fields,
|
||||||
|
@JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
|
||||||
|
@JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
|
||||||
|
@JsonProperty(value = "aggregations") List<Aggregation> aggregations,
|
||||||
|
@JsonProperty(value = "start") long startMillis,
|
||||||
|
@JsonProperty(value = "startNanoOffset") long startNanoOffset,
|
||||||
|
@JsonProperty(value = "startDateTime") String startDateTime,
|
||||||
|
@JsonProperty(value = "end") long endMillis,
|
||||||
|
@JsonProperty(value = "endNanoOffset") long endNanoOffset,
|
||||||
|
@JsonProperty(value = "endDateTime") String endDateTime) {
|
||||||
|
|
||||||
|
super(ordering, channels, fields, aggregateChannels, valueAggregation, aggregations);
|
||||||
|
|
||||||
|
this.start = startMillis;
|
||||||
|
this.startNanoOffset = startNanoOffset;
|
||||||
|
this.end = endMillis;
|
||||||
|
this.endNanoOffset = endNanoOffset;
|
||||||
|
|
||||||
|
|
||||||
|
if (startDateTime != null && endDateTime != null) {
|
||||||
|
logger.info("startDateTime and endDateTime specified. This takes precedence over the start / end fields.");
|
||||||
|
try {
|
||||||
|
Date startDate = DATE_FORMAT.parse(startDateTime);
|
||||||
|
Date endDate = DATE_FORMAT.parse(endDateTime);
|
||||||
|
|
||||||
|
this.start = startDate.getTime();
|
||||||
|
this.end = endDate.getTime();
|
||||||
|
} catch (ParseException e) {
|
||||||
|
logger.error("Parsing the start- and/or endDate was unsuccessful. "
|
||||||
|
+ "The format must be '" + DATE_FORMAT_STRING + "'", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getStart() {
|
||||||
|
return start;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStart(long start) {
|
||||||
|
this.start = start;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getStartNanoOffset() {
|
||||||
|
return startNanoOffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStartNanoOffset(long startNanoOffset) {
|
||||||
|
this.startNanoOffset = startNanoOffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEnd() {
|
||||||
|
return end;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnd(long end) {
|
||||||
|
this.end = end;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEndNanoOffset() {
|
||||||
|
return endNanoOffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEndNanoOffset(long endNanoOffset) {
|
||||||
|
this.endNanoOffset = endNanoOffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return ToStringBuilder.reflectionToString(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
51
src/test/java/ch/psi/daq/test/rest/AbstractDaqRestTest.java
Normal file
51
src/test/java/ch/psi/daq/test/rest/AbstractDaqRestTest.java
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package ch.psi.daq.test.rest;
|
||||||
|
|
||||||
|
import org.cassandraunit.spring.EmbeddedCassandra;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.SpringApplicationConfiguration;
|
||||||
|
import org.springframework.test.annotation.DirtiesContext;
|
||||||
|
import org.springframework.test.annotation.DirtiesContext.ClassMode;
|
||||||
|
import org.springframework.test.context.TestExecutionListeners;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
|
||||||
|
import org.springframework.test.context.web.WebAppConfiguration;
|
||||||
|
import org.springframework.test.web.servlet.MockMvc;
|
||||||
|
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
|
||||||
|
import org.springframework.web.context.WebApplicationContext;
|
||||||
|
|
||||||
|
import ch.psi.daq.rest.DaqRestApplication;
|
||||||
|
import ch.psi.daq.test.cassandra.CassandraDaqUnitDependencyInjectionTestExecutionListener;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
|
||||||
|
@SpringApplicationConfiguration(classes = {DaqRestApplication.class, DaqRestApplicationConfiguration.class})
|
||||||
|
@TestExecutionListeners({
|
||||||
|
CassandraDaqUnitDependencyInjectionTestExecutionListener.class,
|
||||||
|
DependencyInjectionTestExecutionListener.class})
|
||||||
|
@EmbeddedCassandra
|
||||||
|
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
|
||||||
|
@WebAppConfiguration
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
public abstract class AbstractDaqRestTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
protected WebApplicationContext webApplicationContext;
|
||||||
|
|
||||||
|
protected MockMvc mockMvc;
|
||||||
|
|
||||||
|
protected ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
// Process mock annotations
|
||||||
|
MockitoAnnotations.initMocks(this);
|
||||||
|
|
||||||
|
// Setup Spring test in webapp-mode (same config as spring-boot)
|
||||||
|
this.mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,23 @@
|
|||||||
|
package ch.psi.daq.test.rest;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Import;
|
||||||
|
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
||||||
|
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
||||||
|
|
||||||
|
import ch.psi.daq.test.cassandra.LocalCassandraTestConfig;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
//@ComponentScan(basePackages = {
|
||||||
|
// "ch.psi.daq.rest",
|
||||||
|
// "ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig
|
||||||
|
// // configuration, or @Import it (see above)
|
||||||
|
// "ch.psi.daq.cassandra.reader",
|
||||||
|
// "ch.psi.daq.cassandra.writer"
|
||||||
|
//})
|
||||||
|
@Import(value = {LocalCassandraTestConfig.class})
|
||||||
|
@EnableWebMvc
|
||||||
|
public class DaqRestApplicationConfiguration extends WebMvcConfigurationSupport {
|
||||||
|
|
||||||
|
// add test-specific beans and configurations here
|
||||||
|
}
|
@ -0,0 +1,91 @@
|
|||||||
|
package ch.psi.daq.test.rest.controller;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
|
||||||
|
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
|
||||||
|
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
|
||||||
|
|
||||||
|
import ch.psi.daq.cassandra.reader.Ordering;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.ValueAggregation;
|
||||||
|
import ch.psi.daq.rest.queries.PulseRangeQuery;
|
||||||
|
import ch.psi.daq.rest.queries.TimeRangeQuery;
|
||||||
|
import ch.psi.daq.test.rest.AbstractDaqRestTest;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the {@link DaqController} implementation.
|
||||||
|
*/
|
||||||
|
public class DaqControllerTest extends AbstractDaqRestTest {
|
||||||
|
|
||||||
|
private static final List<String> DEFAULT_PROPERTIES = Lists.newArrayList("channel", "pulseId");
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPulseRangeQuery() throws Exception {
|
||||||
|
PulseRangeQuery request = new PulseRangeQuery(
|
||||||
|
Ordering.DESC,
|
||||||
|
Lists.newArrayList(), // DummyQueryProcessor simply returns a fixed list
|
||||||
|
Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
|
||||||
|
false,
|
||||||
|
ValueAggregation.index,
|
||||||
|
null,
|
||||||
|
100l,
|
||||||
|
101l
|
||||||
|
);
|
||||||
|
|
||||||
|
String content = mapper.writeValueAsString(request);
|
||||||
|
|
||||||
|
this.mockMvc
|
||||||
|
.perform(MockMvcRequestBuilders.post("/pulserange")
|
||||||
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
|
.content(content))
|
||||||
|
|
||||||
|
.andDo(MockMvcResultHandlers.print())
|
||||||
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].pulseId").value(100))
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeRangeQuery() throws Exception {
|
||||||
|
|
||||||
|
long startTime = new Date().getTime();
|
||||||
|
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
|
||||||
|
TimeRangeQuery request = new TimeRangeQuery(
|
||||||
|
Ordering.ASC,
|
||||||
|
Lists.newArrayList(), // DummyQueryProcessor simply returns a fixed list
|
||||||
|
Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
|
||||||
|
false,
|
||||||
|
ValueAggregation.index,
|
||||||
|
null, // aggregations
|
||||||
|
startTime, // startMillis
|
||||||
|
0,
|
||||||
|
null,
|
||||||
|
endTime,
|
||||||
|
0,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
String content = mapper.writeValueAsString(request);
|
||||||
|
System.out.println(content);
|
||||||
|
|
||||||
|
this.mockMvc
|
||||||
|
.perform(MockMvcRequestBuilders.post("/timerange")
|
||||||
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
|
.content(content))
|
||||||
|
|
||||||
|
.andDo(MockMvcResultHandlers.print())
|
||||||
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||||
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,59 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package ch.psi.daq.test.rest.query;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||||
|
import ch.psi.daq.domain.cassandra.DataEvent;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.Query;
|
||||||
|
import ch.psi.daq.hazelcast.common.query.processor.QueryProcessor;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author zellweger_c
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class DummyQueryProcessor implements QueryProcessor {
|
||||||
|
|
||||||
|
private static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList("testChannel1", "testChannel2");
|
||||||
|
|
||||||
|
/** {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Map<String, Stream<? extends DataEvent>> process(Query query) {
|
||||||
|
|
||||||
|
int nrOfEntries = 2;
|
||||||
|
int startingPulseId = 100;
|
||||||
|
|
||||||
|
Map<String, Stream<? extends DataEvent>> result = Maps.newHashMap();
|
||||||
|
|
||||||
|
TEST_CHANNEL_NAMES.forEach(chName ->{
|
||||||
|
|
||||||
|
List<DataEvent> entities = IntStream.rangeClosed(1, nrOfEntries).boxed().map(i -> {
|
||||||
|
long pulseId = startingPulseId + i - 1;
|
||||||
|
return new ChannelEvent(
|
||||||
|
chName,
|
||||||
|
pulseId,
|
||||||
|
0,
|
||||||
|
pulseId,
|
||||||
|
pulseId,
|
||||||
|
0,
|
||||||
|
"data_" + UUID.randomUUID().toString());
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
|
result.put(chName, entities.stream());
|
||||||
|
});
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
54
src/test/resources/test-requests.txt
Normal file
54
src/test/resources/test-requests.txt
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
{
|
||||||
|
"queryType":"pulserange",
|
||||||
|
"ordering":"ASC",
|
||||||
|
"channels": [
|
||||||
|
"test1",
|
||||||
|
"test2"
|
||||||
|
],
|
||||||
|
"fields": [
|
||||||
|
"channel", pulseId", "globalMillis", "globalNanos", "dbValueBytes"
|
||||||
|
],
|
||||||
|
"aggregateChannels":"false",
|
||||||
|
"aggregationType": "index",
|
||||||
|
"aggregations": [
|
||||||
|
{
|
||||||
|
"fieldRef" : "e_val",
|
||||||
|
"type" : "max",
|
||||||
|
"resultFieldName" : "maximum"
|
||||||
|
},{
|
||||||
|
"fieldRef" : "e_val",
|
||||||
|
"type" : "min",
|
||||||
|
"resultFieldName" : "minimum"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"startPulseId" : 100,
|
||||||
|
"endPulseId" : 200
|
||||||
|
}
|
||||||
|
|
||||||
|
curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"pulserange","ordering":"ASC","channels":["test1","test2"],"fields":["channel","pulseId"],"aggregateChannels":"false","aggregationType":"index","aggregations":[{"fieldRef":"e_val","type":"max","resultFieldName":"maximum"},{"fieldRef":"e_val","type":"min","resultFieldName":"minimum"}],"startPulseId":100,"endPulseId":200}' http://localhost:8080/pulserange
|
||||||
|
|
||||||
|
|
||||||
|
===============================================================================================================================================
|
||||||
|
|
||||||
|
{
|
||||||
|
"queryType":"timerange",
|
||||||
|
"channels":[
|
||||||
|
"test1",
|
||||||
|
"test2"
|
||||||
|
],
|
||||||
|
"fields":[
|
||||||
|
"channel",
|
||||||
|
"pulseId"
|
||||||
|
],
|
||||||
|
"aggregateChannels":false,
|
||||||
|
"aggregations":null,
|
||||||
|
"start":1434717654177,
|
||||||
|
"startNanoOffset":0,
|
||||||
|
"end":1434717655177,
|
||||||
|
"endNanoOffset":0,
|
||||||
|
"valueAggregation":"index",
|
||||||
|
"binIntervalCalculator":null,
|
||||||
|
"ordering":"DESC"
|
||||||
|
}
|
||||||
|
|
||||||
|
curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"timerange","channels":["test1","test2"],"fields":["channel","pulseId"],"aggregateChannels":false,"aggregations":null,"start":1434717654177,"startNanoOffset":0,"end":1434717655177,"endNanoOffset":0,"valueAggregation":"index","ordering":"ASC"}' http://localhost:8080/timerange
|
Reference in New Issue
Block a user