Refactoring

This commit is contained in:
Fabian Märki
2016-03-21 15:34:19 +01:00
parent 9f56e9e8ad
commit 419f97f429
2 changed files with 76 additions and 95 deletions

View File

@ -12,11 +12,12 @@ import org.apache.commons.lang3.ArrayUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.StreamEvent;
import ch.psi.daq.domain.cassandra.ChannelEvent; import ch.psi.daq.domain.cassandra.ChannelEvent;
import ch.psi.daq.domain.cassandra.FieldNames; import ch.psi.daq.domain.cassandra.FieldNames;
import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils; import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils;
import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.reader.DataReader; import ch.psi.daq.domain.reader.DataReader;
@ -52,64 +53,78 @@ public class DummyArchiverApplianceReader implements DataReader {
} }
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId, public Stream<? extends StreamEvent> getEventStream(PulseIdRangeQuery query) {
Ordering ordering, boolean aggregateValues, String... columns) { return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(),
return getDummyEventStream(channel, startPulseId, endPulseId, columns); query.getEventColumns())
.filter(query.getFilterOrDefault(NO_OP_FILTER));
} }
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, BigDecimal startTime, BigDecimal endTime, public Stream<? extends StreamEvent> getEventStream(TimeRangeQuery query) {
Ordering ordering, boolean aggregateValues, String... columns) { return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
return getDummyEventStream(channel, TimeUtils.getMillis(startTime) / 10, TimeUtils.getMillis(endTime) / 10); .filter(query.getFilterOrDefault(NO_OP_FILTER));
} }
private Stream<? extends DataEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) { private Stream<? extends StreamEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex,
String... columns) {
String channelLower = channelParam.toLowerCase(); String channelLower = channelParam.toLowerCase();
String channel = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam : null; String channel =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam
: null;
Stream<? extends DataEvent> eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> { Stream<? extends StreamEvent> eventStream =
BigDecimal iocTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; LongStream.rangeClosed(startIndex, endIndex).mapToObj(
BigDecimal globalTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; i -> {
long pulseId = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; BigDecimal iocTime =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
: TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
BigDecimal globalTime =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
: TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
long pulseId =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
if (channelLower.contains("waveform")) { if (channelLower.contains("waveform")) {
long[] value = random.longs(2048).toArray(); long[] value = random.longs(2048).toArray();
value[0] = i; value[0] = i;
return new ChannelEvent( return new ChannelEvent(
channel, channel,
iocTime, iocTime,
pulseId, pulseId,
globalTime, globalTime,
KEYSPACE, KEYSPACE,
value value
); );
} else if (channelLower.contains("image")) { } else if (channelLower.contains("image")) {
int x = 640; int x = 640;
int y = 480; int y = 480;
int[] shape = new int[] {x, y}; int[] shape = new int[] {x, y};
long[] value = random.longs(x * y).toArray(); long[] value = random.longs(x * y).toArray();
value[0] = i; value[0] = i;
return new ChannelEvent( return new ChannelEvent(
channel, channel,
iocTime, iocTime,
pulseId, pulseId,
globalTime, globalTime,
KEYSPACE, KEYSPACE,
value, value,
shape shape
); );
} else { } else {
return new ChannelEvent( return new ChannelEvent(
channel, channel,
iocTime, iocTime,
pulseId, pulseId,
globalTime, globalTime,
KEYSPACE, KEYSPACE,
i i
); );
} }
}); });
return eventStream; return eventStream;
} }

View File

@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.reader.CassandraReader;
import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.cassandra.ChannelConfiguration; import ch.psi.daq.domain.cassandra.ChannelConfiguration;
@ -32,8 +31,8 @@ import ch.psi.daq.domain.cassandra.MetaPulseId;
import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery; import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent;
import ch.psi.daq.domain.cassandra.querying.EventQuery; import ch.psi.daq.domain.cassandra.querying.EventQuery;
import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent;
import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils; import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils;
import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.reader.Backend;
@ -99,22 +98,16 @@ public class DummyCassandraReader implements CassandraReader {
return channelStream; return channelStream;
} }
/**
* @{inheritDoc
*/
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId, public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
Ordering ordering, boolean aggregateValues, String... columns) { return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), query.getEventColumns())
return getDummyEventStream(channel, startPulseId, endPulseId, columns); .filter(query.getFilterOrDefault(NO_OP_FILTER));
} }
/**
* @{inheritDoc
*/
@Override @Override
public Stream<? extends DataEvent> getEventStream(String channel, BigDecimal startTime, BigDecimal endTime, public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
Ordering ordering, boolean aggregateValues, String... columns) { return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
return getDummyEventStream(channel, TimeUtils.getMillis(startTime) / 10, TimeUtils.getMillis(endTime) / 10, columns); .filter(query.getFilterOrDefault(NO_OP_FILTER));
} }
/** /**
@ -133,39 +126,11 @@ public class DummyCassandraReader implements CassandraReader {
return result; return result;
} }
/** private Stream<ChannelEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) {
* @{inheritDoc
*/
@Override
public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
Stream<ChannelEvent> dummyEventStream =
getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), query.getEventColumns())
.map(ce -> {
return (ChannelEvent) ce;
});
return dummyEventStream;
}
/**
* @{inheritDoc
*/
@Override
public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
Stream<ChannelEvent> dummyEventStream =
getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10, query.getEventColumns())
.map(ce -> {
return (ChannelEvent) ce;
});
return dummyEventStream;
}
private Stream<? extends DataEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) {
String channelLower = channelParam.toLowerCase(); String channelLower = channelParam.toLowerCase();
String channel = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam : null; String channel = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam : null;
Stream<? extends DataEvent> eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> { Stream<ChannelEvent> eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> {
BigDecimal iocTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; BigDecimal iocTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
BigDecimal globalTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; BigDecimal globalTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
long pulseId = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; long pulseId = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
@ -284,6 +249,7 @@ public class DummyCassandraReader implements CassandraReader {
return dataGen.generateMetaTime( return dataGen.generateMetaTime(
KEYSPACE, KEYSPACE,
3600,
query.getStartMillis() / 10, query.getStartMillis() / 10,
((query.getEndMillis() - query.getStartMillis()) / 10 + 1), ((query.getEndMillis() - query.getStartMillis()) / 10 + 1),
i -> i * 10, i -> i * 10,