Config repo, restart, cleanup Cassandra dependencies.

This commit is contained in:
Fabian Märki
2016-06-06 15:55:44 +02:00
parent b61f21835a
commit 907637ad32

View File

@ -21,20 +21,21 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import ch.psi.daq.cassandra.domain.ChannelConfiguration;
import ch.psi.daq.cassandra.domain.ChannelEvent;
import ch.psi.daq.cassandra.domain.MetaPulseId;
import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.reader.CassandraReader;
import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.FieldNames; import ch.psi.daq.domain.FieldNames;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.events.ChannelEvent;
import ch.psi.daq.domain.events.MetaPulseId;
import ch.psi.daq.domain.events.impl.ChannelEventImpl;
import ch.psi.daq.domain.query.backend.PulseIdRangeQuery; import ch.psi.daq.domain.query.backend.PulseIdRangeQuery;
import ch.psi.daq.domain.query.backend.TimeRangeQuery; import ch.psi.daq.domain.query.backend.TimeRangeQuery;
import ch.psi.daq.domain.query.event.EventQuery; import ch.psi.daq.domain.query.event.EventQuery;
import ch.psi.daq.domain.query.event.StreamEventQuery; import ch.psi.daq.domain.query.event.StreamEventQuery;
import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.reader.MetaStreamEvent; import ch.psi.daq.domain.reader.MetaStreamEventQuery;
import ch.psi.daq.domain.utils.PropertiesUtils; import ch.psi.daq.domain.utils.PropertiesUtils;
public class DummyCassandraReader implements CassandraReader { public class DummyCassandraReader implements CassandraReader {
@ -118,8 +119,8 @@ public class DummyCassandraReader implements CassandraReader {
@Override @Override
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends StreamEventQuery> queryProviders) { public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends StreamEventQuery> queryProviders) {
Stream<ChannelEvent> result = queryProviders.map(ceq -> { Stream<ChannelEvent> result = queryProviders.map(ceq -> {
if (ceq instanceof MetaStreamEvent) { if (ceq instanceof MetaStreamEventQuery) {
return getEvent((MetaStreamEvent<ChannelEvent>) ceq); return getEvent((MetaStreamEventQuery<ChannelEvent>) ceq);
} else { } else {
throw new UnsupportedOperationException("This is not yet implemented!"); throw new UnsupportedOperationException("This is not yet implemented!");
} }
@ -169,7 +170,7 @@ public class DummyCassandraReader implements CassandraReader {
if (channelLower.contains("waveform")) { if (channelLower.contains("waveform")) {
long[] value = random.longs(2048).toArray(); long[] value = random.longs(2048).toArray();
value[0] = i; value[0] = i;
return new ChannelEvent( return new ChannelEventImpl(
channel, channel,
iocTime, iocTime,
pulseId, pulseId,
@ -184,7 +185,7 @@ public class DummyCassandraReader implements CassandraReader {
int[] shape = new int[] {x, y}; int[] shape = new int[] {x, y};
long[] value = random.longs(x * y).toArray(); long[] value = random.longs(x * y).toArray();
value[0] = i; value[0] = i;
return new ChannelEvent( return new ChannelEventImpl(
channel, channel,
iocTime, iocTime,
pulseId, pulseId,
@ -194,7 +195,7 @@ public class DummyCassandraReader implements CassandraReader {
shape shape
); );
} else { } else {
return new ChannelEvent( return new ChannelEventImpl(
channel, channel,
iocTime, iocTime,
pulseId, pulseId,
@ -234,7 +235,7 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc * @{inheritDoc
*/ */
@Override @Override
public ChannelEvent getEvent(MetaStreamEvent<ChannelEvent> queryInfo, String... columns) { public ChannelEvent getEvent(MetaStreamEventQuery<ChannelEvent> queryInfo, String... columns) {
if (queryInfo.getPulseId() > 0) { if (queryInfo.getPulseId() > 0) {
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(), return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(),
columns) columns)
@ -248,7 +249,7 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc * @{inheritDoc
*/ */
@Override @Override
public CompletableFuture<ChannelEvent> getEventAsync(MetaStreamEvent<ChannelEvent> queryInfo, String... columns) { public CompletableFuture<ChannelEvent> getEventAsync(MetaStreamEventQuery<ChannelEvent> queryInfo, String... columns) {
// implement when needed // implement when needed
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -307,15 +308,15 @@ public class DummyCassandraReader implements CassandraReader {
* @{inheritDoc * @{inheritDoc
*/ */
@Override @Override
public Stream<? extends MetaStreamEvent<ChannelEvent>> getMetaStream(TimeRangeQuery query) { public Stream<? extends MetaStreamEventQuery<ChannelEvent>> getMetaStream(TimeRangeQuery query) {
return getStreamEventQueryStream(query).map(r -> { return getStreamEventQueryStream(query).map(r -> {
return (MetaStreamEvent<ChannelEvent>) r; return (MetaStreamEventQuery<ChannelEvent>) r;
}); });
} }
@Override @Override
public Stream<ChannelEvent> getEventStream(Stream<? extends MetaStreamEvent<ChannelEvent>> queryInfos) { public Stream<ChannelEvent> getEventStream(Stream<? extends MetaStreamEventQuery<ChannelEvent>> queryInfos) {
return getEventStream(null, queryInfos); return getEventStream(null, queryInfos);
} }