From 419f97f429a99c51d12a4256148d6813ad4551a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Mon, 21 Mar 2016 15:34:19 +0100 Subject: [PATCH] Refactoring --- .../query/DummyArchiverApplianceReader.java | 117 ++++++++++-------- .../queryrest/query/DummyCassandraReader.java | 54 ++------ 2 files changed, 76 insertions(+), 95 deletions(-) diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java index 9ab6dd2..121b4dd 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java @@ -12,11 +12,12 @@ import org.apache.commons.lang3.ArrayUtils; import com.google.common.collect.Lists; -import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.time.TimeUtils; -import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.StreamEvent; import ch.psi.daq.domain.cassandra.ChannelEvent; import ch.psi.daq.domain.cassandra.FieldNames; +import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery; +import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils; import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.reader.DataReader; @@ -52,64 +53,78 @@ public class DummyArchiverApplianceReader implements DataReader { } @Override - public Stream getEventStream(String channel, long startPulseId, long endPulseId, - Ordering ordering, boolean aggregateValues, String... columns) { - return getDummyEventStream(channel, startPulseId, endPulseId, columns); + public Stream getEventStream(PulseIdRangeQuery query) { + return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), + query.getEventColumns()) + .filter(query.getFilterOrDefault(NO_OP_FILTER)); } @Override - public Stream getEventStream(String channel, BigDecimal startTime, BigDecimal endTime, - Ordering ordering, boolean aggregateValues, String... columns) { - return getDummyEventStream(channel, TimeUtils.getMillis(startTime) / 10, TimeUtils.getMillis(endTime) / 10); + public Stream getEventStream(TimeRangeQuery query) { + return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10) + .filter(query.getFilterOrDefault(NO_OP_FILTER)); } - private Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) { + private Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, + String... columns) { String channelLower = channelParam.toLowerCase(); - String channel = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam : null; + String channel = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam + : null; - Stream eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> { - BigDecimal iocTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; - BigDecimal globalTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; - long pulseId = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; + Stream eventStream = + LongStream.rangeClosed(startIndex, endIndex).mapToObj( + i -> { + BigDecimal iocTime = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) + : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; + BigDecimal globalTime = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) + : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; + long pulseId = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; - if (channelLower.contains("waveform")) { - long[] value = random.longs(2048).toArray(); - value[0] = i; - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value - ); + if (channelLower.contains("waveform")) { + long[] value = random.longs(2048).toArray(); + value[0] = i; + return new ChannelEvent( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + value + ); - } else if (channelLower.contains("image")) { - int x = 640; - int y = 480; - int[] shape = new int[] {x, y}; - long[] value = random.longs(x * y).toArray(); - value[0] = i; - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value, - shape - ); - } else { - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - i - ); - } - }); + } else if (channelLower.contains("image")) { + int x = 640; + int y = 480; + int[] shape = new int[] {x, y}; + long[] value = random.longs(x * y).toArray(); + value[0] = i; + return new ChannelEvent( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + value, + shape + ); + } else { + return new ChannelEvent( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + i + ); + } + }); return eventStream; } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 69500c6..619de3c 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -22,7 +22,6 @@ import com.google.common.collect.Lists; import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.util.test.CassandraDataGen; -import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.cassandra.ChannelConfiguration; @@ -32,8 +31,8 @@ import ch.psi.daq.domain.cassandra.MetaPulseId; import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery; import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; -import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent; import ch.psi.daq.domain.cassandra.querying.EventQuery; +import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent; import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils; import ch.psi.daq.domain.reader.Backend; @@ -99,22 +98,16 @@ public class DummyCassandraReader implements CassandraReader { return channelStream; } - /** - * @{inheritDoc - */ @Override - public Stream getEventStream(String channel, long startPulseId, long endPulseId, - Ordering ordering, boolean aggregateValues, String... columns) { - return getDummyEventStream(channel, startPulseId, endPulseId, columns); + public Stream getEventStream(PulseIdRangeQuery query) { + return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), query.getEventColumns()) + .filter(query.getFilterOrDefault(NO_OP_FILTER)); } - /** - * @{inheritDoc - */ @Override - public Stream getEventStream(String channel, BigDecimal startTime, BigDecimal endTime, - Ordering ordering, boolean aggregateValues, String... columns) { - return getDummyEventStream(channel, TimeUtils.getMillis(startTime) / 10, TimeUtils.getMillis(endTime) / 10, columns); + public Stream getEventStream(TimeRangeQuery query) { + return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10) + .filter(query.getFilterOrDefault(NO_OP_FILTER)); } /** @@ -133,39 +126,11 @@ public class DummyCassandraReader implements CassandraReader { return result; } - /** - * @{inheritDoc - */ - @Override - public Stream getEventStream(PulseIdRangeQuery query) { - Stream dummyEventStream = - getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), query.getEventColumns()) - .map(ce -> { - return (ChannelEvent) ce; - }); - return dummyEventStream; - - } - - /** - * @{inheritDoc - */ - @Override - public Stream getEventStream(TimeRangeQuery query) { - Stream dummyEventStream = - getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10, query.getEventColumns()) - .map(ce -> { - return (ChannelEvent) ce; - }); - return dummyEventStream; - } - - - private Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) { + private Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) { String channelLower = channelParam.toLowerCase(); String channel = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam : null; - Stream eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> { + Stream eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> { BigDecimal iocTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; BigDecimal globalTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; long pulseId = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; @@ -284,6 +249,7 @@ public class DummyCassandraReader implements CassandraReader { return dataGen.generateMetaTime( KEYSPACE, + 3600, query.getStartMillis() / 10, ((query.getEndMillis() - query.getStartMillis()) / 10 + 1), i -> i * 10,