ATEST-304
This commit is contained in:
@ -6,32 +6,32 @@ package ch.psi.daq.test.queryrest.query;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import ch.psi.daq.cassandra.reader.CassandraReader;
|
||||
import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery;
|
||||
import ch.psi.daq.cassandra.reader.query.TimeRangeQuery;
|
||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.cassandra.ChannelConfiguration;
|
||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||
import ch.psi.daq.domain.cassandra.MetaPulseId;
|
||||
import ch.psi.daq.domain.cassandra.MetaTime;
|
||||
import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
|
||||
import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
|
||||
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
|
||||
import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo;
|
||||
import ch.psi.daq.domain.cassandra.querying.EventQuery;
|
||||
import ch.psi.data.converters.ConverterProvider;
|
||||
|
||||
/**
|
||||
* @author zellweger_c
|
||||
*
|
||||
*/
|
||||
public class DummyCassandraReader implements CassandraReader {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
|
||||
|
||||
private static final int KEYSPACE = 1;
|
||||
private CassandraDataGen dataGen;
|
||||
@ -160,6 +160,7 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
i,
|
||||
i * 10,
|
||||
0,
|
||||
KEYSPACE,
|
||||
value
|
||||
);
|
||||
|
||||
@ -176,6 +177,7 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
i,
|
||||
i * 10,
|
||||
0,
|
||||
KEYSPACE,
|
||||
value,
|
||||
shape
|
||||
);
|
||||
@ -187,6 +189,7 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
i,
|
||||
i * 10,
|
||||
0,
|
||||
KEYSPACE,
|
||||
i
|
||||
);
|
||||
}
|
||||
@ -235,6 +238,7 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
*/
|
||||
@Override
|
||||
public CompletableFuture<ChannelEvent> getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) {
|
||||
// implement when needed
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@ -246,8 +250,6 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
|
||||
return dataGen.generateMetaPulseId(
|
||||
KEYSPACE,
|
||||
ConverterProvider.TYPE_INT32,
|
||||
null,
|
||||
query.getStartPulseId(),
|
||||
(query.getEndPulseId() - query.getStartPulseId() + 1),
|
||||
i -> i * 10,
|
||||
@ -264,8 +266,6 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
|
||||
return dataGen.generateMetaTime(
|
||||
KEYSPACE,
|
||||
ConverterProvider.TYPE_INT32,
|
||||
null,
|
||||
query.getStartMillis() / 10,
|
||||
((query.getEndMillis() - query.getStartMillis()) / 10 + 1),
|
||||
i -> i * 10,
|
||||
@ -291,11 +291,32 @@ public class DummyCassandraReader implements CassandraReader {
|
||||
* @{inheritDoc
|
||||
*/
|
||||
@Override
|
||||
public Stream<MetaTime> getMetaTimeStream(TimeRangeQuery query) {
|
||||
public Stream<? extends ChannelEventQueryInfo> getMetaTimeStream(TimeRangeQuery query) {
|
||||
|
||||
return getChannelEventQueryStream(query).map(r -> {
|
||||
return (MetaTime) r;
|
||||
return (ChannelEventQueryInfo) r;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<ChannelEvent> getEventStream(Stream<? extends ChannelEventQueryInfo> queryInfos) {
|
||||
return getEventStream(null, queryInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfiguration getChannelConfiguration(String channel) {
|
||||
try {
|
||||
return getChannelConfigurationAsync(channel)
|
||||
.get(30, TimeUnit.SECONDS);
|
||||
} catch (Throwable t) {
|
||||
LOGGER.error("Could not read ChannelConfiguration from DB.", t);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ChannelConfiguration> getChannelConfigurationAsync(String channel) {
|
||||
// implement when needed
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
@ -59,6 +59,7 @@ public class DummyDataReader implements DataReader {
|
||||
i,
|
||||
i * 10,
|
||||
0,
|
||||
1,
|
||||
value
|
||||
);
|
||||
|
||||
@ -75,6 +76,7 @@ public class DummyDataReader implements DataReader {
|
||||
i,
|
||||
i * 10,
|
||||
0,
|
||||
1,
|
||||
value,
|
||||
shape
|
||||
);
|
||||
@ -86,6 +88,7 @@ public class DummyDataReader implements DataReader {
|
||||
i,
|
||||
i * 10,
|
||||
0,
|
||||
1,
|
||||
i
|
||||
);
|
||||
}
|
||||
|
Reference in New Issue
Block a user