diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs index c4b8c74..c96f4fb 100644 --- a/.settings/org.eclipse.jdt.core.prefs +++ b/.settings/org.eclipse.jdt.core.prefs @@ -1,5 +1,5 @@ # -#Thu Mar 24 11:36:20 CET 2016 +#Thu Apr 14 08:26:07 CEST 2016 org.eclipse.jdt.core.compiler.debug.localVariable=generate org.eclipse.jdt.core.compiler.compliance=1.8 org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve diff --git a/build.gradle b/build.gradle index 2188b20..c6306c6 100644 --- a/build.gradle +++ b/build.gradle @@ -31,8 +31,7 @@ dependencies { } compile libraries.commons_lang compile libraries.commons_io - compile libraries.super_csv - compile libraries.super_csv_dozer + compile libraries.commons_csv testCompile libraries.spring_boot_starter_test testCompile libraries.jsonassert diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 75bd5ec..6d36ca5 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -2,6 +2,7 @@ package ch.psi.daq.queryrest.controller; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; @@ -17,6 +18,7 @@ import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; import javax.validation.Valid; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; @@ -39,6 +41,7 @@ import ch.psi.daq.cassandra.request.validate.RequestProviderValidator; import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.cassandra.FieldNames; import 
ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.query.analyzer.QueryAnalyzer; @@ -61,6 +64,7 @@ import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import com.hazelcast.nio.serialization.FieldType; @RestController public class QueryRestController { @@ -281,6 +285,11 @@ public class QueryRestController { LOGGER.warn(message); throw new IllegalArgumentException(message); } + + + if(!ArrayUtils.contains(query.getColumns(), FieldNames.FIELD_GLOBAL_TIME)){ + query.addField(QueryField.globalMillis); + } } try { diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/AggregationStringifyer.java b/src/main/java/ch/psi/daq/queryrest/response/csv/AggregationStringifyer.java new file mode 100644 index 0000000..fcfac37 --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/response/csv/AggregationStringifyer.java @@ -0,0 +1,25 @@ +package ch.psi.daq.queryrest.response.csv; + +import java.util.function.Function; +import java.util.function.ToDoubleFunction; + +import ch.psi.daq.domain.DataEvent; + +public class AggregationStringifyer implements Function { + private ToDoubleFunction accessor; + private String nonValue; + + public AggregationStringifyer(ToDoubleFunction accessor, String nonValue) { + this.accessor = accessor; + this.nonValue = nonValue; + } + + @Override + public String apply(DataEvent event) { + if (event == null) { + return nonValue; + }else{ + return Double.toString(accessor.applyAsDouble(event)); + } + } +} diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/ArrayProcessor.java b/src/main/java/ch/psi/daq/queryrest/response/csv/ArrayProcessor.java deleted file mode 100644 index 543fc1b..0000000 --- a/src/main/java/ch/psi/daq/queryrest/response/csv/ArrayProcessor.java +++ /dev/null @@ -1,57 +0,0 @@ -package ch.psi.daq.queryrest.response.csv; - -import java.lang.reflect.Array; - -import 
org.supercsv.cellprocessor.CellProcessorAdaptor; -import org.supercsv.cellprocessor.ift.CellProcessor; -import org.supercsv.util.CsvContext; - -public class ArrayProcessor extends CellProcessorAdaptor { - public static final char DEFAULT_SEPARATOR = ','; - public static final String OPEN_BRACKET = "["; - public static final String CLOSE_BRACKET = "]"; - - private char separator = DEFAULT_SEPARATOR; - - public ArrayProcessor() { - super(); - } - - public ArrayProcessor(char separator) { - super(); - this.separator = separator; - } - - public ArrayProcessor(CellProcessor next) { - super(next); - } - - public ArrayProcessor(CellProcessor next, char separator) { - super(next); - this.separator = separator; - } - - @Override - public Object execute(Object value, CsvContext context) { - if (value.getClass().isArray()) { - StringBuilder buf = new StringBuilder(); - - int length = Array.getLength(value); - buf.append(OPEN_BRACKET); - for (int i = 0; i < length;) { - Object val = next.execute(Array.get(value, i), context); - buf.append(val); - - ++i; - if (i < length) { - buf.append(separator); - } - } - buf.append(CLOSE_BRACKET); - - return buf.toString(); - } else { - return next.execute(value, context); - } - } -} diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java index 89233f6..d3b6e58 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java @@ -1,32 +1,39 @@ package ch.psi.daq.queryrest.response.csv; +import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.LinkedHashSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; 
import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.ToLongFunction; import java.util.stream.Stream; import javax.annotation.Resource; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.supercsv.cellprocessor.constraint.NotNull; -import org.supercsv.cellprocessor.ift.CellProcessor; -import org.supercsv.io.dozer.CsvDozerBeanWriter; -import org.supercsv.io.dozer.ICsvDozerBeanWriter; -import org.supercsv.prefs.CsvPreference; import com.fasterxml.jackson.core.JsonEncoding; +import ch.psi.daq.common.stream.StreamIterable; +import ch.psi.daq.common.stream.StreamMatcher; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.query.analyzer.QueryAnalyzer; @@ -44,8 +51,15 @@ import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; */ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter { - private static final char DELIMITER_CVS = ';'; - private static final char DELIMITER_ARRAY = ','; + public static final char DELIMITER_CVS = ';'; + public static final String DELIMITER_ARRAY = ","; + public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.'; + public static final String EMPTY_VALUE = ""; + private static final Function, ChannelName> KEY_PROVIDER = (pair) -> pair.getKey(); + // try to match sync data (bsread) with non sync data (epics) based on the time using 10 millis + // buckets. 
+ private static final ToLongFunction> MATCHER_PROVIDER = (pair) -> pair.getValue() + .getGlobalMillis() / 10L; private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class); @@ -65,60 +79,82 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter { private void respondInternal(List>>> results, ResponseOptions options, HttpServletResponse response) throws Exception { AtomicReference exception = new AtomicReference<>(); - OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV); - List beanWriters = new ArrayList<>(); + final OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV); + + final Map>> streams = new LinkedHashMap<>(results.size()); + final List header = new ArrayList<>(); + final Collection>> accessors = new ArrayList<>(); + + // prepare the streams results.forEach(entry -> { - DAQQueryElement query = entry.getKey(); - - Set queryFields = query.getFields(); - List aggregations = query.getAggregations(); - int aggregationsSize = (aggregations != null ? 
aggregations.size() : 0); - - Set fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize); - List header = new ArrayList<>(queryFields.size() + aggregationsSize); - - AtomicReference processorsRef = new AtomicReference<>(); + final DAQQueryElement query = entry.getKey(); entry.getValue() .sequential() .forEach(triple -> { - try { - CellProcessor[] processors = processorsRef.get(); - ICsvDozerBeanWriter beanWriter; + if (triple.getRight() instanceof Stream) { + setupChannelColumns(query, triple.getLeft(), triple.getMiddle(), header, accessors); - if (processors == null) { - processors = setupCellProcessors(query, triple.getLeft(), fieldMapping, header); - processorsRef.set(processors); - - beanWriter = setupBeanWriter(fieldMapping, out); - beanWriters.add(beanWriter); - } else { - beanWriter = beanWriters.get(beanWriters.size() - 1); - } - - writeToOutput(triple, processors, beanWriter, header); - } catch (Exception e) { - LOGGER.warn("Could not write CSV of '{}'", triple.getMiddle(), e); - exception.compareAndSet(null, e); + final Stream> eventStream = ((Stream) triple.getRight()) + .map(dataEvent -> Pair.of(triple.getMiddle(), dataEvent)); + streams.put(triple.getMiddle(), eventStream); + } else { + final String message = String.format("Expect a DataEvent Stream for '%s'.", triple.getMiddle()); + LOGGER.warn(message); } }); - - if (!beanWriters.isEmpty()) { - try { - beanWriters.get(beanWriters.size() - 1).flush(); - } catch (Exception e) { - LOGGER.error("Could not flush ICsvDozerBeanWriter.", e); - exception.compareAndSet(null, e); - } - } }); - for (ICsvDozerBeanWriter beanWriter : beanWriters) { + // online matching of the stream's content + StreamMatcher> streamMatcher = + new StreamMatcher<>(KEY_PROVIDER, MATCHER_PROVIDER, streams.values()); + Iterator>> streamsMatchIter = streamMatcher.iterator(); + + // prepare csv output + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(DELIMITER_CVS); + Writer writer = null; + CSVPrinter 
csvFilePrinter = null; + + try { + writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)); + // writer = new BufferedWriter(new OutputStreamWriter(out, + // response.getCharacterEncoding())); + csvFilePrinter = new CSVPrinter(writer, csvFormat); + + csvFilePrinter.printRecord(header); + + while (streamsMatchIter.hasNext()) { + final Map> match = streamsMatchIter.next(); + + // ensure correct order + Stream rowStream = accessors.stream().sequential() + .map(accessorPair -> { + Pair eventPair = match.get(accessorPair.getKey()); + if (eventPair != null) { + return accessorPair.getValue().apply(eventPair.getValue()); + } else { + return EMPTY_VALUE; + } + }); + + csvFilePrinter.printRecord(new StreamIterable(rowStream)); + } + } catch (IOException e) { + LOGGER.error("Could not write CSV.", e); + exception.compareAndSet(null, e); + } finally { try { - beanWriter.close(); - } catch (Exception e) { - LOGGER.error("Could not close ICsvDozerBeanWriter.", e); + if (writer != null) { + writer.flush(); + writer.close(); + } + + if (csvFilePrinter != null) { + csvFilePrinter.close(); + } + } catch (IOException e) { + LOGGER.error("Could not close CSV writer.", e); exception.compareAndSet(null, e); } } @@ -128,95 +164,29 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter { } } - /** - * Sets up the bean writer instance. 
- * - * @throws Exception - * - */ - private ICsvDozerBeanWriter setupBeanWriter(Set fieldMapping, OutputStream out) throws Exception { - CsvPreference preference = new CsvPreference.Builder( - (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(), - DELIMITER_CVS, - CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build(); - ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference); - // configure the mapping from the fields to the CSV columns - beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()])); - return beanWriter; - } - - /** - * Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer. - * - * Cell processors are an integral part of reading and writing with Super CSV - they automate the - * data type conversions, and enforce constraints. They implement the chain of responsibility - * design pattern - each processor has a single, well-defined purpose and can be chained together - * with other processors to fully automate all of the required conversions and constraint - * validation for a single CSV column. - * - * @param daqQuery The current {@link DAQQueryElement} - * @param backendQuery One BackendQuery of the current {@link DAQQueryElement} - * @return Array of {@link CellProcessor} entries - */ - private CellProcessor[] setupCellProcessors(DAQQueryElement daqQuery, BackendQuery backendQuery, - Set fieldMapping, List header) { + private void setupChannelColumns(DAQQueryElement daqQuery, BackendQuery backendQuery, ChannelName channelName, + Collection header, Collection>> accessors) { Set queryFields = daqQuery.getFields(); List aggregations = daqQuery.getAggregations(); - List processorSet = - new ArrayList<>(queryFields.size() + (aggregations != null ? 
aggregations.size() : 0)); - - boolean isNewField; QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery); for (QueryField field : queryFields) { if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) { - isNewField = fieldMapping.add(field.name()); + header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + field.name()); - if (isNewField) { - header.add(field.name()); - processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); - } + accessors.add(Pair.of(channelName, new QueryFieldStringifyer(field.getAccessor(), EMPTY_VALUE, + DELIMITER_ARRAY))); } } if (aggregations != null && queryAnalyzer.isAggregationEnabled()) { for (Aggregation aggregation : daqQuery.getAggregations()) { - isNewField = fieldMapping.add("value." + aggregation.name()); - - if (isNewField) { - header.add(aggregation.name()); - processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); - } + header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value.name() + + DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name()); + accessors.add(Pair.of(channelName, new AggregationStringifyer(aggregation.getAccessor(), EMPTY_VALUE))); } } - return processorSet.toArray(new CellProcessor[processorSet.size()]); } - - @SuppressWarnings("unchecked") - private void writeToOutput(Triple triple, CellProcessor[] processors, - ICsvDozerBeanWriter beanWriter, List header) throws IOException { - if (triple.getRight() instanceof Stream) { - beanWriter.writeComment(""); - beanWriter.writeComment("Start of " + triple.getMiddle()); - beanWriter.writeHeader(header.toArray(new String[header.size()])); - - Stream eventStream = (Stream) triple.getRight(); - eventStream - .forEach( - event -> { - try { - beanWriter.write(event, processors); - } catch (Exception e) { - LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(), - event.getPulseId(), e); - } - }); - } else { - String message = "Type '" + 
triple.getRight().getClass() + "' not supported."; - LOGGER.error(message); - } - } - } diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/QueryFieldStringifyer.java b/src/main/java/ch/psi/daq/queryrest/response/csv/QueryFieldStringifyer.java new file mode 100644 index 0000000..476c02b --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/response/csv/QueryFieldStringifyer.java @@ -0,0 +1,51 @@ +package ch.psi.daq.queryrest.response.csv; + +import java.lang.reflect.Array; +import java.util.function.Function; + +import ch.psi.daq.domain.DataEvent; + +public class QueryFieldStringifyer implements Function { + public static final String OPEN_BRACKET = "["; + public static final String CLOSE_BRACKET = "]"; + + private Function accessor; + private String nonValue; + private String arraySeparator; + + public QueryFieldStringifyer(Function accessor, String nonValue, String arraySeparator) { + this.accessor = accessor; + this.nonValue = nonValue; + this.arraySeparator = arraySeparator; + } + + @Override + public String apply(DataEvent event) { + if (event == null) { + return nonValue; + } + + Object value = accessor.apply(event); + if (value == null) { + return nonValue; + } else if (value.getClass().isArray()) { + StringBuilder buf = new StringBuilder(); + + int length = Array.getLength(value); + buf.append(OPEN_BRACKET); + for (int i = 0; i < length;) { + buf.append(Array.get(value, i)); + + ++i; + if (i < length) { + buf.append(arraySeparator); + } + } + buf.append(CLOSE_BRACKET); + + return buf.toString(); + } else { + return value.toString(); + } + } +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java index c814716..cdaebf1 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java @@ -5,10 +5,16 @@ 
import static org.junit.Assert.assertTrue; import java.io.StringReader; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; import org.junit.After; import org.junit.Test; import org.springframework.http.MediaType; @@ -16,11 +22,6 @@ import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import org.springframework.test.web.servlet.result.MockMvcResultHandlers; import org.springframework.test.web.servlet.result.MockMvcResultMatchers; -import org.supercsv.cellprocessor.constraint.NotNull; -import org.supercsv.cellprocessor.ift.CellProcessor; -import org.supercsv.io.CsvMapReader; -import org.supercsv.io.ICsvMapReader; -import org.supercsv.prefs.CsvPreference; import ch.psi.daq.cassandra.request.range.RequestRangeDate; import ch.psi.daq.cassandra.request.range.RequestRangePulseId; @@ -37,6 +38,7 @@ import ch.psi.daq.query.model.impl.DAQQueries; import ch.psi.daq.query.model.impl.DAQQuery; import ch.psi.daq.query.model.impl.DAQQueryElement; import ch.psi.daq.queryrest.controller.QueryRestController; +import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; import ch.psi.daq.test.queryrest.AbstractDaqRestTest; /** @@ -54,33 +56,24 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { @Test public void testPulseRangeQuery() throws Exception { + List channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02); DAQQuery request = new DAQQuery( new RequestRangePulseId( 0, 1), - TEST_CHANNEL_NAMES); + channels); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); - LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); - cellProcessors.add(new 
NotNull()); queryFields.add(QueryField.pulseId); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.shape); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.eventCount); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.value); - cellProcessors.add(new NotNull()); request.setFields(queryFields); String content = mapper.writeValueAsString(request); @@ -95,87 +88,263 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andReturn(); - CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); + + try { + long pulse = 0; + int totalRows = 2; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals(queryFields.size() * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); + } + } + + for (int row = 0; row < totalRows; ++row) { + record = records.get(row); + + assertEquals(queryFields.size() * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + assertEquals(channel, 
record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("[1]", record.get(column++)); + assertEquals("1", record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + } + ++pulse; + } + } finally { + reader.close(); + csvParser.close(); + } + } + + @Test + public void testPulseRangeQuery_Match() throws Exception { + Map> channelSeq = new HashMap<>(); + Set channel_01_Seq = new LinkedHashSet<>(Arrays.asList(0L, 2L, 4L, 5L)); + String channel_01 = "TestChannel_Sequence_" + Arrays.toString(channel_01_Seq.toArray()); + channelSeq.put(channel_01, channel_01_Seq); + Set channel_02_Seq = new LinkedHashSet<>(Arrays.asList(1L, 3L, 5L)); + String channel_02 = "TestChannel_Sequence_" + Arrays.toString(channel_02_Seq.toArray()); + channelSeq.put(channel_02, channel_02_Seq); + Set channel_03_Seq = new LinkedHashSet<>(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L)); + String channel_03 = "TestChannel_Sequence_" + Arrays.toString(channel_03_Seq.toArray()); + channelSeq.put(channel_03, channel_03_Seq); + List channels = Arrays.asList(channel_01, channel_02, channel_03); + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + -1, + -1), + channels); + request.setResponseFormat(ResponseFormat.CSV); + + LinkedHashSet queryFields = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + queryFields.add(QueryField.pulseId); + queryFields.add(QueryField.globalMillis); + queryFields.add(QueryField.value); + request.setFields(queryFields); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + 
MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - ICsvMapReader mapReader = - new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); - try { - // read comment - mapReader.read(""); - String[] header = mapReader.getHeader(false); - Map customerMap; - long resultsPerChannel = 2; - long pulse = 0; - long channelCount = 1; - while ((customerMap = mapReader.read(header, processors)) != null) { - assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); - assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocMillis.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalMillis.name())); - assertEquals("[1]", customerMap.get(QueryField.shape.name())); - assertEquals("1", customerMap.get(QueryField.eventCount.name())); - assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); - pulse = ++pulse % resultsPerChannel; - if (pulse == 0) { - ++channelCount; - if (channelCount <= TEST_CHANNEL_NAMES.length) { - // read 
comment (empty rows are skiped - mapReader.read(""); - header = mapReader.getHeader(false); - } + try { + long pulse = 0; + int totalRows = 6; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + CSVRecord record = records.remove(0); + assertEquals(queryFields.size() * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); } } + + for (int row = 0; row < totalRows; ++row) { + record = records.get(row); + + assertEquals(queryFields.size() * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + if (channelSeq.get(channel).contains(pulse)) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + } else { + assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++)); + assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++)); + assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++)); + assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++)); + } + + } + ++pulse; + } } finally { - mapReader.close(); + reader.close(); + csvParser.close(); + } + } + + @Test + public void testPulseRangeQuery_Match_NoTimeField() throws Exception { + Map> channelSeq = new HashMap<>(); + Set channel_01_Seq = new LinkedHashSet<>(Arrays.asList(0L, 2L, 4L, 5L)); + String channel_01 = "TestChannel_Sequence_" + Arrays.toString(channel_01_Seq.toArray()); + channelSeq.put(channel_01, channel_01_Seq); + Set channel_02_Seq = new LinkedHashSet<>(Arrays.asList(1L, 3L, 5L)); + String channel_02 = "TestChannel_Sequence_" + Arrays.toString(channel_02_Seq.toArray()); + 
channelSeq.put(channel_02, channel_02_Seq); + Set channel_03_Seq = new LinkedHashSet<>(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L)); + String channel_03 = "TestChannel_Sequence_" + Arrays.toString(channel_03_Seq.toArray()); + channelSeq.put(channel_03, channel_03_Seq); + List channels = Arrays.asList(channel_01, channel_02, channel_03); + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + -1, + -1), + channels); + request.setResponseFormat(ResponseFormat.CSV); + + LinkedHashSet queryFields = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + queryFields.add(QueryField.value); + request.setFields(queryFields); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); + + try { + long pulse = 0; + int totalRows = 6; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals((queryFields.size() + 1) * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); + } + assertEquals( + channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + QueryField.globalMillis.name(), + record.get(column++)); + } + + for (int row = 0; row < 
totalRows; ++row) { + record = records.get(row); + + assertEquals((queryFields.size() + 1) * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + if (channelSeq.get(channel).contains(pulse)) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + } else { + assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++)); + assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++)); + assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++)); + } + + } + ++pulse; + } + } finally { + reader.close(); + csvParser.close(); } } @Test public void testPulseRangeQueries() throws Exception { + List channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02); String testChannel3 = "testChannel3"; DAQQueries request = new DAQQueries( new DAQQueryElement( new RequestRangePulseId( 0, 1), - TEST_CHANNEL_NAMES), + channels), new DAQQueryElement( new RequestRangePulseId( 0, 1), testChannel3)); request.setResponseFormat(ResponseFormat.CSV); + channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02, testChannel3); LinkedHashSet queryFields = new LinkedHashSet<>(); - LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.pulseId); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.shape); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.eventCount); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.value); - 
cellProcessors.add(new NotNull()); for (DAQQueryElement element : request) { element.setFields(queryFields); } @@ -192,81 +361,75 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andReturn(); - CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); - String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - ICsvMapReader mapReader = - new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); - try { - // read comment - mapReader.read(""); - String[] header = mapReader.getHeader(false); - Map customerMap; - long resultsPerChannel = 2; - long pulse = 0; - long channelCount = 1; - while ((customerMap = mapReader.read(header, processors)) != null) { - assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); - assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocMillis.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalMillis.name())); - assertEquals("[1]", customerMap.get(QueryField.shape.name())); - assertEquals("1", customerMap.get(QueryField.eventCount.name())); - assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); - pulse = ++pulse % resultsPerChannel; - if 
(pulse == 0) { - ++channelCount; - if (channelCount <= TEST_CHANNEL_NAMES.length + 1) { - // read comment (empty rows are skiped - mapReader.read(""); - header = mapReader.getHeader(false); - } + try { + long pulse = 0; + int totalRows = 2; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals(queryFields.size() * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); } } + + for (int row = 0; row < totalRows; ++row) { + record = records.get(row); + + assertEquals(queryFields.size() * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("[1]", record.get(column++)); + assertEquals("1", record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + } + ++pulse; + } } finally { - mapReader.close(); + reader.close(); + csvParser.close(); } } @Test public void testPulseRangeQueryWaveform() throws Exception { - String channelName = "XYWaveform"; + List channels = Arrays.asList("XYWaveform"); DAQQuery request = new DAQQuery( new RequestRangePulseId( 0, 1), - channelName); + channels); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); - LinkedHashSet 
cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.pulseId); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.shape); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.eventCount); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.value); - cellProcessors.add(new NotNull()); request.setFields(queryFields); String content = mapper.writeValueAsString(request); @@ -281,71 +444,76 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andReturn(); - CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); - String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - ICsvMapReader mapReader = - new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); - try { - // read comment - mapReader.read(""); - final String[] header = mapReader.getHeader(false); - Map customerMap; - long pulse = 0; - while ((customerMap = mapReader.read(header, processors)) != null) { - assertEquals(channelName, customerMap.get(QueryField.channel.name())); - assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocMillis.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - 
customerMap.get(QueryField.globalSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalMillis.name())); - assertEquals("[2048]", customerMap.get(QueryField.shape.name())); - assertEquals("1", customerMap.get(QueryField.eventCount.name())); - assertTrue(customerMap.get(QueryField.value.name()).toString().startsWith("[")); - assertTrue(customerMap.get(QueryField.value.name()).toString().endsWith("]")); + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); + try { + long pulse = 0; + int totalRows = 2; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals(queryFields.size() * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); + } + } + + for (int row = 0; row < totalRows; ++row) { + record = records.get(row); + + assertEquals(queryFields.size() * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("[2048]", record.get(column++)); + assertEquals("1", record.get(column++)); + 
assertTrue(record.get(column).startsWith("[")); + assertTrue(record.get(column++).endsWith("]")); + } ++pulse; } } finally { - mapReader.close(); + reader.close(); + csvParser.close(); } } @Test public void testTimeRangeQuery() throws Exception { + List channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02); DAQQuery request = new DAQQuery( new RequestRangeTime( TimeUtils.getTimeFromMillis(0, 0), TimeUtils.getTimeFromMillis(10, 0)), - TEST_CHANNEL_NAMES); + channels); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); - LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.pulseId); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.shape); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.eventCount); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.value); - cellProcessors.add(new NotNull()); request.setFields(queryFields); String content = mapper.writeValueAsString(request); @@ -360,86 +528,79 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andReturn(); - CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); - String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - CsvMapReader mapReader = - new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); - try { - // read comment - mapReader.read(""); - String[] header = mapReader.getHeader(false); - Map customerMap; - long 
resultsPerChannel = 2; - long pulse = 0; - long channelCount = 1; - while ((customerMap = mapReader.read(header, processors)) != null) { - assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); - assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocMillis.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalMillis.name())); - assertEquals("[1]", customerMap.get(QueryField.shape.name())); - assertEquals("1", customerMap.get(QueryField.eventCount.name())); - assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); - pulse = ++pulse % resultsPerChannel; - if (pulse == 0) { - ++channelCount; - if (channelCount <= TEST_CHANNEL_NAMES.length) { - // read comment (empty rows are skiped - mapReader.read(""); - header = mapReader.getHeader(false); - } + try { + long pulse = 0; + int totalRows = 2; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals(queryFields.size() * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); } } + + for (int row = 0; row < 
totalRows; ++row) { + record = records.get(row); + + assertEquals(queryFields.size() * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("[1]", record.get(column++)); + assertEquals("1", record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + } + ++pulse; + } } finally { - mapReader.close(); + reader.close(); + csvParser.close(); } } @Test public void testDateRangeQuery() throws Exception { + List channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02); String startDate = TimeUtils.format(0); String endDate = TimeUtils.format(10); DAQQuery request = new DAQQuery( new RequestRangeDate( startDate, endDate), - TEST_CHANNEL_NAMES); + channels); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); - LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.pulseId); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocDate); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalDate); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalMillis); - cellProcessors.add(new NotNull()); 
queryFields.add(QueryField.shape); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.eventCount); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.value); - cellProcessors.add(new NotNull()); request.setFields(queryFields); String content = mapper.writeValueAsString(request); @@ -454,52 +615,54 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andReturn(); - CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); - String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - ICsvMapReader mapReader = - new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); - try { - // read comment - mapReader.read(""); - String[] header = mapReader.getHeader(false); - Map customerMap; - long resultsPerChannel = 2; - long pulse = 0; - long channelCount = 1; - while ((customerMap = mapReader.read(header, processors)) != null) { - assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); - assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocSeconds.name())); - assertEquals("" + TimeUtils.format(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocDate.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocMillis.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalSeconds.name())); - assertEquals("" + TimeUtils.format(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalDate.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalMillis.name())); - 
assertEquals("[1]", customerMap.get(QueryField.shape.name())); - assertEquals("1", customerMap.get(QueryField.eventCount.name())); - assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); - pulse = ++pulse % resultsPerChannel; - if (pulse == 0) { - ++channelCount; - if (channelCount <= TEST_CHANNEL_NAMES.length) { - // read comment (empty rows are skiped - mapReader.read(""); - header = mapReader.getHeader(false); - } + try { + long pulse = 0; + int totalRows = 2; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals(queryFields.size() * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); } } + + for (int row = 0; row < totalRows; ++row) { + record = records.get(row); + + assertEquals(queryFields.size() * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.format(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.format(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + 
TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("[1]", record.get(column++)); + assertEquals("1", record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + } + ++pulse; + } } finally { - mapReader.close(); + reader.close(); + csvParser.close(); } } @@ -565,6 +728,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { @Test public void testDateRangeQueryNrOfBinsAggregate() throws Exception { + List channels = Arrays.asList(TEST_CHANNEL_01); long startTime = 0; long endTime = 99; String startDate = TimeUtils.format(startTime); @@ -573,37 +737,25 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { new RequestRangeDate( startDate, endDate), - TEST_CHANNEL_01); + channels); request.setNrOfBins(2); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); - LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.pulseId); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.shape); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.eventCount); - cellProcessors.add(new NotNull()); request.setFields(queryFields); List aggregations = new ArrayList<>(); aggregations.add(Aggregation.min); - cellProcessors.add(new NotNull()); aggregations.add(Aggregation.mean); - cellProcessors.add(new NotNull()); aggregations.add(Aggregation.max); - cellProcessors.add(new NotNull()); request.setAggregations(aggregations); String content = mapper.writeValueAsString(request); @@ -618,45 +770,65 @@ public class 
QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andReturn(); - CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); - String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - ICsvMapReader mapReader = - new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); - try { - // read comment - mapReader.read(""); - String[] header = mapReader.getHeader(false); - Map customerMap; - long pulse = 0; - while ((customerMap = mapReader.read(header, processors)) != null) { - assertEquals(TEST_CHANNEL_01, customerMap.get(QueryField.channel.name())); - assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocMillis.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalMillis.name())); - assertEquals("[1]", customerMap.get(QueryField.shape.name())); - assertEquals("5", customerMap.get(QueryField.eventCount.name())); - assertEquals("" + pulse + ".0", customerMap.get(Aggregation.min.name())); - assertEquals("" + (pulse + 2) + ".0", customerMap.get(Aggregation.mean.name())); - assertEquals("" + (pulse + 4) + ".0", customerMap.get(Aggregation.max.name())); + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); + try { + long pulse = 0; + int totalRows = 2; + + List records = 
csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals((queryFields.size() + aggregations.size()) * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); + } + for (Aggregation aggregation : aggregations) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value + + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name(), + record.get(column++)); + } + } + + for (int row = 0; row < totalRows; ++row) { + record = records.get(row); + + assertEquals((queryFields.size() + aggregations.size()) * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("[1]", record.get(column++)); + assertEquals("5", record.get(column++)); + assertEquals("" + pulse + ".0", record.get(column++)); + assertEquals("" + (pulse + 2) + ".0", record.get(column++)); + assertEquals("" + (pulse + 4) + ".0", record.get(column++)); + } pulse += 5; } } finally { - mapReader.close(); + reader.close(); + csvParser.close(); } } @Test public void testDateRangeQueryBinSizeAggregate() throws Exception { + List channels = Arrays.asList(TEST_CHANNEL_01); long startTime = 0; long endTime = 999; String startDate = 
TimeUtils.format(startTime); @@ -665,40 +837,27 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { new RequestRangeDate( startDate, endDate), - TEST_CHANNEL_01); + channels); request.setBinSize(100); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); - LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.pulseId); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.iocMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalSeconds); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.globalMillis); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.shape); - cellProcessors.add(new NotNull()); queryFields.add(QueryField.eventCount); - cellProcessors.add(new NotNull()); request.setFields(queryFields); List aggregations = new ArrayList<>(); aggregations.add(Aggregation.min); - cellProcessors.add(new NotNull()); aggregations.add(Aggregation.mean); - cellProcessors.add(new NotNull()); aggregations.add(Aggregation.max); - cellProcessors.add(new NotNull()); request.setAggregations(aggregations); - String content = mapper.writeValueAsString(request); System.out.println(content); @@ -711,40 +870,121 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andReturn(); - CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); + + try { + long pulse = 0; + int 
totalRows = 10; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + CSVRecord record = records.remove(0); + assertEquals((queryFields.size() + aggregations.size()) * channels.size(), record.size()); + int column = 0; + for (String channel : channels) { + for (QueryField queryField : queryFields) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(), + record.get(column++)); + } + for (Aggregation aggregation : aggregations) { + assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value + + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name(), + record.get(column++)); + } + } + + for (int row = 0; row < totalRows; ++row) { + record = records.get(row); + + assertEquals((queryFields.size() + aggregations.size()) * channels.size(), record.size()); + + column = 0; + for (String channel : channels) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + assertEquals("[1]", record.get(column++)); + assertEquals("10", record.get(column++)); + assertEquals("" + pulse + ".0", record.get(column++)); + assertEquals("" + (pulse + 4.5), record.get(column++)); + assertEquals("" + (pulse + 9) + ".0", record.get(column++)); + } + pulse += 10; + } + } finally { + reader.close(); + csvParser.close(); + } + } + + @Test + public void testQuery_NoTimeFields_01() throws Exception { + List channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02); + DAQQuery request = new 
DAQQuery( + new RequestRangePulseId( + 0, + 1), + channels); + request.setResponseFormat(ResponseFormat.CSV); + + LinkedHashSet queryFields = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + queryFields.add(QueryField.value); + request.setFields(queryFields); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - ICsvMapReader mapReader = - new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); - try { - // read comment - mapReader.read(""); - String[] header = mapReader.getHeader(false); - Map customerMap; - long pulse = 0; - while ((customerMap = mapReader.read(header, processors)) != null) { - assertEquals(TEST_CHANNEL_01, customerMap.get(QueryField.channel.name())); - assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.iocMillis.name())); - assertEquals("" + TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalSeconds.name())); - assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), - customerMap.get(QueryField.globalMillis.name())); - assertEquals("[1]", customerMap.get(QueryField.shape.name())); - assertEquals("10", customerMap.get(QueryField.eventCount.name())); - assertEquals("" + pulse + ".0", customerMap.get(Aggregation.min.name())); - assertEquals("" + (pulse + 4.5), 
customerMap.get(Aggregation.mean.name())); - assertEquals("" + (pulse + 9) + ".0", customerMap.get(Aggregation.max.name())); + CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS); + StringReader reader = new StringReader(response); + CSVParser csvParser = new CSVParser(reader, csvFormat); - pulse += 10; + try { + long pulse = 0; + int totalRows = 2; + + List records = csvParser.getRecords(); + assertEquals(totalRows + 1, records.size()); + // remove header + records.remove(0); + + for (int row = 0; row < totalRows; ++row) { + CSVRecord record = records.get(row); + + assertEquals((queryFields.size() + 1) * channels.size(), record.size()); + + int column = 0; + for (String channel : channels) { + assertEquals(channel, record.get(column++)); + assertEquals("" + pulse, record.get(column++)); + assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++)); + } + ++pulse; } } finally { - mapReader.close(); + reader.close(); + csvParser.close(); } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java index 121b4dd..df6ddc0 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java @@ -1,36 +1,25 @@ package ch.psi.daq.test.queryrest.query; -import java.math.BigDecimal; import java.util.List; -import java.util.Random; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; -import java.util.stream.LongStream; import java.util.stream.Stream; -import org.apache.commons.lang3.ArrayUtils; - import com.google.common.collect.Lists; -import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.domain.StreamEvent; -import ch.psi.daq.domain.cassandra.ChannelEvent; -import ch.psi.daq.domain.cassandra.FieldNames; import 
ch.psi.daq.domain.cassandra.query.PulseIdRangeQuery; import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; -import ch.psi.daq.domain.cassandra.utils.TablePropertiesUtils; import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.reader.DataReader; public class DummyArchiverApplianceReader implements DataReader { public static final String ARCHIVER_TEST_CHANNEL = "ArchiverTestChannel_"; - private static final int KEYSPACE = 1; public static final String TEST_CHANNEL_1 = "ArchiverChannel_1"; public static final String TEST_CHANNEL_2 = "ArchiverChannel_2"; private List channels = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2); - private final Random random = new Random(0); private AtomicLong channelNameCallCounter = new AtomicLong(); @@ -54,78 +43,14 @@ public class DummyArchiverApplianceReader implements DataReader { @Override public Stream getEventStream(PulseIdRangeQuery query) { - return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), + return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), query.getEventColumns()) .filter(query.getFilterOrDefault(NO_OP_FILTER)); } @Override public Stream getEventStream(TimeRangeQuery query) { - return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10) + return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10) .filter(query.getFilterOrDefault(NO_OP_FILTER)); } - - private Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, - String... columns) { - String channelLower = channelParam.toLowerCase(); - String channel = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? 
channelParam - : null; - - Stream eventStream = - LongStream.rangeClosed(startIndex, endIndex).mapToObj( - i -> { - BigDecimal iocTime = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, - FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) - : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; - BigDecimal globalTime = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, - FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) - : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; - long pulseId = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, - FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; - - if (channelLower.contains("waveform")) { - long[] value = random.longs(2048).toArray(); - value[0] = i; - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value - ); - - } else if (channelLower.contains("image")) { - int x = 640; - int y = 480; - int[] shape = new int[] {x, y}; - long[] value = random.longs(x * y).toArray(); - value[0] = i; - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value, - shape - ); - } else { - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - i - ); - } - }); - - return eventStream; - } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 619de3c..69a3177 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -4,8 +4,10 @@ package ch.psi.daq.test.queryrest.query; import java.math.BigDecimal; +import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicLong; @@ -39,11 +41,11 @@ import ch.psi.daq.domain.reader.Backend; public class DummyCassandraReader implements CassandraReader { private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class); public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_"; + private static final Random random = new Random(0); private static final int KEYSPACE = 1; private CassandraDataGen dataGen; private List channels; - private Random random = new Random(0); private AtomicLong channelNameCallCounter = new AtomicLong(); /** @@ -100,7 +102,8 @@ public class DummyCassandraReader implements CassandraReader { @Override public Stream getEventStream(PulseIdRangeQuery query) { - return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), query.getEventColumns()) + return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), + query.getEventColumns()) .filter(query.getFilterOrDefault(NO_OP_FILTER)); } @@ -126,58 +129,87 @@ public class DummyCassandraReader implements CassandraReader { return result; } - private Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, String... columns) { + public static Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, + String... columns) { String channelLower = channelParam.toLowerCase(); - String channel = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam : null; + String channel = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam + : null; - Stream eventStream = LongStream.rangeClosed(startIndex, endIndex).mapToObj(i -> { - BigDecimal iocTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_IOC_TIME)) ? 
TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; - BigDecimal globalTime = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; - long pulseId = (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; + Stream rangeStream; - if (channelLower.contains("waveform")) { - long[] value = random.longs(2048).toArray(); - value[0] = i; - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value - ); + if (channelParam.contains("[") && channelParam.contains("]")) { + rangeStream = + Arrays.stream( + channelParam.substring( + channelParam.indexOf("[") + 1, + channelParam.indexOf("]")) + .split(",") + ) + .map(str -> str.trim()) + .map(Long::parseLong); + } else { + rangeStream = LongStream.rangeClosed(startIndex, endIndex).boxed(); + } - } else if (channelLower.contains("image")) { - int x = 640; - int y = 480; - int[] shape = new int[] {x, y}; - long[] value = random.longs(x * y).toArray(); - value[0] = i; - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value, - shape - ); - } else { - return new ChannelEvent( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - i - ); - } - }); + Stream eventStream = + rangeStream.map( + i -> { + BigDecimal iocTime = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) + : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; + BigDecimal globalTime = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_GLOBAL_TIME)) ? 
TimeUtils.getTimeFromMillis(i * 10, 0) + : TablePropertiesUtils.DEFAULT_VALUE_DECIMAL; + long pulseId = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_PULSE_ID)) ? i : TablePropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; + + if (channelLower.contains("waveform")) { + long[] value = random.longs(2048).toArray(); + value[0] = i; + return new ChannelEvent( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + value + ); + + } else if (channelLower.contains("image")) { + int x = 640; + int y = 480; + int[] shape = new int[] {x, y}; + long[] value = random.longs(x * y).toArray(); + value[0] = i; + return new ChannelEvent( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + value, + shape + ); + } else { + return new ChannelEvent( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + i + ); + } + }); return eventStream; } - private List getDummyEvents(String channel, long startIndex, long endIndex, String...columns) { + private List getDummyEvents(String channel, long startIndex, long endIndex, String... columns) { return getDummyEventStream(channel, startIndex, endIndex, columns).collect(Collectors.toList()); } @@ -205,7 +237,8 @@ public class DummyCassandraReader implements CassandraReader { @Override public ChannelEvent getEvent(MetaChannelEvent queryInfo, String... columns) { if (queryInfo.getPulseId() > 0) { - return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(), columns) + return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(), + columns) .get(0); } return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis() / 10,