diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 2af8c81..715fd2a 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -27,7 +27,7 @@ import ch.psi.daq.domain.cassandra.MetaPulseId; import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery; import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; -import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo; +import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent; import ch.psi.daq.domain.cassandra.querying.EventQuery; public class DummyCassandraReader implements CassandraReader { @@ -108,8 +108,8 @@ public class DummyCassandraReader implements CassandraReader { @Override public Stream getEventStream(EventQuery eventQuery, Stream queryProviders) { Stream result = queryProviders.map(ceq -> { - if (ceq instanceof ChannelEventQueryInfo) { - return getEvent((ChannelEventQueryInfo) ceq); + if (ceq instanceof MetaChannelEvent) { + return getEvent((MetaChannelEvent) ceq); } else { throw new UnsupportedOperationException("This is not yet implemented!"); } @@ -224,7 +224,7 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) { + public ChannelEvent getEvent(MetaChannelEvent queryInfo, String... columns) { if (queryInfo.getPulseId() > 0) { return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId()) .get(0); @@ -237,8 +237,8 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public CompletableFuture getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) { - // implement when needed + public CompletableFuture getEventAsync(MetaChannelEvent queryInfo, String... columns) { + // implement when needed throw new UnsupportedOperationException(); } @@ -249,7 +249,6 @@ public class DummyCassandraReader implements CassandraReader { public Stream getChannelEventQueryStream(PulseIdRangeQuery query) { return dataGen.generateMetaPulseId( - KEYSPACE, query.getStartPulseId(), (query.getEndPulseId() - query.getStartPulseId() + 1), i -> i * 10, @@ -278,7 +277,7 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public Stream getMetaPulseIdStream(PulseIdRangeQuery query) { + public Stream getMetaStream(PulseIdRangeQuery query) { return getChannelEventQueryStream(query).map(r -> { return (MetaPulseId) r; @@ -291,22 +290,28 @@ public class DummyCassandraReader implements CassandraReader { * @{inheritDoc */ @Override - public Stream getMetaTimeStream(TimeRangeQuery query) { + public Stream getMetaStream(TimeRangeQuery query) { return getChannelEventQueryStream(query).map(r -> { - return (ChannelEventQueryInfo) r; + return (MetaChannelEvent) r; }); } @Override - public Stream getEventStream(Stream queryInfos) { + public Stream getEventStream(Stream queryInfos) { return getEventStream(null, queryInfos); } @Override - public ChannelConfiguration getChannelConfiguration(String channel) { + public Stream getChannelConfiguration(TimeRangeQuery query) { + // implement when needed + throw new UnsupportedOperationException(); + } + + @Override + public ChannelConfiguration getChannelConfigurationBefore(TimeRangeQuery query) { try { - return getChannelConfigurationAsync(channel) + return getChannelConfigurationBeforeAsync(query) .get(30, TimeUnit.SECONDS); } catch (Throwable t) { LOGGER.error("Could not read ChannelConfiguration from DB.", t); @@ -315,7 +320,24 @@ public class DummyCassandraReader implements CassandraReader { } @Override - public CompletableFuture getChannelConfigurationAsync(String channel) { + public CompletableFuture getChannelConfigurationBeforeAsync(TimeRangeQuery query) { + // implement when needed + throw new UnsupportedOperationException(); + } + + @Override + public ChannelConfiguration getChannelConfigurationAfter(TimeRangeQuery query) { + try { + return getChannelConfigurationAfterAsync(query) + .get(30, TimeUnit.SECONDS); + } catch (Throwable t) { + LOGGER.error("Could not read ChannelConfiguration from DB.", t); + return null; + } + } + + @Override + public CompletableFuture getChannelConfigurationAfterAsync(TimeRangeQuery query) { // implement when needed throw new UnsupportedOperationException(); }