From a90ececbe0a5d47b06ce326c6e2a391673df869d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Fri, 15 Jan 2016 10:04:02 +0100 Subject: [PATCH 1/5] ATEST-304 --- .../queryrest/query/DummyCassandraReader.java | 49 +++++++++++++------ .../test/queryrest/query/DummyDataReader.java | 3 ++ 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 7496ac7..67bf735 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -6,32 +6,32 @@ package ch.psi.daq.test.queryrest.query; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.collect.Lists; import ch.psi.daq.cassandra.reader.CassandraReader; -import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery; -import ch.psi.daq.cassandra.reader.query.TimeRangeQuery; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.cassandra.ChannelConfiguration; import ch.psi.daq.domain.cassandra.ChannelEvent; import ch.psi.daq.domain.cassandra.MetaPulseId; -import ch.psi.daq.domain.cassandra.MetaTime; +import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery; +import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo; import ch.psi.daq.domain.cassandra.querying.EventQuery; -import ch.psi.data.converters.ConverterProvider; -/** - * @author zellweger_c - * - */ public class DummyCassandraReader implements CassandraReader { + private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class); private static final int KEYSPACE = 1; private CassandraDataGen dataGen; @@ -160,6 +160,7 @@ public class DummyCassandraReader implements CassandraReader { i, i * 10, 0, + KEYSPACE, value ); @@ -176,6 +177,7 @@ public class DummyCassandraReader implements CassandraReader { i, i * 10, 0, + KEYSPACE, value, shape ); @@ -187,6 +189,7 @@ public class DummyCassandraReader implements CassandraReader { i, i * 10, 0, + KEYSPACE, i ); } @@ -235,6 +238,7 @@ public class DummyCassandraReader implements CassandraReader { */ @Override public CompletableFuture getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) { + // implement when needed throw new UnsupportedOperationException(); } @@ -246,8 +250,6 @@ public class DummyCassandraReader implements CassandraReader { return dataGen.generateMetaPulseId( KEYSPACE, - ConverterProvider.TYPE_INT32, - null, query.getStartPulseId(), (query.getEndPulseId() - query.getStartPulseId() + 1), i -> i * 10, @@ -264,8 +266,6 @@ public class DummyCassandraReader implements CassandraReader { return dataGen.generateMetaTime( KEYSPACE, - ConverterProvider.TYPE_INT32, - null, query.getStartMillis() / 10, ((query.getEndMillis() - query.getStartMillis()) / 10 + 1), i -> i * 10, @@ -291,11 +291,32 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public Stream getMetaTimeStream(TimeRangeQuery query) { + public Stream getMetaTimeStream(TimeRangeQuery query) { return getChannelEventQueryStream(query).map(r -> { - return (MetaTime) r; + return (ChannelEventQueryInfo) r; }); } + @Override + public Stream getEventStream(Stream queryInfos) { + return getEventStream(null, queryInfos); + } + + @Override + public ChannelConfiguration getChannelConfiguration(String channel) { + try { + return getChannelConfigurationAsync(channel) + .get(30, TimeUnit.SECONDS); + } catch (Throwable t) { + LOGGER.error("Could not read ChannelConfiguration from DB.", t); + return null; + } + } + + @Override + public CompletableFuture getChannelConfigurationAsync(String channel) { + // implement when needed + throw new UnsupportedOperationException(); + } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java index 6620757..c86c6d8 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java @@ -59,6 +59,7 @@ public class DummyDataReader implements DataReader { i, i * 10, 0, + 1, value ); @@ -75,6 +76,7 @@ public class DummyDataReader implements DataReader { i, i * 10, 0, + 1, value, shape ); @@ -86,6 +88,7 @@ public class DummyDataReader implements DataReader { i, i * 10, 0, + 1, i ); } From 61c4efed110be3638024d67d7a27e683f8fda440 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Fri, 15 Jan 2016 14:29:32 +0100 Subject: [PATCH 2/5] Whatever --- .project | 6 ------ .settings/org.eclipse.jdt.core.prefs | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/.project b/.project index 08328ec..56f8001 100644 --- a/.project +++ b/.project @@ -5,11 +5,6 @@ - - org.eclipse.wst.common.project.facet.core.builder - - - org.eclipse.jdt.core.javabuilder @@ -25,6 +20,5 @@ org.springframework.ide.eclipse.core.springnature org.springsource.ide.eclipse.gradle.core.nature org.eclipse.jdt.core.javanature - org.eclipse.wst.common.project.facet.core.nature diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs index 8807bcd..b6bc4d5 100644 --- a/.settings/org.eclipse.jdt.core.prefs +++ b/.settings/org.eclipse.jdt.core.prefs @@ -1,5 +1,5 @@ # -#Thu Dec 17 10:45:31 CET 2015 +#Fri Jan 15 11:32:52 CET 2016 org.eclipse.jdt.core.compiler.debug.localVariable=generate org.eclipse.jdt.core.compiler.compliance=1.8 org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve From 79a350b9e50d681f3d162e3f04169f1f1083861d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Tue, 19 Jan 2016 16:23:07 +0100 Subject: [PATCH 3/5] ATEST-304 ChannelEvent with Statistics --- .../ch/psi/daq/test/queryrest/query/DummyCassandraReader.java | 4 ++-- .../java/ch/psi/daq/test/queryrest/query/DummyDataReader.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 67bf735..2af8c81 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -89,7 +89,7 @@ public class DummyCassandraReader implements CassandraReader { */ @Override public Stream getEventStream(String channel, long startPulseId, long endPulseId, - Ordering ordering, String... columns) { + Ordering ordering, boolean aggregateValues, String... columns) { return getDummyEventStream(channel, startPulseId, endPulseId); } @@ -98,7 +98,7 @@ public class DummyCassandraReader implements CassandraReader { */ @Override public Stream getEventStream(String channel, long startMillis, long startNanos, long endMillis, - long endNanos, Ordering ordering, String... columns) { + long endNanos, Ordering ordering, boolean aggregateValues, String... columns) { return getDummyEventStream(channel, startMillis / 10, endMillis / 10); } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java index c86c6d8..4aee732 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java @@ -35,13 +35,13 @@ public class DummyDataReader implements DataReader { @Override public Stream getEventStream(String channel, long startPulseId, long endPulseId, - Ordering ordering, String... columns) { + Ordering ordering, boolean aggregateValues, String... columns) { return getElements(channel, startPulseId, endPulseId); } @Override public Stream getEventStream(String channel, long startMillis, long startNanos, long endMillis, - long endNanos, Ordering ordering, String... columns) { + long endNanos, Ordering ordering, boolean aggregateValues, String... columns) { return getElements(channel, startMillis / 10, endMillis / 10); } From 3730bf84d9b11f527f3a673c4bd13f3051e61f78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Mon, 25 Jan 2016 09:38:58 +0100 Subject: [PATCH 4/5] ATEST-304 --- .../queryrest/query/DummyCassandraReader.java | 50 +++++++++++++------ 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 2af8c81..715fd2a 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -27,7 +27,7 @@ import ch.psi.daq.domain.cassandra.MetaPulseId; import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery; import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; -import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo; +import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent; import ch.psi.daq.domain.cassandra.querying.EventQuery; public class DummyCassandraReader implements CassandraReader { @@ -108,8 +108,8 @@ public class DummyCassandraReader implements CassandraReader { @Override public Stream getEventStream(EventQuery eventQuery, Stream queryProviders) { Stream result = queryProviders.map(ceq -> { - if (ceq instanceof ChannelEventQueryInfo) { - return getEvent((ChannelEventQueryInfo) ceq); + if (ceq instanceof MetaChannelEvent) { + return getEvent((MetaChannelEvent) ceq); } else { throw new UnsupportedOperationException("This is not yet implemented!"); } @@ -224,7 +224,7 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) { + public ChannelEvent getEvent(MetaChannelEvent queryInfo, String... columns) { if (queryInfo.getPulseId() > 0) { return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId()) .get(0); @@ -237,8 +237,8 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public CompletableFuture getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) { - // implement when needed + public CompletableFuture getEventAsync(MetaChannelEvent queryInfo, String... columns) { + // implement when needed throw new UnsupportedOperationException(); } @@ -249,7 +249,6 @@ public class DummyCassandraReader implements CassandraReader { public Stream getChannelEventQueryStream(PulseIdRangeQuery query) { return dataGen.generateMetaPulseId( - KEYSPACE, query.getStartPulseId(), (query.getEndPulseId() - query.getStartPulseId() + 1), i -> i * 10, @@ -278,7 +277,7 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public Stream getMetaPulseIdStream(PulseIdRangeQuery query) { + public Stream getMetaStream(PulseIdRangeQuery query) { return getChannelEventQueryStream(query).map(r -> { return (MetaPulseId) r; @@ -291,22 +290,28 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public Stream getMetaTimeStream(TimeRangeQuery query) { + public Stream getMetaStream(TimeRangeQuery query) { return getChannelEventQueryStream(query).map(r -> { - return (ChannelEventQueryInfo) r; + return (MetaChannelEvent) r; }); } @Override - public Stream getEventStream(Stream queryInfos) { + public Stream getEventStream(Stream queryInfos) { return getEventStream(null, queryInfos); } @Override - public ChannelConfiguration getChannelConfiguration(String channel) { + public Stream getChannelConfiguration(TimeRangeQuery query) { + // implement when needed + throw new UnsupportedOperationException(); + } + + @Override + public ChannelConfiguration getChannelConfigurationBefore(TimeRangeQuery query) { try { - return getChannelConfigurationAsync(channel) + return getChannelConfigurationBeforeAsync(query) .get(30, TimeUnit.SECONDS); } catch (Throwable t) { LOGGER.error("Could not read ChannelConfiguration from DB.", t); @@ -315,7 +320,24 @@ public class DummyCassandraReader implements CassandraReader { } @Override - public CompletableFuture getChannelConfigurationAsync(String channel) { + public CompletableFuture getChannelConfigurationBeforeAsync(TimeRangeQuery query) { + // implement when needed + throw new UnsupportedOperationException(); + } + + @Override + public ChannelConfiguration getChannelConfigurationAfter(TimeRangeQuery query) { + try { + return getChannelConfigurationAfterAsync(query) + .get(30, TimeUnit.SECONDS); + } catch (Throwable t) { + LOGGER.error("Could not read ChannelConfiguration from DB.", t); + return null; + } + } + + @Override + public CompletableFuture getChannelConfigurationAfterAsync(TimeRangeQuery query) { // implement when needed throw new UnsupportedOperationException(); } From faab4fb43903db6c02e63761c71d024852b493e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Tue, 26 Jan 2016 11:20:47 +0100 Subject: [PATCH 5/5] Fix tests. --- .../psi/daq/test/queryrest/query/DummyCassandraReader.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 715fd2a..769e035 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -254,7 +254,12 @@ public class DummyCassandraReader implements CassandraReader { i -> i * 10, i -> 0, i -> i, - query.getChannel()).stream(); + query.getChannel()) + .stream() + .map(metaPulse -> { + metaPulse.setKeyspace(KEYSPACE); + return metaPulse; + }); } /**