diff --git a/.project b/.project
index 08328ec..56f8001 100644
--- a/.project
+++ b/.project
@@ -5,11 +5,6 @@
-
- org.eclipse.wst.common.project.facet.core.builder
-
-
-
org.eclipse.jdt.core.javabuilder
@@ -25,6 +20,5 @@
org.springframework.ide.eclipse.core.springnature
org.springsource.ide.eclipse.gradle.core.nature
org.eclipse.jdt.core.javanature
- org.eclipse.wst.common.project.facet.core.nature
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
index 8807bcd..b6bc4d5 100644
--- a/.settings/org.eclipse.jdt.core.prefs
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -1,5 +1,5 @@
#
-#Thu Dec 17 10:45:31 CET 2015
+#Fri Jan 15 11:32:52 CET 2016
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java
index 7496ac7..769e035 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java
@@ -6,32 +6,32 @@ package ch.psi.daq.test.queryrest.query;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.collect.Lists;
import ch.psi.daq.cassandra.reader.CassandraReader;
-import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery;
-import ch.psi.daq.cassandra.reader.query.TimeRangeQuery;
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.domain.DataEvent;
+import ch.psi.daq.domain.cassandra.ChannelConfiguration;
import ch.psi.daq.domain.cassandra.ChannelEvent;
import ch.psi.daq.domain.cassandra.MetaPulseId;
-import ch.psi.daq.domain.cassandra.MetaTime;
+import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
+import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
-import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo;
+import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent;
import ch.psi.daq.domain.cassandra.querying.EventQuery;
-import ch.psi.data.converters.ConverterProvider;
-/**
- * @author zellweger_c
- *
- */
public class DummyCassandraReader implements CassandraReader {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
private static final int KEYSPACE = 1;
private CassandraDataGen dataGen;
@@ -89,7 +89,7 @@ public class DummyCassandraReader implements CassandraReader {
*/
@Override
public Stream extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId,
- Ordering ordering, String... columns) {
+ Ordering ordering, boolean aggregateValues, String... columns) {
return getDummyEventStream(channel, startPulseId, endPulseId);
}
@@ -98,7 +98,7 @@ public class DummyCassandraReader implements CassandraReader {
*/
@Override
public Stream extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
- long endNanos, Ordering ordering, String... columns) {
+ long endNanos, Ordering ordering, boolean aggregateValues, String... columns) {
return getDummyEventStream(channel, startMillis / 10, endMillis / 10);
}
@@ -108,8 +108,8 @@ public class DummyCassandraReader implements CassandraReader {
@Override
public Stream getEventStream(EventQuery eventQuery, Stream extends ChannelEventQuery> queryProviders) {
Stream result = queryProviders.map(ceq -> {
- if (ceq instanceof ChannelEventQueryInfo) {
- return getEvent((ChannelEventQueryInfo) ceq);
+ if (ceq instanceof MetaChannelEvent) {
+ return getEvent((MetaChannelEvent) ceq);
} else {
throw new UnsupportedOperationException("This is not yet implemented!");
}
@@ -160,6 +160,7 @@ public class DummyCassandraReader implements CassandraReader {
i,
i * 10,
0,
+ KEYSPACE,
value
);
@@ -176,6 +177,7 @@ public class DummyCassandraReader implements CassandraReader {
i,
i * 10,
0,
+ KEYSPACE,
value,
shape
);
@@ -187,6 +189,7 @@ public class DummyCassandraReader implements CassandraReader {
i,
i * 10,
0,
+ KEYSPACE,
i
);
}
@@ -221,7 +224,7 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc
*/
@Override
- public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) {
+ public ChannelEvent getEvent(MetaChannelEvent queryInfo, String... columns) {
if (queryInfo.getPulseId() > 0) {
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId())
.get(0);
@@ -234,7 +237,8 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc
*/
@Override
- public CompletableFuture getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) {
+ public CompletableFuture getEventAsync(MetaChannelEvent queryInfo, String... columns) {
+ // implement when needed
throw new UnsupportedOperationException();
}
@@ -245,15 +249,17 @@ public class DummyCassandraReader implements CassandraReader {
public Stream extends ChannelEventQuery> getChannelEventQueryStream(PulseIdRangeQuery query) {
return dataGen.generateMetaPulseId(
- KEYSPACE,
- ConverterProvider.TYPE_INT32,
- null,
query.getStartPulseId(),
(query.getEndPulseId() - query.getStartPulseId() + 1),
i -> i * 10,
i -> 0,
i -> i,
- query.getChannel()).stream();
+ query.getChannel())
+ .stream()
+ .map(metaPulse -> {
+ metaPulse.setKeyspace(KEYSPACE);
+ return metaPulse;
+ });
}
/**
@@ -264,8 +270,6 @@ public class DummyCassandraReader implements CassandraReader {
return dataGen.generateMetaTime(
KEYSPACE,
- ConverterProvider.TYPE_INT32,
- null,
query.getStartMillis() / 10,
((query.getEndMillis() - query.getStartMillis()) / 10 + 1),
i -> i * 10,
@@ -278,7 +282,7 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc
*/
@Override
- public Stream getMetaPulseIdStream(PulseIdRangeQuery query) {
+ public Stream getMetaStream(PulseIdRangeQuery query) {
return getChannelEventQueryStream(query).map(r -> {
return (MetaPulseId) r;
@@ -291,11 +295,55 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc
*/
@Override
- public Stream getMetaTimeStream(TimeRangeQuery query) {
+ public Stream extends MetaChannelEvent> getMetaStream(TimeRangeQuery query) {
return getChannelEventQueryStream(query).map(r -> {
- return (MetaTime) r;
+ return (MetaChannelEvent) r;
});
}
+ @Override
+ public Stream getEventStream(Stream extends MetaChannelEvent> queryInfos) {
+ return getEventStream(null, queryInfos);
+ }
+
+ @Override
+ public Stream getChannelConfiguration(TimeRangeQuery query) {
+ // implement when needed
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelConfiguration getChannelConfigurationBefore(TimeRangeQuery query) {
+ try {
+ return getChannelConfigurationBeforeAsync(query)
+ .get(30, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ LOGGER.error("Could not read ChannelConfiguration from DB.", t);
+ return null;
+ }
+ }
+
+ @Override
+ public CompletableFuture getChannelConfigurationBeforeAsync(TimeRangeQuery query) {
+ // implement when needed
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelConfiguration getChannelConfigurationAfter(TimeRangeQuery query) {
+ try {
+ return getChannelConfigurationAfterAsync(query)
+ .get(30, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ LOGGER.error("Could not read ChannelConfiguration from DB.", t);
+ return null;
+ }
+ }
+
+ @Override
+ public CompletableFuture getChannelConfigurationAfterAsync(TimeRangeQuery query) {
+ // implement when needed
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java
index 6620757..4aee732 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java
@@ -35,13 +35,13 @@ public class DummyDataReader implements DataReader {
@Override
public Stream extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId,
- Ordering ordering, String... columns) {
+ Ordering ordering, boolean aggregateValues, String... columns) {
return getElements(channel, startPulseId, endPulseId);
}
@Override
public Stream extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
- long endNanos, Ordering ordering, String... columns) {
+ long endNanos, Ordering ordering, boolean aggregateValues, String... columns) {
return getElements(channel, startMillis / 10, endMillis / 10);
}
@@ -59,6 +59,7 @@ public class DummyDataReader implements DataReader {
i,
i * 10,
0,
+ 1,
value
);
@@ -75,6 +76,7 @@ public class DummyDataReader implements DataReader {
i,
i * 10,
0,
+ 1,
value,
shape
);
@@ -86,6 +88,7 @@ public class DummyDataReader implements DataReader {
i,
i * 10,
0,
+ 1,
i
);
}