Merge branch 'ATEST-315' into 'master'

ATEST-315



See merge request !9
maerki_f
2016-04-14 14:57:33 +02:00
10 changed files with 816 additions and 621 deletions

View File

@@ -1,5 +1,5 @@
 #
-#Thu Mar 24 11:36:20 CET 2016
+#Thu Apr 14 08:26:07 CEST 2016
 org.eclipse.jdt.core.compiler.debug.localVariable=generate
 org.eclipse.jdt.core.compiler.compliance=1.8
 org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve

View File

@@ -31,8 +31,7 @@ dependencies {
    }
    compile libraries.commons_lang
    compile libraries.commons_io
-   compile libraries.super_csv
-   compile libraries.super_csv_dozer
+   compile libraries.commons_csv
    testCompile libraries.spring_boot_starter_test
    testCompile libraries.jsonassert

View File

@@ -2,6 +2,7 @@ package ch.psi.daq.queryrest.controller;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -17,6 +18,7 @@ import javax.annotation.Resource;
 import javax.servlet.http.HttpServletResponse;
 import javax.validation.Valid;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.slf4j.Logger;
@@ -39,6 +41,7 @@ import ch.psi.daq.cassandra.request.validate.RequestProviderValidator;
 import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
 import ch.psi.daq.common.ordering.Ordering;
 import ch.psi.daq.domain.DataEvent;
+import ch.psi.daq.domain.cassandra.FieldNames;
 import ch.psi.daq.domain.json.ChannelName;
 import ch.psi.daq.domain.reader.Backend;
 import ch.psi.daq.query.analyzer.QueryAnalyzer;
@@ -61,6 +64,7 @@ import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import com.hazelcast.nio.serialization.FieldType;
 
 @RestController
 public class QueryRestController {
@@ -281,6 +285,11 @@ public class QueryRestController {
             LOGGER.warn(message);
             throw new IllegalArgumentException(message);
          }
+
+         if (!ArrayUtils.contains(query.getColumns(), FieldNames.FIELD_GLOBAL_TIME)) {
+            query.addField(QueryField.globalMillis);
+         }
       }
 
       try {

View File

@@ -0,0 +1,25 @@
+package ch.psi.daq.queryrest.response.csv;
+
+import java.util.function.Function;
+import java.util.function.ToDoubleFunction;
+
+import ch.psi.daq.domain.DataEvent;
+
+public class AggregationStringifyer implements Function<DataEvent, String> {
+   private ToDoubleFunction<DataEvent> accessor;
+   private String nonValue;
+
+   public AggregationStringifyer(ToDoubleFunction<DataEvent> accessor, String nonValue) {
+      this.accessor = accessor;
+      this.nonValue = nonValue;
+   }
+
+   @Override
+   public String apply(DataEvent event) {
+      if (event == null) {
+         return nonValue;
+      } else {
+         return Double.toString(accessor.applyAsDouble(event));
+      }
+   }
+}
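Note: AggregationStringifyer is one of two small Function<DataEvent, String> helpers this commit adds to turn an event into a CSV cell, falling back to a placeholder when no event is available. A minimal standalone sketch of the same null-safe pattern (plain JDK, generic type instead of the PSI DataEvent; all names here are invented for illustration):

    import java.util.function.Function;
    import java.util.function.ToDoubleFunction;

    public class CellStringifierSketch {
        // Null-safe conversion of a value object to a CSV cell, mirroring the idea behind AggregationStringifyer.
        static <T> Function<T, String> doubleCell(ToDoubleFunction<T> accessor, String nonValue) {
            return obj -> obj == null ? nonValue : Double.toString(accessor.applyAsDouble(obj));
        }

        public static void main(String[] args) {
            Function<double[], String> firstValue = doubleCell(arr -> arr[0], "");
            System.out.println(firstValue.apply(new double[] {42.5})); // 42.5
            System.out.println(firstValue.apply(null));                // empty placeholder cell
        }
    }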

View File

@@ -1,57 +0,0 @@
-package ch.psi.daq.queryrest.response.csv;
-
-import java.lang.reflect.Array;
-
-import org.supercsv.cellprocessor.CellProcessorAdaptor;
-import org.supercsv.cellprocessor.ift.CellProcessor;
-import org.supercsv.util.CsvContext;
-
-public class ArrayProcessor extends CellProcessorAdaptor {
-
-   public static final char DEFAULT_SEPARATOR = ',';
-   public static final String OPEN_BRACKET = "[";
-   public static final String CLOSE_BRACKET = "]";
-
-   private char separator = DEFAULT_SEPARATOR;
-
-   public ArrayProcessor() {
-      super();
-   }
-
-   public ArrayProcessor(char separator) {
-      super();
-      this.separator = separator;
-   }
-
-   public ArrayProcessor(CellProcessor next) {
-      super(next);
-   }
-
-   public ArrayProcessor(CellProcessor next, char separator) {
-      super(next);
-      this.separator = separator;
-   }
-
-   @Override
-   public Object execute(Object value, CsvContext context) {
-      if (value.getClass().isArray()) {
-         StringBuilder buf = new StringBuilder();
-         int length = Array.getLength(value);
-
-         buf.append(OPEN_BRACKET);
-         for (int i = 0; i < length;) {
-            Object val = next.execute(Array.get(value, i), context);
-            buf.append(val);
-
-            ++i;
-            if (i < length) {
-               buf.append(separator);
-            }
-         }
-         buf.append(CLOSE_BRACKET);
-
-         return buf.toString();
-      } else {
-         return next.execute(value, context);
-      }
-   }
-}

View File

@@ -1,32 +1,39 @@
 package ch.psi.daq.queryrest.response.csv;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.LinkedHashSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import javax.annotation.Resource;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.supercsv.cellprocessor.constraint.NotNull;
-import org.supercsv.cellprocessor.ift.CellProcessor;
-import org.supercsv.io.dozer.CsvDozerBeanWriter;
-import org.supercsv.io.dozer.ICsvDozerBeanWriter;
-import org.supercsv.prefs.CsvPreference;
 
 import com.fasterxml.jackson.core.JsonEncoding;
 
+import ch.psi.daq.common.stream.StreamIterable;
+import ch.psi.daq.common.stream.StreamMatcher;
 import ch.psi.daq.domain.DataEvent;
 import ch.psi.daq.domain.json.ChannelName;
 import ch.psi.daq.query.analyzer.QueryAnalyzer;
@@ -44,8 +51,15 @@ import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
  */
 public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
 
-   private static final char DELIMITER_CVS = ';';
-   private static final char DELIMITER_ARRAY = ',';
+   public static final char DELIMITER_CVS = ';';
+   public static final String DELIMITER_ARRAY = ",";
+   public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
+   public static final String EMPTY_VALUE = "";
+
+   private static final Function<Pair<ChannelName, DataEvent>, ChannelName> KEY_PROVIDER = (pair) -> pair.getKey();
+   // try to match sync data (bsread) with non sync data (epics) based on the time using 10 millis
+   // buckets.
+   private static final ToLongFunction<Pair<ChannelName, DataEvent>> MATCHER_PROVIDER = (pair) -> pair.getValue()
+         .getGlobalMillis() / 10L;
 
    private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);
@@ -65,60 +79,82 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
    private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
          ResponseOptions options, HttpServletResponse response) throws Exception {
       AtomicReference<Exception> exception = new AtomicReference<>();
-      OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV);
-      List<ICsvDozerBeanWriter> beanWriters = new ArrayList<>();
+
+      final OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV);
+      final Map<ChannelName, Stream<Pair<ChannelName, DataEvent>>> streams = new LinkedHashMap<>(results.size());
+      final List<String> header = new ArrayList<>();
+      final Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors = new ArrayList<>();
 
+      // prepare the streams
       results.forEach(entry -> {
-         DAQQueryElement query = entry.getKey();
-
-         Set<QueryField> queryFields = query.getFields();
-         List<Aggregation> aggregations = query.getAggregations();
-         int aggregationsSize = (aggregations != null ? aggregations.size() : 0);
-         Set<String> fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize);
-         List<String> header = new ArrayList<>(queryFields.size() + aggregationsSize);
-         AtomicReference<CellProcessor[]> processorsRef = new AtomicReference<>();
+         final DAQQueryElement query = entry.getKey();
 
          entry.getValue()
               .sequential()
               .forEach(triple -> {
-                 try {
-                    CellProcessor[] processors = processorsRef.get();
-                    ICsvDozerBeanWriter beanWriter;
-                    if (processors == null) {
-                       processors = setupCellProcessors(query, triple.getLeft(), fieldMapping, header);
-                       processorsRef.set(processors);
-
-                       beanWriter = setupBeanWriter(fieldMapping, out);
-                       beanWriters.add(beanWriter);
-                    } else {
-                       beanWriter = beanWriters.get(beanWriters.size() - 1);
-                    }
-
-                    writeToOutput(triple, processors, beanWriter, header);
-                 } catch (Exception e) {
-                    LOGGER.warn("Could not write CSV of '{}'", triple.getMiddle(), e);
-                    exception.compareAndSet(null, e);
+                 if (triple.getRight() instanceof Stream) {
+                    setupChannelColumns(query, triple.getLeft(), triple.getMiddle(), header, accessors);
+
+                    final Stream<Pair<ChannelName, DataEvent>> eventStream = ((Stream<DataEvent>) triple.getRight())
+                          .map(dataEvent -> Pair.of(triple.getMiddle(), dataEvent));
+                    streams.put(triple.getMiddle(), eventStream);
+                 } else {
+                    final String message = String.format("Expect a DataEvent Stream for '%s'.", triple.getMiddle());
+                    LOGGER.warn(message);
                  }
              });
-
-         if (!beanWriters.isEmpty()) {
-            try {
-               beanWriters.get(beanWriters.size() - 1).flush();
-            } catch (Exception e) {
-               LOGGER.error("Could not flush ICsvDozerBeanWriter.", e);
-               exception.compareAndSet(null, e);
-            }
-         }
       });
 
-      for (ICsvDozerBeanWriter beanWriter : beanWriters) {
-         try {
-            beanWriter.close();
-         } catch (Exception e) {
-            LOGGER.error("Could not close ICsvDozerBeanWriter.", e);
-            exception.compareAndSet(null, e);
-         }
-      }
+      // online matching of the stream's content
+      StreamMatcher<ChannelName, Pair<ChannelName, DataEvent>> streamMatcher =
+            new StreamMatcher<>(KEY_PROVIDER, MATCHER_PROVIDER, streams.values());
+      Iterator<Map<ChannelName, Pair<ChannelName, DataEvent>>> streamsMatchIter = streamMatcher.iterator();
+
+      // prepare csv output
+      CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(DELIMITER_CVS);
+      Writer writer = null;
+      CSVPrinter csvFilePrinter = null;
+      try {
+         writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
+         // writer = new BufferedWriter(new OutputStreamWriter(out,
+         // response.getCharacterEncoding()));
+         csvFilePrinter = new CSVPrinter(writer, csvFormat);
+
+         csvFilePrinter.printRecord(header);
+
+         while (streamsMatchIter.hasNext()) {
+            final Map<ChannelName, Pair<ChannelName, DataEvent>> match = streamsMatchIter.next();
+
+            // ensure correct order
+            Stream<String> rowStream = accessors.stream().sequential()
+                  .map(accessorPair -> {
+                     Pair<ChannelName, DataEvent> eventPair = match.get(accessorPair.getKey());
+                     if (eventPair != null) {
+                        return accessorPair.getValue().apply(eventPair.getValue());
+                     } else {
+                        return EMPTY_VALUE;
+                     }
+                  });
+
+            csvFilePrinter.printRecord(new StreamIterable<String>(rowStream));
+         }
+      } catch (IOException e) {
+         LOGGER.error("Could not write CSV.", e);
+         exception.compareAndSet(null, e);
+      } finally {
+         try {
+            if (writer != null) {
+               writer.flush();
+               writer.close();
+            }
+            if (csvFilePrinter != null) {
+               csvFilePrinter.close();
+            }
+         } catch (IOException e) {
+            LOGGER.error("Could not close CSV writer.", e);
+            exception.compareAndSet(null, e);
+         }
+      }
@@ -128,95 +164,29 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
       }
    }
 
-   /**
-    * Sets up the bean writer instance.
-    *
-    * @throws Exception
-    *
-    */
-   private ICsvDozerBeanWriter setupBeanWriter(Set<String> fieldMapping, OutputStream out) throws Exception {
-      CsvPreference preference = new CsvPreference.Builder(
-            (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
-            DELIMITER_CVS,
-            CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
-      ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference);
-      // configure the mapping from the fields to the CSV columns
-      beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
-      return beanWriter;
-   }
-
-   /**
-    * Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer.
-    *
-    * Cell processors are an integral part of reading and writing with Super CSV - they automate the
-    * data type conversions, and enforce constraints. They implement the chain of responsibility
-    * design pattern - each processor has a single, well-defined purpose and can be chained together
-    * with other processors to fully automate all of the required conversions and constraint
-    * validation for a single CSV column.
-    *
-    * @param daqQuery The current {@link DAQQueryElement}
-    * @param backendQuery One BackendQuery of the current {@link DAQQueryElement}
-    * @return Array of {@link CellProcessor} entries
-    */
-   private CellProcessor[] setupCellProcessors(DAQQueryElement daqQuery, BackendQuery backendQuery,
-         Set<String> fieldMapping, List<String> header) {
+   private void setupChannelColumns(DAQQueryElement daqQuery, BackendQuery backendQuery, ChannelName channelName,
+         Collection<String> header, Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors) {
       Set<QueryField> queryFields = daqQuery.getFields();
       List<Aggregation> aggregations = daqQuery.getAggregations();
-      List<CellProcessor> processorSet =
-            new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
-      boolean isNewField;
+
       QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
 
       for (QueryField field : queryFields) {
         if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) {
-            isNewField = fieldMapping.add(field.name());
-
-            if (isNewField) {
-               header.add(field.name());
-               processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
-            }
+            header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + field.name());
+            accessors.add(Pair.of(channelName, new QueryFieldStringifyer(field.getAccessor(), EMPTY_VALUE,
+                  DELIMITER_ARRAY)));
         }
      }
 
      if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
        for (Aggregation aggregation : daqQuery.getAggregations()) {
-            isNewField = fieldMapping.add("value." + aggregation.name());
-
-            if (isNewField) {
-               header.add(aggregation.name());
-               processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
-            }
+            header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value.name()
+                  + DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name());
+            accessors.add(Pair.of(channelName, new AggregationStringifyer(aggregation.getAccessor(), EMPTY_VALUE)));
        }
      }
-      return processorSet.toArray(new CellProcessor[processorSet.size()]);
   }
-
-   @SuppressWarnings("unchecked")
-   private void writeToOutput(Triple<BackendQuery, ChannelName, ?> triple, CellProcessor[] processors,
-         ICsvDozerBeanWriter beanWriter, List<String> header) throws IOException {
-      if (triple.getRight() instanceof Stream) {
-         beanWriter.writeComment("");
-         beanWriter.writeComment("Start of " + triple.getMiddle());
-         beanWriter.writeHeader(header.toArray(new String[header.size()]));
-
-         Stream<DataEvent> eventStream = (Stream<DataEvent>) triple.getRight();
-         eventStream
-               .forEach(
-                     event -> {
-                        try {
-                           beanWriter.write(event, processors);
-                        } catch (Exception e) {
-                           LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
-                                 event.getPulseId(), e);
-                        }
-                     });
-      } else {
-         String message = "Type '" + triple.getRight().getClass() + "' not supported.";
-         LOGGER.error(message);
-      }
-   }
 }
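Note: the StreamMatcher wiring above keys every event by globalMillis / 10L, i.e. 10 ms buckets, so that synchronized (bsread) and unsynchronized (epics) channels land on the same CSV row while missing buckets become EMPTY_VALUE cells. A minimal, self-contained sketch of just that bucketing idea (plain JDK only; StreamMatcher itself is a PSI-internal class and is not reproduced here, timestamps are made up for illustration):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class BucketMatchSketch {
        // 10 ms bucket key, mirroring MATCHER_PROVIDER's globalMillis / 10L.
        static long bucket(long globalMillis) {
            return globalMillis / 10L;
        }

        public static void main(String[] args) {
            // Hypothetical timestamps from a synchronized and an unsynchronized source.
            long[] bsread = {100, 110, 120};
            long[] epics = {102, 119};

            // Index the second source by bucket, then join the first source against it.
            Map<Long, Long> epicsByBucket = new LinkedHashMap<>();
            for (long t : epics) {
                epicsByBucket.put(bucket(t), t);
            }

            List<String> rows = new ArrayList<>();
            for (long t : bsread) {
                Long matched = epicsByBucket.get(bucket(t));
                // An empty cell stands in for EMPTY_VALUE when no event falls into the bucket.
                rows.add(t + ";" + (matched != null ? matched : ""));
            }
            rows.forEach(System.out::println); // 100;102  110;119  120;
        }
    }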

View File

@@ -0,0 +1,51 @@
+package ch.psi.daq.queryrest.response.csv;
+
+import java.lang.reflect.Array;
+import java.util.function.Function;
+
+import ch.psi.daq.domain.DataEvent;
+
+public class QueryFieldStringifyer implements Function<DataEvent, String> {
+   public static final String OPEN_BRACKET = "[";
+   public static final String CLOSE_BRACKET = "]";
+
+   private Function<DataEvent, Object> accessor;
+   private String nonValue;
+   private String arraySeparator;
+
+   public QueryFieldStringifyer(Function<DataEvent, Object> accessor, String nonValue, String arraySeparator) {
+      this.accessor = accessor;
+      this.nonValue = nonValue;
+      this.arraySeparator = arraySeparator;
+   }
+
+   @Override
+   public String apply(DataEvent event) {
+      if (event == null) {
+         return nonValue;
+      }
+      Object value = accessor.apply(event);
+
+      if (value == null) {
+         return nonValue;
+      } else if (value.getClass().isArray()) {
+         StringBuilder buf = new StringBuilder();
+         int length = Array.getLength(value);
+
+         buf.append(OPEN_BRACKET);
+         for (int i = 0; i < length;) {
+            buf.append(Array.get(value, i));
+
+            ++i;
+            if (i < length) {
+               buf.append(arraySeparator);
+            }
+         }
+         buf.append(CLOSE_BRACKET);
+
+         return buf.toString();
+      } else {
+         return value.toString();
+      }
+   }
+}

View File

@@ -1,36 +1,25 @@
 package ch.psi.daq.test.queryrest.query;
 
-import java.math.BigDecimal;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
-import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
-import org.apache.commons.lang3.ArrayUtils;
-
 import com.google.common.collect.Lists;
 
-import ch.psi.daq.common.time.TimeUtils;
 import ch.psi.daq.domain.StreamEvent;
-import ch.psi.daq.domain.cassandra.ChannelEvent;
-import ch.psi.daq.domain.cassandra.FieldNames;
 import ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery;
 import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
-import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils;
 import ch.psi.daq.domain.reader.Backend;
 import ch.psi.daq.domain.reader.DataReader;
 
 public class DummyArchiverApplianceReader implements DataReader {
 
    public static final String ARCHIVER_TEST_CHANNEL = "ArchiverTestChannel_";
-   private static final int KEYSPACE = 1;
 
    public static final String TEST_CHANNEL_1 = "ArchiverChannel_1";
    public static final String TEST_CHANNEL_2 = "ArchiverChannel_2";
    private List<String> channels = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
 
-   private final Random random = new Random(0);
-
    private AtomicLong channelNameCallCounter = new AtomicLong();
@@ -54,78 +43,14 @@ public class DummyArchiverApplianceReader implements DataReader {
 
    @Override
    public Stream<? extends StreamEvent> getEventStream(PulseIdRangeQuery query) {
-      return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(),
+      return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(),
            query.getEventColumns())
            .filter(query.getFilterOrDefault(NO_OP_FILTER));
   }
 
    @Override
    public Stream<? extends StreamEvent> getEventStream(TimeRangeQuery query) {
-      return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
+      return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
            .filter(query.getFilterOrDefault(NO_OP_FILTER));
   }
 
-   private Stream<? extends StreamEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex,
-         String... columns) {
-      String channelLower = channelParam.toLowerCase();
-      String channel =
-            (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam
-                  : null;
-
-      Stream<? extends StreamEvent> eventStream =
-            LongStream.rangeClosed(startIndex, endIndex).mapToObj(
-                  i -> {
-                     BigDecimal iocTime =
-                           (columns == null || columns.length == 0 || ArrayUtils.contains(columns,
-                                 FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
-                                 : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
-                     BigDecimal globalTime =
-                           (columns == null || columns.length == 0 || ArrayUtils.contains(columns,
-                                 FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
-                                 : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
-                     long pulseId =
-                           (columns == null || columns.length == 0 || ArrayUtils.contains(columns,
-                                 FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
-
-                     if (channelLower.contains("waveform")) {
-                        long[] value = random.longs(2048).toArray();
-                        value[0] = i;
-                        return new ChannelEvent(
-                              channel,
-                              iocTime,
-                              pulseId,
-                              globalTime,
-                              KEYSPACE,
-                              value
-                              );
-                     } else if (channelLower.contains("image")) {
-                        int x = 640;
-                        int y = 480;
-                        int[] shape = new int[] {x, y};
-                        long[] value = random.longs(x * y).toArray();
-                        value[0] = i;
-                        return new ChannelEvent(
-                              channel,
-                              iocTime,
-                              pulseId,
-                              globalTime,
-                              KEYSPACE,
-                              value,
-                              shape
-                              );
-                     } else {
-                        return new ChannelEvent(
-                              channel,
-                              iocTime,
-                              pulseId,
-                              globalTime,
-                              KEYSPACE,
-                              i
-                              );
-                     }
-                  });
-      return eventStream;
-   }
 }

View File

@@ -4,8 +4,10 @@
 package ch.psi.daq.test.queryrest.query;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.StringJoiner;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,11 +41,11 @@ import ch.psi.daq.domain.reader.Backend;
 public class DummyCassandraReader implements CassandraReader {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
    public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_";
+   private static final Random random = new Random(0);
    private static final int KEYSPACE = 1;
    private CassandraDataGen dataGen;
    private List<String> channels;
-   private Random random = new Random(0);
    private AtomicLong channelNameCallCounter = new AtomicLong();
 
    /**
@@ -100,7 +102,8 @@ public class DummyCassandraReader implements CassandraReader {
 
    @Override
    public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
-      return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), query.getEventColumns())
+      return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(),
+            query.getEventColumns())
            .filter(query.getFilterOrDefault(NO_OP_FILTER));
   }
@@ -126,58 +129,87 @@ public class DummyCassandraReader implements CassandraReader {
       return result;
    }
 
-   private Stream<ChannelEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) {
+   public static Stream<ChannelEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex,
+         String... columns) {
       String channelLower = channelParam.toLowerCase();
-      String channel = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam : null;
+      String channel =
+            (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam
+                  : null;
 
-      Stream<ChannelEvent> eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> {
-         BigDecimal iocTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
-         BigDecimal globalTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
-         long pulseId = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
-
-         if (channelLower.contains("waveform")) {
-            long[] value = random.longs(2048).toArray();
-            value[0] = i;
-            return new ChannelEvent(
-                  channel,
-                  iocTime,
-                  pulseId,
-                  globalTime,
-                  KEYSPACE,
-                  value
-                  );
-
-         } else if (channelLower.contains("image")) {
-            int x = 640;
-            int y = 480;
-            int[] shape = new int[] {x, y};
-            long[] value = random.longs(x * y).toArray();
-            value[0] = i;
-            return new ChannelEvent(
-                  channel,
-                  iocTime,
-                  pulseId,
-                  globalTime,
-                  KEYSPACE,
-                  value,
-                  shape
-                  );
-         } else {
-            return new ChannelEvent(
-                  channel,
-                  iocTime,
-                  pulseId,
-                  globalTime,
-                  KEYSPACE,
-                  i
-                  );
-         }
-      });
+      Stream<Long> rangeStream;
+      if (channelParam.contains("[") && channelParam.contains("]")) {
+         rangeStream =
+               Arrays.stream(
+                     channelParam.substring(
+                           channelParam.indexOf("[") + 1,
+                           channelParam.indexOf("]"))
+                           .split(",")
+                     )
+                     .map(str -> str.trim())
+                     .map(Long::parseLong);
+      } else {
+         rangeStream = LongStream.rangeClosed(startIndex, endIndex).boxed();
+      }
+
+      Stream<ChannelEvent> eventStream =
+            rangeStream.map(
+                  i -> {
+                     BigDecimal iocTime =
+                           (columns == null || columns.length == 0 || ArrayUtils.contains(columns,
+                                 FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
+                                 : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
+                     BigDecimal globalTime =
+                           (columns == null || columns.length == 0 || ArrayUtils.contains(columns,
+                                 FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
+                                 : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL;
+                     long pulseId =
+                           (columns == null || columns.length == 0 || ArrayUtils.contains(columns,
+                                 FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
+
+                     if (channelLower.contains("waveform")) {
+                        long[] value = random.longs(2048).toArray();
+                        value[0] = i;
+                        return new ChannelEvent(
+                              channel,
+                              iocTime,
+                              pulseId,
+                              globalTime,
+                              KEYSPACE,
+                              value
+                              );
+
+                     } else if (channelLower.contains("image")) {
+                        int x = 640;
+                        int y = 480;
+                        int[] shape = new int[] {x, y};
+                        long[] value = random.longs(x * y).toArray();
+                        value[0] = i;
+                        return new ChannelEvent(
+                              channel,
+                              iocTime,
+                              pulseId,
+                              globalTime,
+                              KEYSPACE,
+                              value,
+                              shape
+                              );
+                     } else {
+                        return new ChannelEvent(
+                              channel,
+                              iocTime,
+                              pulseId,
+                              globalTime,
+                              KEYSPACE,
+                              i
+                              );
+                     }
+                  });
 
       return eventStream;
    }
 
-   private List<? extends DataEvent> getDummyEvents(String channel, long startIndex, long endIndex, String...columns) {
+   private List<? extends DataEvent> getDummyEvents(String channel, long startIndex, long endIndex, String... columns) {
      return getDummyEventStream(channel, startIndex, endIndex, columns).collect(Collectors.toList());
   }
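Note: the reworked getDummyEventStream above lets a test channel name such as "DataBufferTestChannel_1[1,3,5]" enumerate explicit pulse ids instead of the full startIndex..endIndex range. A small standalone sketch of just that parsing step (plain JDK; the channel names and helper are made up for illustration):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;
    import java.util.stream.Stream;

    public class ChannelRangeSketch {
        // Returns the ids encoded in "Name[1,3,5]", or start..end when no brackets are present.
        static Stream<Long> ids(String channelParam, long start, long end) {
            if (channelParam.contains("[") && channelParam.contains("]")) {
                String inner = channelParam.substring(channelParam.indexOf("[") + 1, channelParam.indexOf("]"));
                return Arrays.stream(inner.split(","))
                        .map(String::trim)
                        .map(Long::parseLong);
            }
            return LongStream.rangeClosed(start, end).boxed();
        }

        public static void main(String[] args) {
            List<Long> explicit = ids("DataBufferTestChannel_1[1,3,5]", 0, 9).collect(Collectors.toList());
            List<Long> range = ids("DataBufferTestChannel_1", 0, 3).collect(Collectors.toList());
            System.out.println(explicit); // [1, 3, 5]
            System.out.println(range);    // [0, 1, 2, 3]
        }
    }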
@@ -205,7 +237,8 @@ public class DummyCassandraReader implements CassandraReader {
 
    @Override
    public ChannelEvent getEvent(MetaChannelEvent queryInfo, String... columns) {
       if (queryInfo.getPulseId() > 0) {
-         return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(), columns)
+         return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(),
+               columns)
               .get(0);
      }
      return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis() / 10,