From 84885412824b54ccba5c4e42919411f0970f37d9 Mon Sep 17 00:00:00 2001 From: Zellweger Christof Ralf Date: Tue, 27 Oct 2015 15:41:42 +0100 Subject: [PATCH] ATEST-259: - trying to implement dummy cassandra reader - fixing some of the tests --- .../controller/QueryRestController.java | 72 +++++- .../daq/test/queryrest/DaqWebMvcConfig.java | 10 +- .../queryrest/query/DummyCassandraReader.java | 224 ++++++++++++++++++ 3 files changed, 302 insertions(+), 4 deletions(-) create mode 100644 src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index ab518a7..4abb185 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -2,9 +2,11 @@ package ch.psi.daq.queryrest.controller; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Resource; @@ -27,9 +29,11 @@ import org.springframework.web.bind.annotation.RestController; import ch.psi.daq.cassandra.request.validate.RequestProviderValidator; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; +import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.AggregationType; import ch.psi.daq.query.model.DBMode; import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; @@ -39,6 +43,8 @@ import ch.psi.daq.queryrest.config.QueryRestConfig; import ch.psi.daq.queryrest.model.ChannelsRequest; import ch.psi.daq.queryrest.response.ResponseStreamWriter; +import com.google.common.collect.Lists; + @RestController public class QueryRestController { @@ -114,6 +120,68 @@ public class QueryRestController { return getChannels(new ChannelsRequest(channelName)); } + + /** + * Returns the current list of {@link Ordering}s available. + * + * @return list of {@link Ordering}s as String array + */ + @RequestMapping(value = "ordering", method = {RequestMethod.GET}, + produces = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody List getOrderingValues() { + List orderings = Lists.newArrayList(Ordering.values()); + return orderings.stream() + .map((Ordering ord) -> { + return ord.toString(); + }).collect(Collectors.toList()); + } + + /** + * Returns the current list of {@link QueryField}s available. + * + * @return list of {@link QueryField}s as String array + */ + @RequestMapping(value = "queryfields", method = {RequestMethod.GET}, + produces = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody List getQueryFieldValues() { + List orderings = Lists.newArrayList(QueryField.values()); + return orderings.stream() + .map((QueryField qf) -> { + return qf.toString(); + }).collect(Collectors.toList()); + } + + /** + * Returns the current list of {@link Aggregation}s available. + * + * @return list of {@link Aggregation}s as String array + */ + @RequestMapping(value = "aggregations", method = {RequestMethod.GET}, + produces = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody List getAggregationsValues() { + List orderings = Lists.newArrayList(Aggregation.values()); + return orderings.stream() + .map((Aggregation value) -> { + return value.toString(); + }).collect(Collectors.toList()); + } + + /** + * Returns the current list of {@link AggregationType}s available. + * + * @return list of {@link AggregationType}s as String array + */ + @RequestMapping(value = "aggregationtypes", method = {RequestMethod.GET}, + produces = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody List getAggregationTypesValues() { + List orderings = Lists.newArrayList(AggregationType.values()); + return orderings.stream() + .map((AggregationType value) -> { + return value.toString(); + }).collect(Collectors.toList()); + } + + /** * Catch-all query method for getting data from the backend. *

@@ -143,8 +211,8 @@ public class QueryRestController { QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); // all the magic happens here - Stream>> channelToDataEvents = getQueryProcessor(query.getDbMode()) - .process(queryAnalizer); + Stream>> channelToDataEvents = + getQueryProcessor(query.getDbMode()).process(queryAnalizer); // do post-process Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); diff --git a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java index 9bc39dd..7b0a34a 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java @@ -9,12 +9,14 @@ import org.springframework.context.annotation.PropertySources; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; +import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.domain.reader.DataReader; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin; import ch.psi.daq.test.cassandra.admin.CassandraTestAdminImpl; import ch.psi.daq.test.query.config.LocalQueryTestConfig; +import ch.psi.daq.test.queryrest.query.DummyCassandraReader; import ch.psi.daq.test.queryrest.query.DummyDataReader; @Configuration @@ -32,8 +34,8 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { } @Bean - public QueryProcessor queryProcessor() { - return new QueryProcessorLocal(new DummyDataReader()); + public QueryProcessor cassandraQueryProcessorLocal() { + return new QueryProcessorLocal(dataReader()); } @Bean @@ -41,6 +43,10 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { return new DummyDataReader(); } + @Bean CassandraReader cassandraReader() { + return new DummyCassandraReader(); + } + @Bean public CassandraTestAdmin cassandraTestAdmin() { return new CassandraTestAdminImpl(); diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java new file mode 100644 index 0000000..a95a2e9 --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -0,0 +1,224 @@ +/** + * + */ +package ch.psi.daq.test.queryrest.query; + +import java.awt.image.MultiPixelPackedSampleModel; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang.NotImplementedException; + +import ch.psi.daq.cassandra.reader.CassandraReader; +import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery; +import ch.psi.daq.cassandra.reader.query.TimeRangeQuery; +import ch.psi.daq.cassandra.util.test.CassandraDataGen; +import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.cassandra.ChannelEvent; +import ch.psi.daq.domain.cassandra.MetaPulseId; +import ch.psi.daq.domain.cassandra.MetaTime; +import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; +import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo; +import ch.psi.daq.domain.cassandra.querying.EventQuery; + +import com.google.common.collect.Lists; + +/** + * @author zellweger_c + * + */ +public class DummyCassandraReader implements CassandraReader { + + private static final int KEYSPACE = 1; + private CassandraDataGen dataGen; + private String[] channels; + + + /** + * + */ + public DummyCassandraReader() { + this.dataGen = new CassandraDataGen(); + this.channels = new String[]{ + "testChannel1", + "testChannel2", + "BooleanScalar", + "BooleanWaveform", + "ByteScalar", + "ByteWaveform", + "DoubleScalar", + "DoubleWaveform", + "FloatScalar", + "FloatWaveform", + "IntegerScalar", + "IntegerWaveform", + "LongScalar", + "LongWaveform", + "ShortScalar", + "ShortWaveform", + "StringScalar", + "UByteScalar", + "UByteWaveform", + "UIntegerScalar", + "UIntegerWaveform", + "ULongScalar", + "ULongWaveform", + "UShortScalar", + "UShortWaveform"}; + } + /** + * @{inheritDoc} + */ + @Override + public Stream getChannelStream(String regex) { + return Stream.of(channels); + } + + /** + * @{inheritDoc} + */ + @Override + public Stream getEventStream(String channel, long startPulseId, long endPulseId, + Ordering ordering, String... columns) { + return getDummyEventStream(channel, startPulseId, endPulseId); + } + + /** + * @{inheritDoc} + */ + @Override + public Stream getEventStream(String channel, long startMillis, long startNanos, long endMillis, + long endNanos, Ordering ordering, String... columns) { + return getDummyEventStream(channel, startMillis, endMillis); + } + + /** + * @{inheritDoc} + */ + @Override + public Stream getEventStream(EventQuery eventQuery, Stream queryProviders) { + List result = Lists.newArrayList(); + queryProviders.forEach(ceq -> { + if (ceq instanceof ChannelEventQueryInfo) { + result.add(getEvent((ChannelEventQueryInfo) ceq)); + } else { + throw new NotImplementedException("This is not yet implemented!"); + } + }); + return result.stream(); + } + /** + * @{inheritDoc} + */ + @Override + public Stream getEventStream(PulseIdRangeQuery query) { + Stream dummyEventStream = getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId()) + .map(ce -> { return (ChannelEvent) ce; }); + return dummyEventStream; + + } + /** + * @{inheritDoc} + */ + @Override + public Stream getEventStream(TimeRangeQuery query) { + Stream dummyEventStream = getDummyEventStream(query.getChannel(), query.getStartMillis(), query.getEndMillis()) + .map(ce -> { return (ChannelEvent) ce; }); + return dummyEventStream; + } + + + private Stream getDummyEventStream(String channel, long startIndex, long endIndex) { + return dataGen.generateData(startIndex, (endIndex-startIndex), channel).stream(); + } + + /** + * @{inheritDoc} + */ + @Override + public List getChannels() { + return Lists.newArrayList(channels); + } + + /** + * @{inheritDoc} + */ + @Override + public List getChannels(String regex) { + return Lists.newArrayList(channels).stream().filter(s -> { return s.contains(regex); }).collect(Collectors.toList()); + } + + /** + * @{inheritDoc} + */ + @Override + public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) { + if (queryInfo.getPulseId() > 0) { + return dataGen.generateData(queryInfo.getPulseId(), 1, queryInfo.getChannel()).get(0); + } + return dataGen.generateData(queryInfo.getGlobalMillis(), 1, queryInfo.getChannel()).get(0); + } + + /** + * @{inheritDoc} + */ + @Override + public CompletableFuture getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) { + throw new NotImplementedException(); + } + + /** + * @{inheritDoc} + */ + @Override + public Stream getChannelEventQueryStream(PulseIdRangeQuery query) { + return dataGen.generateMetaPulseId( + KEYSPACE, + query.getStartPulseId(), + (query.getEndPulseId() - query.getStartPulseId()), + query.getChannel()).stream(); + } + + /** + * @{inheritDoc} + */ + @Override + public Stream getChannelEventQueryStream(TimeRangeQuery query) { + return dataGen.generateMetaTime( + KEYSPACE, + query.getStartMillis(), + (query.getEndMillis() - query.getStartMillis()), + query.getChannel()) + .stream() + .map(e -> { + return (ChannelEventQuery) e; + }); + } + + /** + * @{inheritDoc} + */ + @Override + public Stream getMetaPulseIdStream(PulseIdRangeQuery query) { + return dataGen.generateMetaPulseId( + KEYSPACE, + query.getStartPulseId(), (query.getEndPulseId() - query.getStartPulseId()), + query.getChannel()).stream(); + } + + + /** + * @{inheritDoc} + */ + @Override + public Stream getMetaTimeStream(TimeRangeQuery query) { + return dataGen.generateMetaTime( + KEYSPACE, + query.getStartMillis(), (query.getEndMillis() - query.getStartMillis()), + query.getChannel()).stream(); + } + +}