Merge branch 'ATEST-304' into 'master'

Atest 304



See merge request !4
This commit is contained in:
maerki_f
2016-01-26 12:25:23 +01:00
4 changed files with 78 additions and 33 deletions

View File

@ -5,11 +5,6 @@
<projects> <projects>
</projects> </projects>
<buildSpec> <buildSpec>
<buildCommand>
<name>org.eclipse.wst.common.project.facet.core.builder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand> <buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name> <name>org.eclipse.jdt.core.javabuilder</name>
<arguments> <arguments>
@ -25,6 +20,5 @@
<nature>org.springframework.ide.eclipse.core.springnature</nature> <nature>org.springframework.ide.eclipse.core.springnature</nature>
<nature>org.springsource.ide.eclipse.gradle.core.nature</nature> <nature>org.springsource.ide.eclipse.gradle.core.nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature> <nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.wst.common.project.facet.core.nature</nature>
</natures> </natures>
</projectDescription> </projectDescription>

View File

@ -1,5 +1,5 @@
# #
#Thu Dec 17 10:45:31 CET 2015 #Fri Jan 15 11:32:52 CET 2016
org.eclipse.jdt.core.compiler.debug.localVariable=generate org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.compliance=1.8 org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve

View File

@ -6,32 +6,32 @@ package ch.psi.daq.test.queryrest.query;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.LongStream; import java.util.stream.LongStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.reader.CassandraReader;
import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery;
import ch.psi.daq.cassandra.reader.query.TimeRangeQuery;
import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.cassandra.ChannelConfiguration;
import ch.psi.daq.domain.cassandra.ChannelEvent; import ch.psi.daq.domain.cassandra.ChannelEvent;
import ch.psi.daq.domain.cassandra.MetaPulseId; import ch.psi.daq.domain.cassandra.MetaPulseId;
import ch.psi.daq.domain.cassandra.MetaTime; import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo; import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent;
import ch.psi.daq.domain.cassandra.querying.EventQuery; import ch.psi.daq.domain.cassandra.querying.EventQuery;
import ch.psi.data.converters.ConverterProvider;
/**
* @author zellweger_c
*
*/
public class DummyCassandraReader implements CassandraReader { public class DummyCassandraReader implements CassandraReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
private static final int KEYSPACE = 1; private static final int KEYSPACE = 1;
private CassandraDataGen dataGen; private CassandraDataGen dataGen;
@ -89,7 +89,7 @@ public class DummyCassandraReader implements CassandraReader {
*/ */
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId, public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId,
Ordering ordering, String... columns) { Ordering ordering, boolean aggregateValues, String... columns) {
return getDummyEventStream(channel, startPulseId, endPulseId); return getDummyEventStream(channel, startPulseId, endPulseId);
} }
@ -98,7 +98,7 @@ public class DummyCassandraReader implements CassandraReader {
*/ */
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis, public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
long endNanos, Ordering ordering, String... columns) { long endNanos, Ordering ordering, boolean aggregateValues, String... columns) {
return getDummyEventStream(channel, startMillis / 10, endMillis / 10); return getDummyEventStream(channel, startMillis / 10, endMillis / 10);
} }
@ -108,8 +108,8 @@ public class DummyCassandraReader implements CassandraReader {
@Override @Override
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends ChannelEventQuery> queryProviders) { public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends ChannelEventQuery> queryProviders) {
Stream<ChannelEvent> result = queryProviders.map(ceq -> { Stream<ChannelEvent> result = queryProviders.map(ceq -> {
if (ceq instanceof ChannelEventQueryInfo) { if (ceq instanceof MetaChannelEvent) {
return getEvent((ChannelEventQueryInfo) ceq); return getEvent((MetaChannelEvent) ceq);
} else { } else {
throw new UnsupportedOperationException("This is not yet implemented!"); throw new UnsupportedOperationException("This is not yet implemented!");
} }
@ -160,6 +160,7 @@ public class DummyCassandraReader implements CassandraReader {
i, i,
i * 10, i * 10,
0, 0,
KEYSPACE,
value value
); );
@ -176,6 +177,7 @@ public class DummyCassandraReader implements CassandraReader {
i, i,
i * 10, i * 10,
0, 0,
KEYSPACE,
value, value,
shape shape
); );
@ -187,6 +189,7 @@ public class DummyCassandraReader implements CassandraReader {
i, i,
i * 10, i * 10,
0, 0,
KEYSPACE,
i i
); );
} }
@ -221,7 +224,7 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc * @{inheritDoc
*/ */
@Override @Override
public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) { public ChannelEvent getEvent(MetaChannelEvent queryInfo, String... columns) {
if (queryInfo.getPulseId() > 0) { if (queryInfo.getPulseId() > 0) {
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId()) return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId())
.get(0); .get(0);
@ -234,7 +237,8 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc * @{inheritDoc
*/ */
@Override @Override
public CompletableFuture<ChannelEvent> getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) { public CompletableFuture<ChannelEvent> getEventAsync(MetaChannelEvent queryInfo, String... columns) {
// implement when needed
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -245,15 +249,17 @@ public class DummyCassandraReader implements CassandraReader {
public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(PulseIdRangeQuery query) { public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(PulseIdRangeQuery query) {
return dataGen.generateMetaPulseId( return dataGen.generateMetaPulseId(
KEYSPACE,
ConverterProvider.TYPE_INT32,
null,
query.getStartPulseId(), query.getStartPulseId(),
(query.getEndPulseId() - query.getStartPulseId() + 1), (query.getEndPulseId() - query.getStartPulseId() + 1),
i -> i * 10, i -> i * 10,
i -> 0, i -> 0,
i -> i, i -> i,
query.getChannel()).stream(); query.getChannel())
.stream()
.map(metaPulse -> {
metaPulse.setKeyspace(KEYSPACE);
return metaPulse;
});
} }
/** /**
@ -264,8 +270,6 @@ public class DummyCassandraReader implements CassandraReader {
return dataGen.generateMetaTime( return dataGen.generateMetaTime(
KEYSPACE, KEYSPACE,
ConverterProvider.TYPE_INT32,
null,
query.getStartMillis() / 10, query.getStartMillis() / 10,
((query.getEndMillis() - query.getStartMillis()) / 10 + 1), ((query.getEndMillis() - query.getStartMillis()) / 10 + 1),
i -> i * 10, i -> i * 10,
@ -278,7 +282,7 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc * @{inheritDoc
*/ */
@Override @Override
public Stream<MetaPulseId> getMetaPulseIdStream(PulseIdRangeQuery query) { public Stream<MetaPulseId> getMetaStream(PulseIdRangeQuery query) {
return getChannelEventQueryStream(query).map(r -> { return getChannelEventQueryStream(query).map(r -> {
return (MetaPulseId) r; return (MetaPulseId) r;
@ -291,11 +295,55 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc * @{inheritDoc
*/ */
@Override @Override
public Stream<MetaTime> getMetaTimeStream(TimeRangeQuery query) { public Stream<? extends MetaChannelEvent> getMetaStream(TimeRangeQuery query) {
return getChannelEventQueryStream(query).map(r -> { return getChannelEventQueryStream(query).map(r -> {
return (MetaTime) r; return (MetaChannelEvent) r;
}); });
} }
@Override
public Stream<ChannelEvent> getEventStream(Stream<? extends MetaChannelEvent> queryInfos) {
return getEventStream(null, queryInfos);
}
@Override
public Stream<ChannelConfiguration> getChannelConfiguration(TimeRangeQuery query) {
// implement when needed
throw new UnsupportedOperationException();
}
@Override
public ChannelConfiguration getChannelConfigurationBefore(TimeRangeQuery query) {
try {
return getChannelConfigurationBeforeAsync(query)
.get(30, TimeUnit.SECONDS);
} catch (Throwable t) {
LOGGER.error("Could not read ChannelConfiguration from DB.", t);
return null;
}
}
@Override
public CompletableFuture<ChannelConfiguration> getChannelConfigurationBeforeAsync(TimeRangeQuery query) {
// implement when needed
throw new UnsupportedOperationException();
}
@Override
public ChannelConfiguration getChannelConfigurationAfter(TimeRangeQuery query) {
try {
return getChannelConfigurationAfterAsync(query)
.get(30, TimeUnit.SECONDS);
} catch (Throwable t) {
LOGGER.error("Could not read ChannelConfiguration from DB.", t);
return null;
}
}
@Override
public CompletableFuture<ChannelConfiguration> getChannelConfigurationAfterAsync(TimeRangeQuery query) {
// implement when needed
throw new UnsupportedOperationException();
}
} }

View File

@ -35,13 +35,13 @@ public class DummyDataReader implements DataReader {
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId, public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId,
Ordering ordering, String... columns) { Ordering ordering, boolean aggregateValues, String... columns) {
return getElements(channel, startPulseId, endPulseId); return getElements(channel, startPulseId, endPulseId);
} }
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis, public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
long endNanos, Ordering ordering, String... columns) { long endNanos, Ordering ordering, boolean aggregateValues, String... columns) {
return getElements(channel, startMillis / 10, endMillis / 10); return getElements(channel, startMillis / 10, endMillis / 10);
} }
@ -59,6 +59,7 @@ public class DummyDataReader implements DataReader {
i, i,
i * 10, i * 10,
0, 0,
1,
value value
); );
@ -75,6 +76,7 @@ public class DummyDataReader implements DataReader {
i, i,
i * 10, i * 10,
0, 0,
1,
value, value,
shape shape
); );
@ -86,6 +88,7 @@ public class DummyDataReader implements DataReader {
i, i,
i * 10, i * 10,
0, 0,
1,
i i
); );
} }