diff --git a/build.gradle b/build.gradle
index e19e115..cdcb257 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2,6 +2,7 @@ version = '1.0.0'
 dependencies {
     compile project(':ch.psi.daq.cassandra')
+    compile project(':ch.psi.daq.hazelcast.common')
     compile 'org.springframework.boot:spring-boot-starter-web:1.2.4.RELEASE'
     compile 'org.apache.commons:commons-lang3:3.4'
 }
diff --git a/src/main/java/ch/psi/daq/rest/DaqController.java b/src/main/java/ch/psi/daq/rest/DaqController.java
index aab01c6..6a0d640 100644
--- a/src/main/java/ch/psi/daq/rest/DaqController.java
+++ b/src/main/java/ch/psi/daq/rest/DaqController.java
@@ -5,11 +5,13 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import ch.psi.daq.cassandra.writer.CassandraWriter;
 import ch.psi.daq.domain.cassandra.ChannelEvent;
+import ch.psi.daq.rest.queries.PulseRangeQuery;
 
 @RestController
 public class DaqController {
@@ -29,4 +31,12 @@ public class DaqController {
 
     writer.writeAsync(1, 0, new ChannelEvent("dummyChannel", pulseId, 0, pulseId, pulseId, 0, value));
   }
+
+  @RequestMapping(value = "/pulserange")
+  public void pulseRange(@RequestBody PulseRangeQuery query) {
+
+    logger.info("PulseRangeQuery received: {}", query);
+
+
+  }
 }
diff --git a/src/main/java/ch/psi/daq/rest/DaqRestApplication.java b/src/main/java/ch/psi/daq/rest/DaqRestApplication.java
index e2a965f..d068c75 100644
--- a/src/main/java/ch/psi/daq/rest/DaqRestApplication.java
+++ b/src/main/java/ch/psi/daq/rest/DaqRestApplication.java
@@ -10,53 +10,52 @@ import org.springframework.context.annotation.Import;
 import ch.psi.daq.cassandra.config.CassandraConfig;
 
 /**
- * Entry point to our rest-frontend of the Swissfel application which most importantly wires all the 
@RestController annotated classes.
+ * Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController
+ * annotated classes.
  * <br/>
  *
- * This acts as a @Configuration class for Spring. As such it has @ComponentScan
- * annotation that enables scanning for another Spring components in current
- * package and its subpackages.
+ * This acts as a @Configuration class for Spring. As such it has @ComponentScan annotation that
+ * enables scanning for another Spring components in current package and its subpackages.
  * <br/>
  *
- * Another annotation is @EnableAutoConfiguration which tells Spring Boot to run
- * autoconfiguration.
+ * Another annotation is @EnableAutoConfiguration which tells Spring Boot to run autoconfiguration.
  * <br/>
  *
- * It also extends SpringBootServletInitializer which will configure Spring
- * servlet for us, and overrides the configure() method to point to itself, so
- * Spring can find the main configuration.
+ * It also extends SpringBootServletInitializer which will configure Spring servlet for us, and
+ * overrides the configure() method to point to itself, so Spring can find the main configuration.
  * <br/>
  *
- * Finally, the main() method consists of single static call to
- * SpringApplication.run().
+ * Finally, the main() method consists of single static call to SpringApplication.run().
  * <br/>
  *
- * Methods annotated with @Bean are Java beans that are
- * container-managed, i.e. managed by Spring. Whenever there are @Autowire, @Inject
- * or similar annotations found in the code (which is being scanned through the @ComponentScan
- * annotation), the container then knows how to create those beans and inject
- * them accordingly.
+ * Methods annotated with @Bean are Java beans that are container-managed, i.e. managed by Spring.
+ * Whenever there are @Autowire, @Inject or similar annotations found in the code (which is being
+ * scanned through the @ComponentScan annotation), the container then knows how to create those
+ * beans and inject them accordingly.
  */
 @SpringBootApplication
-//@Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan comment below
+//@Import(CassandraConfig.class) // either define the context to be imported, or see ComponentScan
+// comment below
 @ComponentScan(basePackages = {
-    "ch.psi.daq.rest",
-    "ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig configuration, or @Import it (see above)
-    "ch.psi.daq.cassandra.reader",
-    "ch.psi.daq.cassandra.writer"
+    "ch.psi.daq.rest",
+    "ch.psi.daq.cassandra.config", // define the package name with the CassandraConfig
+                                   // configuration, or @Import it (see above)
+    "ch.psi.daq.cassandra.reader",
+    "ch.psi.daq.cassandra.writer"
 })
 public class DaqRestApplication extends SpringBootServletInitializer {
-
-  public static void main(final String[] args) {
-    SpringApplication.run(DaqRestApplication.class, args);
-  }
-  @Override
-  protected final SpringApplicationBuilder configure(final SpringApplicationBuilder application) {
-    return application.sources(DaqRestApplication.class);
-  }
-
-  // a nested configuration
-  // this guarantees that the ordering of the properties file is as expected
-  // see: 
https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
-//  @Configuration
-//  @Import(CassandraConfig.class)
-//  static class InnerConfiguration { }
+  public static void main(final String[] args) {
+    SpringApplication.run(DaqRestApplication.class, args);
+  }
+
+  @Override
+  protected final SpringApplicationBuilder configure(final SpringApplicationBuilder application) {
+    return application.sources(DaqRestApplication.class);
+  }
+
+  // a nested configuration
+  // this guarantees that the ordering of the properties file is as expected
+  // see:
+  // https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
+  // @Configuration
+  // @Import(CassandraConfig.class)
+  // static class InnerConfiguration { }
 }
diff --git a/src/main/java/ch/psi/daq/rest/model/Aggregation.java b/src/main/java/ch/psi/daq/rest/model/Aggregation.java
new file mode 100644
index 0000000..5adb060
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/model/Aggregation.java
@@ -0,0 +1,41 @@
+package ch.psi.daq.rest.model;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Aggregation {
+
+  private final String fieldRef;
+  private final AggregationEnum type;
+  private final String resultFieldName;
+
+  @JsonCreator
+  public Aggregation(
+      @JsonProperty(value = "fieldRef") String fieldRef,
+      @JsonProperty(value = "type") AggregationEnum type,
+      @JsonProperty(value = "resultFieldName") String resultFieldName) {
+    this.fieldRef = fieldRef;
+    this.type = type;
+    this.resultFieldName = resultFieldName;
+  }
+
+  public String getFieldRef() {
+    return fieldRef;
+  }
+
+  public AggregationEnum getType() {
+    return type;
+  }
+
+  public String getResultFieldName() {
+    return resultFieldName;
+  }
+
+  @Override
+  public String toString() {
+    return ToStringBuilder.reflectionToString(this);
+  }
+
+}
diff --git a/src/main/java/ch/psi/daq/rest/model/AggregationEnum.java b/src/main/java/ch/psi/daq/rest/model/AggregationEnum.java
new file mode 100644
index 0000000..d32eed6
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/model/AggregationEnum.java
@@ -0,0 +1,30 @@
+package ch.psi.daq.rest.model;
+
+public enum AggregationEnum {
+  /* mean value of values (of an array) */
+  mean,
+
+  /* min value of values (of an array) */
+  min,
+
+  /* max value of values (of an array) */
+  max,
+
+  /* sum of values (of an array) */
+  sum,
+
+  /* number of values (of an array) aggregated into a value */
+  count,
+
+  /* variance of values (of an array) */
+  variance,
+
+  /* standard deviation */
+  std_dev,
+
+  /* skewness */
+  skewness,
+
+  /* kurtosis */
+  kurtosis
+}
diff --git a/src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java b/src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java
new file mode 100644
index 0000000..b0edb29
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/queries/AbstractQuery.java
@@ -0,0 +1,148 @@
+package ch.psi.daq.rest.queries;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+
+import ch.psi.daq.cassandra.reader.Ordering;
+import ch.psi.daq.hazelcast.common.query.Query;
+import ch.psi.daq.hazelcast.common.query.ValueAggregation;
+import ch.psi.daq.hazelcast.common.query.bin.BinIntervalCalculator;
+import ch.psi.daq.rest.model.Aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import javax.validation.constraints.NotNull;
+
+/**
+ *
+ * @author zellweger_c
+ *
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = 
JsonTypeInfo.As.PROPERTY,
+    property = "queryType")
+@JsonSubTypes(
+{
+  @Type(value = PulseRangeQuery.class, name = "pulserange"),
+  @Type(value = TimeRangeQuery.class, name = "timerange"),
+})
+public abstract class AbstractQuery implements Query {
+
+  @NotNull
+  private boolean isOrderDescending;
+
+  @NotNull
+  private List<String> channels;
+
+  @NotNull
+  private LinkedHashSet<String> fields;
+
+  @NotNull
+  private List<Aggregation> aggregations;
+
+  private ValueAggregation valueAggregation;
+
+  private boolean aggregateChannels;
+
+  /**
+   *
+   * @param isOrderDescending whether to add a 'orderBy' clause into the database query
+   * @param channelIds all the channelIds (channel names) we want to query
+   * @param fields the fields (who map to fields in the DB) we are interested in returning to the
+   *        client, needs to be in insertion order (hence the {@link LinkedHashSet} type)
+   */
+  @JsonCreator
+  public AbstractQuery(
+      // note that those annotations are needed for the polymorphic
+      // mapping to work correctly
+      @JsonProperty(value = "isOrderDescending") boolean isOrderDescending,
+      @JsonProperty(value = "channels") List<String> channelIds,
+      @JsonProperty(value = "fields") LinkedHashSet<String> fields,
+      @JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
+      @JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
+      @JsonProperty(value = "aggregations") List<Aggregation> aggregations) {
+
+    this.isOrderDescending = isOrderDescending;
+    this.aggregateChannels = aggregateChannels;
+    this.valueAggregation = valueAggregation;
+
+    if (channelIds == null || fields == null) {
+      throw new IllegalArgumentException("channelIds and/or fields cannot be null.");
+    }
+
+    this.channels = channelIds;
+    this.fields = fields;
+    this.aggregations = aggregations;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<String> getChannels() {
+    return Collections.unmodifiableList(channels);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Ordering getOrdering() {
+    if (isOrderDescending) {
+      return Ordering.DESC;
+    }
+    return Ordering.ASC;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ValueAggregation getValueAggregation() {
+    return valueAggregation;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean aggregateChannels() {
+    return isAggregateChannels();
+  }
+
+  @Override
+  public BinIntervalCalculator getBinIntervalCalculator() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+
+  public boolean isAggregateChannels() {
+    return aggregateChannels;
+  }
+
+  public boolean isOrderDescending() {
+    return isOrderDescending;
+  }
+
+  public LinkedHashSet<String> getFields() {
+    return fields;
+  }
+
+  public List<Aggregation> getAggregations() {
+    return aggregations;
+  }
+
+  @Override
+  public String toString() {
+    return ReflectionToStringBuilder.reflectionToString(this);
+  }
+
+}
diff --git a/src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java b/src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java
new file mode 100644
index 0000000..5eeae67
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/queries/PulseRangeQuery.java
@@ -0,0 +1,66 @@
+package ch.psi.daq.rest.queries;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import ch.psi.daq.hazelcast.common.query.ValueAggregation;
+import ch.psi.daq.rest.model.Aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ *
+ */
+public class PulseRangeQuery extends AbstractQuery {
+
+  private long startPulseId;
+  private long endPulseId;
+
+  @JsonCreator
+  public PulseRangeQuery(
+      // note that those annotations are needed for the polymorphic
+      // mapping to work correctly
+      @JsonProperty(value = "isOrderDescending") boolean isOrderDescending,
+      @JsonProperty(value = "channels") List<String> channels,
+      @JsonProperty(value = "fields") LinkedHashSet<String> fields,
+      @JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
+      @JsonProperty(value = 
"aggregationType") ValueAggregation valueAggregation,
+      @JsonProperty(value = "aggregations") List<Aggregation> aggregations,
+      @JsonProperty(value = "startPulseId") long startPulseId,
+      @JsonProperty(value = "endPulseId") long endPulseId)
+  {
+
+    super(isOrderDescending, channels, fields, aggregateChannels, valueAggregation, aggregations);
+
+    this.startPulseId = startPulseId;
+    this.endPulseId = endPulseId;
+  }
+
+  public long getStartPulseId() {
+    return startPulseId;
+  }
+
+  public void setStartPulseId(long startPulseId) {
+    this.startPulseId = startPulseId;
+  }
+
+  public long getEndPulseId() {
+    return endPulseId;
+  }
+
+  public void setEndPulseId(long endPulseId) {
+    this.endPulseId = endPulseId;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    return ToStringBuilder.reflectionToString(this);
+  }
+
+}
diff --git a/src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java b/src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java
new file mode 100644
index 0000000..f6fbbb2
--- /dev/null
+++ b/src/main/java/ch/psi/daq/rest/queries/TimeRangeQuery.java
@@ -0,0 +1,168 @@
+package ch.psi.daq.rest.queries;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.psi.daq.domain.cassandra.FieldName;
+import ch.psi.daq.domain.utils.UUIDGen;
+import ch.psi.daq.hazelcast.common.query.ValueAggregation;
+import ch.psi.daq.rest.model.Aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TimeRangeQuery extends AbstractQuery {
+
+
+  private static final String DATE_FORMAT_STRING = "yyyy/MM/dd HH:mm:ss.SSS";
+  private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT_STRING); // NOTE(review): SimpleDateFormat is not thread-safe — confirm single-threaded deserialization or switch to DateTimeFormatter
+  private static final Logger logger = LoggerFactory.getLogger(TimeRangeQuery.class);
+
+  @NotNull
+  private long start;
+  @NotNull
+  private long startNanoOffset;
+  @JsonIgnore
+  private UUID startTime;
+
+  @NotNull
+  private long end;
+  @NotNull
+  private long endNanoOffset;
+
+  @JsonIgnore
+  private UUID endTime;
+
+  @JsonIgnore
+  private String[] columns;
+
+
+
+  /**
+   *
+   * @param isOrderDescending
+   * @param channelIds all the sourceIds (channel names) we want to query
+   * @param fields the fields (who map to fields in the DB) we are interested in returning to the
+   *        client, needs to be in insertion order (hence the {@link LinkedHashSet} type)
+   * @param startMillis
+   * @param startNanoOffset
+   * @param endMillis
+   * @param endNanoOffset
+   */
+  @JsonCreator
+  public TimeRangeQuery(
+      // note that those annotations are needed for the polymorphic
+      // mapping to work correctly
+      @JsonProperty(value = "isOrderDescending") boolean isOrderDescending,
+      @JsonProperty(value = "channels") List<String> channels,
+      @JsonProperty(value = "fields") LinkedHashSet<String> fields,
+      @JsonProperty(value = "aggregateChannels") boolean aggregateChannels,
+      @JsonProperty(value = "aggregationType") ValueAggregation valueAggregation,
+      @JsonProperty(value = "aggregations") List<Aggregation> aggregations,
+      @JsonProperty(value = "start") long startMillis,
+      @JsonProperty(value = "startNanoOffset") long startNanoOffset,
+      @JsonProperty(value = "startDateTime") String startDateTime,
+      @JsonProperty(value = "end") long endMillis,
+      @JsonProperty(value = "endNanoOffset") long endNanoOffset,
+      @JsonProperty(value = "endDateTime") String endDateTime) {
+
+    super(isOrderDescending, channels, fields, aggregateChannels, valueAggregation, aggregations);
+
+    this.start = startMillis;
+    this.startNanoOffset = startNanoOffset;
+    this.end = endMillis;
+    this.endNanoOffset = endNanoOffset;
+
+
+    if (startDateTime != null && endDateTime != null) {
+      logger.info("startDateTime and endDateTime specified. This takes precedence over the start / end fields.");
+      try {
+        Date startDate = DATE_FORMAT.parse(startDateTime);
+        Date endDate = DATE_FORMAT.parse(endDateTime);
+
+        this.start = startDate.getTime();
+        this.end = endDate.getTime();
+      } catch (ParseException e) {
+        logger.error("Parsing the start- and/or endDate was unsuccessful. "
+            + "The format must be '" + DATE_FORMAT_STRING + "'", e);
+      }
+    }
+
+    this.startTime = UUIDGen.getMinTime(this.start, this.startNanoOffset);
+    this.endTime = UUIDGen.getMaxTime(this.end, this.endNanoOffset);
+
+    /**
+     * This will map to the columns in the database
+     */
+    this.columns = fields.stream()
+        .map(f -> FieldName.valueOf(f)) // NOTE(review): if FieldName is an enum, toArray(new String[] {}) below throws ArrayStoreException at runtime — confirm valueOf returns String
+        .collect(Collectors.toList())
+        .toArray(new String[] {});
+  }
+
+  public long getStart() {
+    return start;
+  }
+
+  public void setStart(long start) {
+    this.start = start;
+  }
+
+  public long getStartNanoOffset() {
+    return startNanoOffset;
+  }
+
+  public void setStartNanoOffset(long startNanoOffset) {
+    this.startNanoOffset = startNanoOffset;
+  }
+
+  public long getEnd() {
+    return end;
+  }
+
+  public void setEnd(long end) {
+    this.end = end;
+  }
+
+  public long getEndNanoOffset() {
+    return endNanoOffset;
+  }
+
+  public void setEndNanoOffset(long endNanoOffset) {
+    this.endNanoOffset = endNanoOffset;
+  }
+
+  public UUID getStartTime() {
+    return startTime;
+  }
+
+  public UUID getEndTime() {
+    return endTime;
+  }
+
+  public String[] getColumns() {
+    return columns;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    return ToStringBuilder.reflectionToString(this);
+  }
+
+
+}