ATEST-304
This commit is contained in:
@ -27,7 +27,7 @@ import ch.psi.daq.domain.cassandra.MetaPulseId;
|
|||||||
import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
|
import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
|
||||||
import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
|
import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
|
||||||
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
|
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
|
||||||
import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo;
|
import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent;
|
||||||
import ch.psi.daq.domain.cassandra.querying.EventQuery;
|
import ch.psi.daq.domain.cassandra.querying.EventQuery;
|
||||||
|
|
||||||
public class DummyCassandraReader implements CassandraReader {
|
public class DummyCassandraReader implements CassandraReader {
|
||||||
@ -108,8 +108,8 @@ public class DummyCassandraReader implements CassandraReader {
|
|||||||
@Override
|
@Override
|
||||||
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends ChannelEventQuery> queryProviders) {
|
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends ChannelEventQuery> queryProviders) {
|
||||||
Stream<ChannelEvent> result = queryProviders.map(ceq -> {
|
Stream<ChannelEvent> result = queryProviders.map(ceq -> {
|
||||||
if (ceq instanceof ChannelEventQueryInfo) {
|
if (ceq instanceof MetaChannelEvent) {
|
||||||
return getEvent((ChannelEventQueryInfo) ceq);
|
return getEvent((MetaChannelEvent) ceq);
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedOperationException("This is not yet implemented!");
|
throw new UnsupportedOperationException("This is not yet implemented!");
|
||||||
}
|
}
|
||||||
@ -224,7 +224,7 @@ public class DummyCassandraReader implements CassandraReader {
|
|||||||
* @{inheritDoc
|
* @{inheritDoc
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) {
|
public ChannelEvent getEvent(MetaChannelEvent queryInfo, String... columns) {
|
||||||
if (queryInfo.getPulseId() > 0) {
|
if (queryInfo.getPulseId() > 0) {
|
||||||
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId())
|
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId())
|
||||||
.get(0);
|
.get(0);
|
||||||
@ -237,8 +237,8 @@ public class DummyCassandraReader implements CassandraReader {
|
|||||||
* @{inheritDoc
|
* @{inheritDoc
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<ChannelEvent> getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) {
|
public CompletableFuture<ChannelEvent> getEventAsync(MetaChannelEvent queryInfo, String... columns) {
|
||||||
// implement when needed
|
// implement when needed
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -249,7 +249,6 @@ public class DummyCassandraReader implements CassandraReader {
|
|||||||
public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(PulseIdRangeQuery query) {
|
public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(PulseIdRangeQuery query) {
|
||||||
|
|
||||||
return dataGen.generateMetaPulseId(
|
return dataGen.generateMetaPulseId(
|
||||||
KEYSPACE,
|
|
||||||
query.getStartPulseId(),
|
query.getStartPulseId(),
|
||||||
(query.getEndPulseId() - query.getStartPulseId() + 1),
|
(query.getEndPulseId() - query.getStartPulseId() + 1),
|
||||||
i -> i * 10,
|
i -> i * 10,
|
||||||
@ -278,7 +277,7 @@ public class DummyCassandraReader implements CassandraReader {
|
|||||||
* @{inheritDoc
|
* @{inheritDoc
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Stream<MetaPulseId> getMetaPulseIdStream(PulseIdRangeQuery query) {
|
public Stream<MetaPulseId> getMetaStream(PulseIdRangeQuery query) {
|
||||||
|
|
||||||
return getChannelEventQueryStream(query).map(r -> {
|
return getChannelEventQueryStream(query).map(r -> {
|
||||||
return (MetaPulseId) r;
|
return (MetaPulseId) r;
|
||||||
@ -291,22 +290,28 @@ public class DummyCassandraReader implements CassandraReader {
|
|||||||
* @{inheritDoc
|
* @{inheritDoc
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Stream<? extends ChannelEventQueryInfo> getMetaTimeStream(TimeRangeQuery query) {
|
public Stream<? extends MetaChannelEvent> getMetaStream(TimeRangeQuery query) {
|
||||||
|
|
||||||
return getChannelEventQueryStream(query).map(r -> {
|
return getChannelEventQueryStream(query).map(r -> {
|
||||||
return (ChannelEventQueryInfo) r;
|
return (MetaChannelEvent) r;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Stream<ChannelEvent> getEventStream(Stream<? extends ChannelEventQueryInfo> queryInfos) {
|
public Stream<ChannelEvent> getEventStream(Stream<? extends MetaChannelEvent> queryInfos) {
|
||||||
return getEventStream(null, queryInfos);
|
return getEventStream(null, queryInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelConfiguration getChannelConfiguration(String channel) {
|
public Stream<ChannelConfiguration> getChannelConfiguration(TimeRangeQuery query) {
|
||||||
|
// implement when needed
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelConfiguration getChannelConfigurationBefore(TimeRangeQuery query) {
|
||||||
try {
|
try {
|
||||||
return getChannelConfigurationAsync(channel)
|
return getChannelConfigurationBeforeAsync(query)
|
||||||
.get(30, TimeUnit.SECONDS);
|
.get(30, TimeUnit.SECONDS);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOGGER.error("Could not read ChannelConfiguration from DB.", t);
|
LOGGER.error("Could not read ChannelConfiguration from DB.", t);
|
||||||
@ -315,7 +320,24 @@ public class DummyCassandraReader implements CassandraReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<ChannelConfiguration> getChannelConfigurationAsync(String channel) {
|
public CompletableFuture<ChannelConfiguration> getChannelConfigurationBeforeAsync(TimeRangeQuery query) {
|
||||||
|
// implement when needed
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelConfiguration getChannelConfigurationAfter(TimeRangeQuery query) {
|
||||||
|
try {
|
||||||
|
return getChannelConfigurationAfterAsync(query)
|
||||||
|
.get(30, TimeUnit.SECONDS);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOGGER.error("Could not read ChannelConfiguration from DB.", t);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<ChannelConfiguration> getChannelConfigurationAfterAsync(TimeRangeQuery query) {
|
||||||
// implement when needed
|
// implement when needed
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user