getChannels(@PathVariable(value = "channelName") String channelName)
throws Throwable {
return getChannels(new ChannelsRequest(channelName));
}
-
- /**
- * Catch-all query method for getting data from the backend for both JSON and CSV requests.
- *
- * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
- * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
- * deserialize the information into and has been configured (see
- * QueryRestConfig#afterPropertiesSet) accordingly.
- *
- * @param query concrete implementation of {@link DAQQuery}
- * @param res the {@link HttpServletResponse} instance associated with this request
- * @throws IOException thrown if writing to the output stream fails
- */
- @RequestMapping(
- value = QUERY,
- method = RequestMethod.POST,
- consumes = {MediaType.APPLICATION_JSON_VALUE})
- public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
- try {
- LOGGER.debug("Executing query '{}'", query.toString());
-
- if (ResponseFormat.JSON.equals(query.getResponseFormat())) {
- // write the response back to the client using java 8 streams
- jsonResponseStreamWriter.respond(executeQuery(query), query, res);
- } else {
- // it's a CSV request
- executeQueryCsv(query, res);
- }
-
- } catch (Exception e) {
- LOGGER.error("Failed to execute query '{}'.", query, e);
- throw e;
- }
- }
-
/**
* Accepts the properties for the {@link DAQQuery} instance as stringified JSON query string.
*
@@ -176,59 +172,153 @@ public class QueryRestController {
value = QUERY,
method = RequestMethod.GET)
public void executeQueryBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
-
DAQQuery query = objectMapper.readValue(jsonBody, DAQQuery.class);
executeQuery(query, res);
-
+ }
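+
+ // Illustrative sketch (the channel name is a placeholder): the jsonBody parameter is simply a
+ // DAQQuery serialized to JSON, e.g.
+ //   DAQQuery query = new DAQQuery(new RequestRangePulseId(0, 1), "Channel_A");
+ //   String jsonBody = objectMapper.writeValueAsString(query);
+ // and passed, URL-encoded, as the 'jsonBody' request parameter of a GET to QUERY.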
+
+ /**
+ * Executes a single query.
+ *
+ * @param query the {@link DAQQuery}
+ * @param res the {@link HttpServletResponse} instance associated with this request
+ * @throws IOException thrown if writing to the output stream fails
+ */
+ @RequestMapping(
+ value = QUERY,
+ method = RequestMethod.POST,
+ consumes = {MediaType.APPLICATION_JSON_VALUE})
+ public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
+ executeQueries(new DAQQueries(query), res);
+ }
+
+ /**
+ * Accepts the properties for the {@link DAQQueries} instance as stringified JSON query string.
+ *
+ * @param jsonBody The {@link DAQQueries} properties sent as a JSON string, i.e. this is the
+ * stringified body of the POST request method
+ * @param res the current {@link HttpServletResponse} instance
+ * @throws Exception if reading the JSON string fails or if the subsequent call to
+ * {@link #executeQueries(DAQQueries, HttpServletResponse)} fails
+ */
+ @RequestMapping(
+ value = QUERIES,
+ method = RequestMethod.GET)
+ public void executeQueriesBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
+ DAQQueries queries = objectMapper.readValue(jsonBody, DAQQueries.class);
+ executeQueries(queries, res);
+ }
+
+ /**
+ * Catch-all query method for getting data from the backend for both JSON and CSV requests.
+ *
+ * The {@link DAQQueries} object will contain the concrete subclass based on the combination of
+ * fields defined in the user's query. The {@link AttributeBasedDeserializer} decides which class
+ * to deserialize the information into and has been configured (see
+ * QueryRestConfig#afterPropertiesSet) accordingly.
+ *
+ * @param queries the {@link DAQQueryElement}s
+ * @param res the {@link HttpServletResponse} instance associated with this request
+ * @throws IOException thrown if writing to the output stream fails
+ */
+ @RequestMapping(
+ value = QUERIES,
+ method = RequestMethod.POST,
+ consumes = {MediaType.APPLICATION_JSON_VALUE})
+ public void executeQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception {
+ try {
+ LOGGER.debug("Executing queries '{}'", queries);
+
+ if (ResponseFormat.JSON.equals(queries.getResponseFormat())) {
+ // write the response back to the client using java 8 streams
+ jsonResponseStreamWriter.respond(executeQueries(queries), queries, res);
+ } else if (ResponseFormat.CSV.equals(queries.getResponseFormat())) {
+ // it's a CSV request
+ executeQueriesCsv(queries, res);
+ } else {
+ String message = String.format("Unsupported response format '%s'", queries.getResponseFormat().name());
+ LOGGER.error(message);
+ throw new RuntimeException(message);
+ }
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to execute query '{}'.", queries, e);
+ throw e;
+ }
}
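+
+ // Illustrative sketch of a payload handled by the catch-all method above (channel names are
+ // placeholders; the constructors mirror those used in QueryRestControllerCsvTest further below):
+ //   DAQQueries queries = new DAQQueries(
+ //       new DAQQueryElement(new RequestRangePulseId(0, 1), "Channel_A"),
+ //       new DAQQueryElement(new RequestRangePulseId(0, 1), "Channel_B"));
+ //   queries.setResponseFormat(ResponseFormat.JSON);
+ //   // POSTed as JSON to QUERIES, both elements are answered within one streamed response.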
/**
* Returns data in CSV format to the client.
*/
- private void executeQueryCsv(DAQQuery query, HttpServletResponse res) throws Exception {
-
- if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
- // We allow only no aggregation or value aggregation as
- // extrema: nested structure and not clear how to map it to one line
- // index: value is an array of Statistics whose size is not clear at initialization time
- String message = "CSV export does not support '" + query.getAggregationType() + "'";
- LOGGER.warn(message);
- throw new IllegalArgumentException(message);
+ private void executeQueriesCsv(DAQQueries queries, HttpServletResponse res) throws Exception {
+
+ for (DAQQueryElement query : queries) {
+ if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) {
+ // We allow only no aggregation or value aggregation because:
+ // extrema: nested structure, not clear how to map it onto a single CSV line
+ // index: value is an array of Statistics whose size is not known at initialization time
+ String message = "CSV export does not support '" + query.getAggregationType() + "'";
+ LOGGER.warn(message);
+ throw new IllegalArgumentException(message);
+ }
}
-
+
try {
- LOGGER.debug("Executing query '{}'", query.toString());
+ LOGGER.debug("Executing query '{}'", queries);
// write the response back to the client using java 8 streams
- csvResponseStreamWriter.respond(executeQuery(query), query, res);
+ csvResponseStreamWriter.respond(executeQueries(queries), queries, res);
} catch (Exception e) {
- LOGGER.error("Failed to execute query '{}'.", query, e);
+ LOGGER.error("Failed to execute query '{}'.", queries, e);
throw e;
}
-
+
}
- public Stream<Entry<String, ?>> executeQuery(DAQQuery query) {
- QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
+ public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) {
+ // set backends if not defined yet
+ channelNameCache.setBackends(queries);
- // all the magic happens here
- Stream<Entry<String, Stream<?>>> channelToDataEvents =
- getQueryProcessor(query.getDbMode()).process(queryAnalizer);
+ List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results =
+ new ArrayList<>(queries.getQueries().size());
- // do post-process
- Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
+ for (DAQQueryElement queryElement : queries) {
+ Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
+ queryElement
+ .getBackendQueries()
+ .stream()
+ .filter(query -> {
+ QueryProcessor processor = queryProcessors.get(query.getBackend());
+ if (processor != null) {
+ return true;
+ } else {
+ LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend());
+ return false;
+ }
+ })
+ .flatMap(query -> {
+ QueryProcessor processor = queryProcessors.get(query.getBackend());
+ QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
- return channelToData;
- }
+ // all the magic happens here
+ Stream<Entry<ChannelName, Stream<?>>> channelToDataEvents =
+ processor.process(queryAnalizer);
+ // do post-process
+ Stream<Entry<ChannelName, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
- private QueryProcessor getQueryProcessor(DBMode dbMode) {
- if (DBMode.databuffer.equals(dbMode)) {
- return cassandraQueryProcessor.get();
- } else if (DBMode.archiverappliance.equals(dbMode)) {
- return archiverApplianceQueryProcessor.get();
- } else {
- LOGGER.error("Unknown DBMode '{}'!", dbMode);
- throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
+ return channelToData.map(entry -> {
+ return Triple.of(query, entry.getKey(), entry.getValue());
+ });
+ });
+
+ // Now we have a stream that loads elements sequentially, BackendQuery by BackendQuery.
+ // By materializing the outer Stream, the elements of all BackendQueries are loaded asynchronously
+ // (this speeds things up but also requires more memory - i.e. it relies on the Backends not
+ // loading all elements into memory at once).
+ resultStreams = resultStreams.collect(Collectors.toList()).stream();
+
+ results.add(Pair.of(queryElement, resultStreams));
}
+
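+ // Each entry of 'results' pairs the originating DAQQueryElement with a stream of
+ // (BackendQuery, channel, data) triples - one triple per channel of each backend query.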
+ return results;
}
/**
@@ -237,12 +327,8 @@ public class QueryRestController {
* @return list of {@link Ordering}s as String array
*/
@RequestMapping(value = "ordering", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
- public @ResponseBody List<String> getOrderingValues() {
- List<Ordering> orderings = Lists.newArrayList(Ordering.values());
- return orderings.stream()
- .map((Ordering ord) -> {
- return ord.toString();
- }).collect(Collectors.toList());
+ public @ResponseBody List<Ordering> getOrderingValues() {
+ return Lists.newArrayList(Ordering.values());
}
/**
@@ -251,12 +337,8 @@ public class QueryRestController {
* @return list of {@link QueryField}s as String array
*/
@RequestMapping(value = "queryfields", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
- public @ResponseBody List<String> getQueryFieldValues() {
- List<QueryField> orderings = Lists.newArrayList(QueryField.values());
- return orderings.stream()
- .map((QueryField qf) -> {
- return qf.toString();
- }).collect(Collectors.toList());
+ public @ResponseBody List<QueryField> getQueryFieldValues() {
+ return Lists.newArrayList(QueryField.values());
}
/**
@@ -265,12 +347,8 @@ public class QueryRestController {
* @return list of {@link Aggregation}s as String array
*/
@RequestMapping(value = "aggregations", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
- public @ResponseBody List<String> getAggregationsValues() {
- List<Aggregation> orderings = Lists.newArrayList(Aggregation.values());
- return orderings.stream()
- .map((Aggregation value) -> {
- return value.toString();
- }).collect(Collectors.toList());
+ public @ResponseBody List<Aggregation> getAggregationsValues() {
+ return Lists.newArrayList(Aggregation.values());
}
/**
@@ -280,25 +358,17 @@ public class QueryRestController {
*/
@RequestMapping(value = "aggregationtypes", method = {RequestMethod.GET},
produces = {MediaType.APPLICATION_JSON_VALUE})
- public @ResponseBody List<String> getAggregationTypesValues() {
- List<AggregationType> orderings = Lists.newArrayList(AggregationType.values());
- return orderings.stream()
- .map((AggregationType value) -> {
- return value.toString();
- }).collect(Collectors.toList());
+ public @ResponseBody List<AggregationType> getAggregationTypesValues() {
+ return Lists.newArrayList(AggregationType.values());
}
/**
- * Returns the current list of {@link DBMode} values.
+ * Returns the current list of {@link Backend} values.
*
- * @return list of {@link DBMode}s as String array
+ * @return list of {@link Backend}s as String array
*/
- @RequestMapping(value = "dbmodes", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
- public @ResponseBody List<String> getDBModeValues() {
- List<DBMode> orderings = Lists.newArrayList(DBMode.values());
- return orderings.stream()
- .map((DBMode value) -> {
- return value.toString();
- }).collect(Collectors.toList());
+ @RequestMapping(value = "backends", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
+ public @ResponseBody List<Backend> getDBModeValues() {
+ return Lists.newArrayList(Backend.values());
}
}
diff --git a/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java b/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java
index e1c0767..5d433a4 100644
--- a/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java
+++ b/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java
@@ -1,7 +1,7 @@
package ch.psi.daq.queryrest.controller.validator;
+import java.util.ArrayList;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.Set;
import javax.annotation.Resource;
@@ -10,10 +10,10 @@ import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import ch.psi.daq.query.model.Aggregation;
-import ch.psi.daq.query.model.DBMode;
import ch.psi.daq.query.model.QueryField;
+import ch.psi.daq.query.model.impl.DAQQueries;
import ch.psi.daq.query.model.impl.DAQQuery;
-import ch.psi.daq.cassandra.request.Request;
+import ch.psi.daq.query.model.impl.DAQQueryElement;
import ch.psi.daq.queryrest.config.QueryRestConfig;
public class QueryValidator implements Validator {
@@ -28,8 +28,8 @@ public class QueryValidator implements Validator {
* {@inheritDoc}
*/
@Override
- public boolean supports(Class<?> clazz) {
- return DAQQuery.class.isAssignableFrom(clazz);
+ public boolean supports(Class<?> clazz) {
+ return DAQQuery.class.isAssignableFrom(clazz) || DAQQueries.class.isAssignableFrom(clazz);
}
/**
@@ -37,25 +37,24 @@ public class QueryValidator implements Validator {
*/
@Override
public void validate(Object target, Errors errors) {
-
- DAQQuery query = (DAQQuery) target;
-
- Request request = query.getRequest();
-
- if (DBMode.archiverappliance.equals(query.getDbMode())) {
- if (!request.getRequestRange().isTimeRangeDefined()) {
- errors.reject("dbMode", "ArchiverAppliance supports time range queries only!");
+ if (target instanceof DAQQuery) {
+ this.checkElement((DAQQuery) target);
+ } else if (target instanceof DAQQueries) {
+ DAQQueries queries = (DAQQueries) target;
+ for (DAQQueryElement daqQueryElement : queries) {
+ this.checkElement(daqQueryElement);
}
}
+ }
+ private void checkElement(DAQQueryElement query) {
// set default values (if not set)
if (query.getFields() == null || query.getFields().isEmpty()) {
query.setFields(new LinkedHashSet<>(defaultResponseFields));
}
if (query.getAggregations() == null || query.getAggregations().isEmpty()) {
- query.setAggregations(new LinkedList<>(defaultResponseAggregations));
+ query.setAggregations(new ArrayList<>(defaultResponseAggregations));
}
}
-
}
diff --git a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java
index 4e45abb..6108127 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java
@@ -10,13 +10,8 @@ import javax.servlet.http.HttpServletResponse;
import org.springframework.http.MediaType;
import ch.psi.daq.query.model.ResponseFormat;
-import ch.psi.daq.query.model.impl.DAQQuery;
+import ch.psi.daq.query.model.ResponseOptions;
-
-/**
- * @author zellweger_c
- *
- */
public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter {
public static final String CONTENT_TYPE_CSV = "text/csv";
@@ -32,24 +27,24 @@ public abstract class AbstractResponseStreamWriter implements ResponseStreamWrit
* see http://tools.ietf.org/html/rfc2616#section-14.11 and
* see http://tools.ietf.org/html/rfc2616#section-3.5
*
- * @param query The query
+ * @param options The options for the response
* @param response The HttpServletResponse
* @param contentType The content type
* @return OutputStream The OutputStream
* @throws Exception Something goes wrong
*/
- protected OutputStream handleCompressionAndResponseHeaders(DAQQuery query, HttpServletResponse response,
+ protected OutputStream handleCompressionAndResponseHeaders(ResponseOptions options, HttpServletResponse response,
String contentType) throws Exception {
OutputStream out = response.getOutputStream();
response.addHeader("Content-Type", contentType);
- if (query.isCompressed()) {
- String filename = "data." + query.getCompression().getFileSuffix();
+ if (options.isCompressed()) {
+ String filename = "data." + options.getCompression().getFileSuffix();
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
- response.addHeader("Content-Encoding", query.getCompression().toString());
- out = query.getCompression().wrapStream(out);
+ response.addHeader("Content-Encoding", options.getCompression().toString());
+ out = options.getCompression().wrapStream(out);
} else {
- String filename = "data." + (query.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json");
+ String filename = "data." + (options.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json");
response.addHeader("Content-Disposition", "attachment; filename=" + filename);
}
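+ // Example of the resulting headers (suffixes as asserted by the tests further below): a
+ // gzip-compressed request is answered with "Content-Disposition: attachment; filename=data.gz"
+ // plus a matching Content-Encoding header, while an uncompressed CSV request gets "filename=data.csv".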
diff --git a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java
index 53d6be3..387a6bc 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java
@@ -1,13 +1,18 @@
package ch.psi.daq.queryrest.response;
-import java.io.IOException;
+import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
-import ch.psi.daq.query.model.impl.DAQQuery;
+import org.apache.commons.lang3.tuple.Triple;
+
+import ch.psi.daq.query.model.ResponseOptions;
+import ch.psi.daq.query.model.impl.BackendQuery;
+import ch.psi.daq.query.model.impl.DAQQueryElement;
+import ch.psi.daq.query.request.ChannelName;
public interface ResponseStreamWriter {
@@ -15,10 +20,11 @@ public interface ResponseStreamWriter {
* Responding with the the contents of the stream by writing into the output stream of the
* {@link ServletResponse}.
*
- * @param stream Mapping from channel name to data
- * @param query concrete instance of {@link DAQQuery}
+ * @param results the results, one entry per {@link DAQQueryElement} of the request
+ * @param options The options for the response
* @param response {@link ServletResponse} instance given by the current HTTP request
- * @throws IOException thrown if writing to the output stream fails
+ * @throws Exception thrown if writing to the output stream fails
*/
- public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception;
+ public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, ResponseOptions options,
+ HttpServletResponse response) throws Exception;
}
diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
index 7d762b4..aa764eb 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
@@ -16,6 +16,7 @@ import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.supercsv.cellprocessor.constraint.NotNull;
@@ -24,16 +25,19 @@ import org.supercsv.io.dozer.CsvDozerBeanWriter;
import org.supercsv.io.dozer.ICsvDozerBeanWriter;
import org.supercsv.prefs.CsvPreference;
+import com.fasterxml.jackson.core.JsonEncoding;
+
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.model.Aggregation;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.QueryField;
-import ch.psi.daq.query.model.impl.DAQQuery;
+import ch.psi.daq.query.model.ResponseOptions;
+import ch.psi.daq.query.model.impl.BackendQuery;
+import ch.psi.daq.query.model.impl.DAQQueryElement;
+import ch.psi.daq.query.request.ChannelName;
import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
-import com.fasterxml.jackson.core.JsonEncoding;
-
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
@@ -44,53 +48,103 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
private static final char DELIMITER_ARRAY = ',';
private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);
-
+
@Resource
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
@Override
- public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
+ public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
+ ResponseOptions options,
+ HttpServletResponse response) throws Exception {
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
response.setContentType(CONTENT_TYPE_CSV);
- respondInternal(stream, query, response);
+ respondInternal(results, options, response);
}
- private void respondInternal(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response)
- throws Exception {
+ private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
+ ResponseOptions options, HttpServletResponse response) throws Exception {
+ AtomicReference<Exception> exception = new AtomicReference<>();
+ OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV);
+ List<ICsvDozerBeanWriter> beanWriters = new ArrayList<>();
- Set<QueryField> queryFields = query.getFields();
- List<Aggregation> aggregations = query.getAggregations();
- int aggregationsSize = (aggregations != null ? aggregations.size() : 0);
+ results.forEach(entry -> {
+ DAQQueryElement query = entry.getKey();
- Set<String> fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize);
- List<String> header = new ArrayList<>(queryFields.size() + aggregationsSize);
+ Set<QueryField> queryFields = query.getFields();
+ List<Aggregation> aggregations = query.getAggregations();
+ int aggregationsSize = (aggregations != null ? aggregations.size() : 0);
- CellProcessor[] processors = setupCellProcessors(query, fieldMapping, header);
-
- OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_CSV);
- ICsvDozerBeanWriter beanWriter = setupBeanWriter(fieldMapping, out);
+ Set<String> fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize);
+ List<String> header = new ArrayList<>(queryFields.size() + aggregationsSize);
- writeToOutput(stream, header, processors, beanWriter);
-
+ AtomicReference<CellProcessor[]> processorsRef = new AtomicReference<>();
+
+ entry.getValue()
+ .sequential()
+ .forEach(triple -> {
+ try {
+ CellProcessor[] processors = processorsRef.get();
+ ICsvDozerBeanWriter beanWriter;
+
+ if (processors == null) {
+ processors = setupCellProcessors(query, triple.getLeft(), fieldMapping, header);
+ processorsRef.set(processors);
+
+ beanWriter = setupBeanWriter(fieldMapping, out);
+ beanWriters.add(beanWriter);
+ } else {
+ beanWriter = beanWriters.get(beanWriters.size() - 1);
+ }
+
+ writeToOutput(triple, processors, beanWriter, header);
+ } catch (Exception e) {
+ LOGGER.warn("Could not write CSV of '{}'", triple.getMiddle(), e);
+ exception.compareAndSet(null, e);
+ }
+ });
+
+ if (!beanWriters.isEmpty()) {
+ try {
+ beanWriters.get(beanWriters.size() - 1).flush();
+ } catch (Exception e) {
+ LOGGER.error("Could not flush ICsvDozerBeanWriter.", e);
+ exception.compareAndSet(null, e);
+ }
+ }
+ });
+
+ for (ICsvDozerBeanWriter beanWriter : beanWriters) {
+ try {
+ beanWriter.close();
+ } catch (Exception e) {
+ LOGGER.error("Could not close ICsvDozerBeanWriter.", e);
+ exception.compareAndSet(null, e);
+ }
+ }
+
+ if (exception.get() != null) {
+ throw exception.get();
+ }
}
/**
- * Sets up the bean writer instance.
- * @throws Exception
- *
- */
- private ICsvDozerBeanWriter setupBeanWriter(Set<String> fieldMapping, OutputStream out) throws Exception {
- CsvPreference preference = new CsvPreference.Builder(
- (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
- DELIMITER_CVS,
- CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
-
- ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference);
- // configure the mapping from the fields to the CSV columns
- beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
- return beanWriter;
- }
+ * Sets up the bean writer instance.
+ *
+ * @throws Exception
+ *
+ */
+ private ICsvDozerBeanWriter setupBeanWriter(Set<String> fieldMapping, OutputStream out) throws Exception {
+ CsvPreference preference = new CsvPreference.Builder(
+ (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(),
+ DELIMITER_CVS,
+ CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build();
+
+ ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference);
+ // configure the mapping from the fields to the CSV columns
+ beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()]));
+ return beanWriter;
+ }
/**
* Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer.
@@ -101,34 +155,36 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
* with other processors to fully automate all of the required conversions and constraint
* validation for a single CSV column.
*
- * @param query The current {@link DAQQuery}
+ * @param daqQuery The current {@link DAQQueryElement}
+ * @param backendQuery One BackendQuery of the current {@link DAQQueryElement}
* @return Array of {@link CellProcessor} entries
*/
- private CellProcessor[] setupCellProcessors(DAQQuery query, Set<String> fieldMapping, List<String> header) {
- Set<QueryField> queryFields = query.getFields();
- List<Aggregation> aggregations = query.getAggregations();
-
+ private CellProcessor[] setupCellProcessors(DAQQueryElement daqQuery, BackendQuery backendQuery,
+ Set<String> fieldMapping, List<String> header) {
+ Set<QueryField> queryFields = daqQuery.getFields();
+ List<Aggregation> aggregations = daqQuery.getAggregations();
+
List<CellProcessor> processorSet =
new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
-
+
boolean isNewField;
- QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query);
-
+ QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
+
for (QueryField field : queryFields) {
- if(!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())){
+ if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) {
isNewField = fieldMapping.add(field.name());
-
+
if (isNewField) {
header.add(field.name());
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
}
}
}
-
+
if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
- for (Aggregation aggregation : query.getAggregations()) {
+ for (Aggregation aggregation : daqQuery.getAggregations()) {
isNewField = fieldMapping.add("value." + aggregation.name());
-
+
if (isNewField) {
header.add(aggregation.name());
processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY));
@@ -139,44 +195,27 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter {
}
@SuppressWarnings("unchecked")
- private void writeToOutput(Stream<Entry<String, ?>> stream, List<String> header, CellProcessor[] processors,
- ICsvDozerBeanWriter beanWriter) throws IOException, Exception {
- AtomicReference<Exception> exception = new AtomicReference<>();
- try {
-
+ private void writeToOutput(Triple<BackendQuery, ChannelName, ?> triple, CellProcessor[] processors,
+ ICsvDozerBeanWriter beanWriter, List<String> header) throws IOException {
+ if (triple.getRight() instanceof Stream) {
+ beanWriter.writeComment("");
+ beanWriter.writeComment("Start of " + triple.getMiddle());
beanWriter.writeHeader(header.toArray(new String[header.size()]));
- stream
- /* ensure elements are sequentially written */
- .sequential()
+ Stream<DataEvent> eventStream = (Stream<DataEvent>) triple.getRight();
+ eventStream
.forEach(
- entry -> {
- if (entry.getValue() instanceof Stream) {
- Stream<DataEvent> eventStream = (Stream<DataEvent>) entry.getValue();
- eventStream
- .forEach(
- event -> {
- try {
- beanWriter.write(event, processors);
- } catch (Exception e) {
- LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
- event.getPulseId(), e);
- exception.compareAndSet(null, e);
- }
- });
- } else {
- String message = "Type '" + entry.getValue().getClass() + "' not supported.";
- LOGGER.error(message);
- exception.compareAndSet(null, new RuntimeException(message));
- }
- }
- );
- } finally {
- beanWriter.close();
- }
-
- if (exception.get() != null) {
- throw exception.get();
+ event -> {
+ try {
+ beanWriter.write(event, processors);
+ } catch (Exception e) {
+ LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(),
+ event.getPulseId(), e);
+ }
+ });
+ } else {
+ String message = "Type '" + triple.getRight().getClass() + "' not supported.";
+ LOGGER.error(message);
}
}
diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java
index 850c73a..5ea08e9 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java
@@ -1,6 +1,5 @@
package ch.psi.daq.queryrest.response.json;
-import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedHashSet;
import java.util.List;
@@ -13,15 +12,11 @@ import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
-import ch.psi.daq.query.model.Aggregation;
-import ch.psi.daq.query.model.QueryField;
-import ch.psi.daq.query.model.impl.DAQQuery;
-import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
-
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -30,6 +25,14 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
+import ch.psi.daq.query.model.Aggregation;
+import ch.psi.daq.query.model.QueryField;
+import ch.psi.daq.query.model.ResponseOptions;
+import ch.psi.daq.query.model.impl.BackendQuery;
+import ch.psi.daq.query.model.impl.DAQQueryElement;
+import ch.psi.daq.query.request.ChannelName;
+import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter;
+
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
@@ -38,7 +41,7 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
private static final String DATA_RESP_FIELD = "data";
- private static final Logger logger = LoggerFactory.getLogger(JSONResponseStreamWriter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(JSONResponseStreamWriter.class);
@Resource
private JsonFactory jsonFactory;
@@ -46,19 +49,75 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
@Resource
private ObjectMapper mapper;
+
@Override
- public void respond(Stream<Entry<String, ?>> stream, DAQQuery query, HttpServletResponse response) throws Exception {
+ public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
+ ResponseOptions options, HttpServletResponse response) throws Exception {
response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName());
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
- Set<String> includedFields = getFields(query);
-
- ObjectWriter writer = configureWriter(includedFields);
-
- respondInternal(stream, response, writer, query);
+ respondInternal(results, options, response);
}
- protected Set<String> getFields(DAQQuery query) {
+ private void respondInternal(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
+ ResponseOptions options, HttpServletResponse response) throws Exception {
+ AtomicReference<Exception> exception = new AtomicReference<>();
+ OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_JSON);
+ JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
+
+ try {
+ if (results.size() > 1) {
+ generator.writeStartArray();
+ }
+
+ results
+ .forEach(entry -> {
+ DAQQueryElement daqQuery = entry.getKey();
+ Set<String> includedFields = getFields(daqQuery);
+ ObjectWriter writer = configureWriter(includedFields);
+
+ try {
+ generator.writeStartArray();
+
+ entry.getValue()
+ /* ensure elements are sequentially written */
+ .sequential()
+ .forEach(
+ triple -> {
+ try {
+ generator.writeStartObject();
+ generator.writeFieldName(QueryField.channel.name());
+ writer.writeValue(generator, triple.getMiddle());
+ generator.writeFieldName(DATA_RESP_FIELD);
+ writer.writeValue(generator, triple.getRight());
+ generator.writeEndObject();
+ } catch (Exception e) {
+ LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(), e);
+ exception.compareAndSet(null, e);
+ }
+ });
+
+ generator.writeEndArray();
+ } catch (Exception e) {
+ LOGGER.error("Exception while writing json for '{}'", daqQuery.getChannels(), e);
+ exception.compareAndSet(null, e);
+ }
+ });
+ } finally {
+ if (results.size() > 1) {
+ generator.writeEndArray();
+ }
+
+ generator.flush();
+ generator.close();
+ }
+
+ if (exception.get() != null) {
+ throw exception.get();
+ }
+ }
+
+ protected Set<String> getFields(DAQQueryElement query) {
Set<QueryField> queryFields = query.getFields();
List<Aggregation> aggregations = query.getAggregations();
@@ -95,52 +154,4 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter {
ObjectWriter writer = mapper.writer(propertyFilter);
return writer;
}
-
- /**
- * Writes the outer Java stream into the output stream.
- *
- * @param stream Mapping from channel name to data
- * @param response {@link ServletResponse} instance given by the current HTTP request
- * @param writer configured writer that includes the fields the end user wants to see
- * @param query
- * @throws IOException thrown if writing to the output stream fails
- */
- private void respondInternal(Stream<Entry<String, ?>> stream, HttpServletResponse response, ObjectWriter writer, DAQQuery query)
- throws Exception {
-
- OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_JSON);
- JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
-
- AtomicReference<Exception> exception = new AtomicReference<>();
- try {
- generator.writeStartArray();
- stream
- /* ensure elements are sequentially written */
- .sequential()
- .forEach(
- entry -> {
- try {
- generator.writeStartObject();
- generator.writeStringField(QueryField.channel.name(), entry.getKey());
-
- generator.writeFieldName(DATA_RESP_FIELD);
- writer.writeValue(generator, entry.getValue());
-
- generator.writeEndObject();
- } catch (Exception e) {
- logger.error("Could not write channel name of channel '{}'", entry.getKey(), e);
- exception.compareAndSet(null, e);
- }
- });
- generator.writeEndArray();
- } finally {
- generator.flush();
- generator.close();
- }
-
- if (exception.get() != null) {
- throw exception.get();
- }
- }
-
}
diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java b/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java
deleted file mode 100644
index dee1d8d..0000000
--- a/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package ch.psi.daq.queryrest.response.json;
-
-import java.io.IOException;
-
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-// see: http://stackoverflow.com/a/15037329
-public class JsonByteArraySerializer extends StdSerializer<byte[]> {
- private static final long serialVersionUID = -5914688899857435263L;
-
- public JsonByteArraySerializer() {
- super(byte[].class, true);
- }
-
- @Override
- public void serialize(byte[] bytes, JsonGenerator jgen, SerializerProvider provider) throws IOException,
- JsonGenerationException {
- jgen.writeStartArray();
-
- for (byte b : bytes) {
- // stackoverflow example used a mask -> ?
- jgen.writeNumber(b);
- }
-
- jgen.writeEndArray();
- }
-}
diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java b/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java
deleted file mode 100644
index 61fede0..0000000
--- a/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package ch.psi.daq.queryrest.response.json;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.stream.Stream;
-
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-public class JsonStreamSerializer extends StdSerializer<Stream<?>> {
- private static final long serialVersionUID = 4695859735299703478L;
-
- public JsonStreamSerializer() {
- super(Stream.class, true);
- }
-
- @Override
- public void serialize(Stream<?> stream, JsonGenerator jgen, SerializerProvider provider) throws IOException,
- JsonGenerationException {
- provider.findValueSerializer(Iterator.class, null).serialize(stream.iterator(), jgen, provider);
- }
-}
diff --git a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java
index 50f85b5..f92b2ae 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java
@@ -12,11 +12,13 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupp
import ch.psi.daq.cassandra.reader.CassandraReader;
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
+import ch.psi.daq.domain.reader.DataReader;
import ch.psi.daq.query.processor.QueryProcessor;
import ch.psi.daq.query.processor.QueryProcessorLocal;
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
import ch.psi.daq.test.cassandra.admin.CassandraTestAdminImpl;
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
+import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader;
import ch.psi.daq.test.queryrest.query.DummyCassandraReader;
@Configuration
@@ -45,6 +47,18 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
return new DummyCassandraReader();
}
+// @Bean
+// @Lazy
+// public DataReader archiverApplianceReader() {
+// return new DummyArchiverApplianceReader();
+// }
+//
+// @Bean
+// @Lazy
+// public QueryProcessor archiverApplianceQueryProcessor() {
+// return new QueryProcessorLocal(archiverApplianceReader());
+// }
+
@Bean
@Lazy
public CassandraTestAdmin cassandraTestAdmin() {
diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java
index 9700f38..6e3fcec 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java
@@ -34,7 +34,9 @@ import ch.psi.daq.query.model.AggregationType;
import ch.psi.daq.query.model.Compression;
import ch.psi.daq.query.model.QueryField;
import ch.psi.daq.query.model.ResponseFormat;
+import ch.psi.daq.query.model.impl.DAQQueries;
import ch.psi.daq.query.model.impl.DAQQuery;
+import ch.psi.daq.query.model.impl.DAQQueryElement;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.filter.CorsFilter;
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
@@ -69,9 +71,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
1),
TEST_CHANNEL_NAMES);
- request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
-
+
LinkedHashSet queryFields = new LinkedHashSet<>();
LinkedHashSet cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@@ -109,10 +110,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
String response = result.getResponse().getContentAsString();
+ System.out.println("Response: " + response);
+
ICsvMapReader mapReader =
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
try {
- final String[] header = mapReader.getHeader(true);
+ // read comment
+ mapReader.read("");
+ String[] header = mapReader.getHeader(false);
Map customerMap;
long resultsPerChannel = 2;
long pulse = 0;
@@ -131,13 +136,111 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
pulse = ++pulse % resultsPerChannel;
if (pulse == 0) {
++channelCount;
+ if (channelCount <= TEST_CHANNEL_NAMES.length) {
+ // read comment (empty rows are skipped)
+ mapReader.read("");
+ header = mapReader.getHeader(false);
+ }
}
}
} finally {
mapReader.close();
}
}
-
+
+ @Test
+ public void testPulseRangeQueries() throws Exception {
+ String testChannel3 = "testChannel3";
+ DAQQueries request = new DAQQueries(
+ new DAQQueryElement(
+ new RequestRangePulseId(
+ 0,
+ 1),
+ TEST_CHANNEL_NAMES),
+ new DAQQueryElement(
+ new RequestRangePulseId(
+ 0,
+ 1),
+ testChannel3));
+ request.setResponseFormat(ResponseFormat.CSV);
+
+ LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
+ LinkedHashSet<CellProcessor> cellProcessors = new LinkedHashSet<>();
+ queryFields.add(QueryField.channel);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.pulseId);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.iocMillis);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.iocNanos);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.globalMillis);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.globalNanos);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.shape);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.eventCount);
+ cellProcessors.add(new NotNull());
+ queryFields.add(QueryField.value);
+ cellProcessors.add(new NotNull());
+ for (DAQQueryElement element : request) {
+ element.setFields(queryFields);
+ }
+
+ String content = mapper.writeValueAsString(request);
+ System.out.println(content);
+
+ MvcResult result = this.mockMvc
+ .perform(MockMvcRequestBuilders
+ .post(QueryRestController.QUERIES)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andReturn();
+
+ CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
+
+ String response = result.getResponse().getContentAsString();
+ System.out.println("Response: " + response);
+
+ ICsvMapReader mapReader =
+ new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
+ try {
+ // read comment
+ mapReader.read("");
+ String[] header = mapReader.getHeader(false);
+ Map<String, Object> customerMap;
+ long resultsPerChannel = 2;
+ long pulse = 0;
+ long channelCount = 1;
+ while ((customerMap = mapReader.read(header, processors)) != null) {
+ assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name()));
+ assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name()));
+ assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name()));
+ assertEquals("0", customerMap.get(QueryField.iocNanos.name()));
+ assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name()));
+ assertEquals("0", customerMap.get(QueryField.globalNanos.name()));
+ assertEquals("[1]", customerMap.get(QueryField.shape.name()));
+ assertEquals("1", customerMap.get(QueryField.eventCount.name()));
+ assertEquals("" + pulse, customerMap.get(QueryField.value.name()));
+
+ pulse = ++pulse % resultsPerChannel;
+ if (pulse == 0) {
+ ++channelCount;
+ if (channelCount <= TEST_CHANNEL_NAMES.length + 1) {
+ // read comment (empty rows are skipped)
+ mapReader.read("");
+ header = mapReader.getHeader(false);
+ }
+ }
+ }
+ } finally {
+ mapReader.close();
+ }
+ }
+
@Test
public void testPulseRangeQueryWaveform() throws Exception {
String channelName = "XYWaveform";
@@ -145,10 +248,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
new RequestRangePulseId(
0,
1),
- channelName);
- request.setCompression(Compression.NONE);
+ channelName);
request.setResponseFormat(ResponseFormat.CSV);
-
+
LinkedHashSet queryFields = new LinkedHashSet<>();
LinkedHashSet cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@@ -186,10 +288,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
String response = result.getResponse().getContentAsString();
+ System.out.println("Response: " + response);
+
ICsvMapReader mapReader =
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
try {
- final String[] header = mapReader.getHeader(true);
+ // read comment
+ mapReader.read("");
+ final String[] header = mapReader.getHeader(false);
Map customerMap;
long pulse = 0;
while ((customerMap = mapReader.read(header, processors)) != null) {
@@ -218,9 +324,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
0,
10),
TEST_CHANNEL_NAMES);
- request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
-
+
LinkedHashSet queryFields = new LinkedHashSet<>();
LinkedHashSet cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@@ -258,10 +363,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
String response = result.getResponse().getContentAsString();
- ICsvMapReader mapReader =
+ System.out.println("Response: " + response);
+
+ CsvMapReader mapReader =
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
try {
- final String[] header = mapReader.getHeader(true);
+ // read comment
+ mapReader.read("");
+ String[] header = mapReader.getHeader(false);
Map customerMap;
long resultsPerChannel = 2;
long pulse = 0;
@@ -280,6 +389,11 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
pulse = ++pulse % resultsPerChannel;
if (pulse == 0) {
++channelCount;
+ if (channelCount <= TEST_CHANNEL_NAMES.length) {
+ // read comment (empty rows are skipped)
+ mapReader.read("");
+ header = mapReader.getHeader(false);
+ }
}
}
} finally {
@@ -296,9 +410,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
startDate,
endDate),
TEST_CHANNEL_NAMES);
- request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
-
+
LinkedHashSet queryFields = new LinkedHashSet<>();
LinkedHashSet cellProcessors = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
@@ -336,10 +449,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
String response = result.getResponse().getContentAsString();
+ System.out.println("Response: " + response);
+
ICsvMapReader mapReader =
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
try {
- final String[] header = mapReader.getHeader(true);
+ // read comment
+ mapReader.read("");
+ String[] header = mapReader.getHeader(false);
Map customerMap;
long resultsPerChannel = 2;
long pulse = 0;
@@ -358,6 +475,11 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
pulse = ++pulse % resultsPerChannel;
if (pulse == 0) {
++channelCount;
+ if (channelCount <= TEST_CHANNEL_NAMES.length) {
+ // read comment (empty rows are skipped)
+ mapReader.read("");
+ header = mapReader.getHeader(false);
+ }
}
}
} finally {
@@ -371,11 +493,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
new RequestRangePulseId(
0,
1),
- false,
Ordering.asc,
AggregationType.extrema,
TEST_CHANNEL_NAMES[0]);
- request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
String content = mapper.writeValueAsString(request);
@@ -403,11 +523,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
new RequestRangePulseId(
0,
1),
- false,
Ordering.asc,
AggregationType.index,
TEST_CHANNEL_NAMES[0]);
- request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
String content = mapper.writeValueAsString(request);
@@ -441,7 +559,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setNrOfBins(2);
- request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet queryFields = new LinkedHashSet<>();
@@ -473,7 +590,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
cellProcessors.add(new NotNull());
request.setAggregations(aggregations);
-
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -489,10 +605,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
String response = result.getResponse().getContentAsString();
+ System.out.println("Response: " + response);
+
ICsvMapReader mapReader =
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
try {
- final String[] header = mapReader.getHeader(true);
+ // read comment
+ mapReader.read("");
+ String[] header = mapReader.getHeader(false);
Map customerMap;
long pulse = 0;
while ((customerMap = mapReader.read(header, processors)) != null) {
@@ -527,7 +647,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setBinSize(100);
- request.setCompression(Compression.NONE);
request.setResponseFormat(ResponseFormat.CSV);
LinkedHashSet queryFields = new LinkedHashSet<>();
@@ -575,10 +694,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
String response = result.getResponse().getContentAsString();
+ System.out.println("Response: " + response);
+
ICsvMapReader mapReader =
new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE);
try {
- final String[] header = mapReader.getHeader(true);
+ // read comment
+ mapReader.read("");
+ String[] header = mapReader.getHeader(false);
Map customerMap;
long pulse = 0;
while ((customerMap = mapReader.read(header, processors)) != null) {
@@ -600,7 +723,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
mapReader.close();
}
}
-
+
@Test
public void testGzipFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
@@ -623,16 +746,15 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz"));
}
-
+
@Test
- public void testJsonFileSuffixHeader() throws Exception {
+ public void testCsvFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
10,
11),
TEST_CHANNEL_NAMES);
request.setResponseFormat(ResponseFormat.CSV);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java
index 051fb3f..b7f1581 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java
@@ -15,9 +15,14 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId;
import ch.psi.daq.cassandra.request.range.RequestRangeTime;
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.ordering.Ordering;
+import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.query.model.AggregationType;
import ch.psi.daq.query.model.Compression;
+import ch.psi.daq.query.model.impl.DAQQueries;
import ch.psi.daq.query.model.impl.DAQQuery;
+import ch.psi.daq.query.model.impl.DAQQueryElement;
+import ch.psi.daq.query.request.ChannelName;
+import ch.psi.daq.query.request.ChannelsRequest;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.filter.CorsFilter;
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
@@ -47,17 +52,31 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
@Test
public void testChannelNameQuery() throws Exception {
- this.mockMvc.perform(
- MockMvcRequestBuilders
- .get(QueryRestController.CHANNELS)
- .contentType(MediaType.APPLICATION_JSON))
+ this.mockMvc
+ .perform(
+ MockMvcRequestBuilders
+ .get(QueryRestController.CHANNELS)
+ .contentType(MediaType.APPLICATION_JSON))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("BoolScalar"))
- .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("BoolWaveform"));
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("BoolScalar"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("BoolWaveform"))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").exists())
+// .andExpect(
+// MockMvcResultMatchers.jsonPath("$[1].channels[0]").value(DummyArchiverApplianceReader.TEST_CHANNEL_1))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[1]").exists())
+// .andExpect(
+// MockMvcResultMatchers.jsonPath("$[1].channels[1]").value(DummyArchiverApplianceReader.TEST_CHANNEL_2))
+ ;
}
@@ -71,14 +90,104 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("Int32Scalar"))
- .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("Int32Waveform"))
- .andExpect(MockMvcResultMatchers.jsonPath("$[2]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[2]").value("UInt32Scalar"))
- .andExpect(MockMvcResultMatchers.jsonPath("$[3]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[3]").value("UInt32Waveform"))
- .andExpect(MockMvcResultMatchers.jsonPath("$[4]").doesNotExist());
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("Int32Scalar"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("Int32Waveform"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("UInt32Scalar"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("UInt32Waveform"))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").doesNotExist())
+ ;
+ }
+
+ @Test
+ public void testChannelNameQueryBackendOrder() throws Exception {
+ ChannelsRequest request = new ChannelsRequest("int64", Ordering.desc, Backend.databuffer);
+
+ String content = mapper.writeValueAsString(request);
+ System.out.println(content);
+
+ this.mockMvc
+ .perform(MockMvcRequestBuilders
+ .post(QueryRestController.CHANNELS)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("UInt64Waveform"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("UInt64Scalar"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("Int64Waveform"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("Int64Scalar"))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1]").doesNotExist());
+ }
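
The descending order asserted here follows from a plain reverse string sort applied after the case-insensitive regex match (the lower-casing happens in DummyCassandraReader further down in this diff). A standalone sketch of that filter-and-sort step, with illustrative names only:

import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ChannelOrderingSketch {

    // filters channel names case-insensitively and sorts them in descending natural order
    public static List<String> filterDescending(Stream<String> channelNames, String regex) {
        Pattern pattern = Pattern.compile(regex.toLowerCase());
        return channelNames
                .filter(name -> pattern.matcher(name.toLowerCase()).find())
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [UInt64Waveform, UInt64Scalar, Int64Waveform, Int64Scalar],
        // matching the jsonPath expectations of the test above
        System.out.println(filterDescending(
                Stream.of("Int32Scalar", "Int64Scalar", "Int64Waveform", "UInt64Scalar", "UInt64Waveform"),
                "int64"));
    }
}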
+
+ @Test
+ public void testChannelNameQueryReload() throws Exception {
+ ChannelsRequest request = new ChannelsRequest();
+
+ String content = mapper.writeValueAsString(request);
+ System.out.println(content);
+
+ this.mockMvc
+ .perform(MockMvcRequestBuilders
+ .post(QueryRestController.CHANNELS)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[23]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").doesNotExist())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[2]").exists())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").doesNotExist())
+ ;
+
+ // each reload adds another channel
+ request.setReload(true);
+
+ content = mapper.writeValueAsString(request);
+ System.out.println(content);
+
+ this.mockMvc
+ .perform(MockMvcRequestBuilders
+ .post(QueryRestController.CHANNELS)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name()))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[25]").doesNotExist())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").exists())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[4]").doesNotExist())
+ ;
}
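
The reload test depends on the dummy readers appending a fresh channel on every read (see the AtomicLong counters added to both readers later in this diff): without reload the previously fetched names are served again, with reload the readers are queried anew and the extra channel becomes visible. A minimal sketch of such a reload-aware cache, assuming a simple supplier-backed design rather than the production code:

import java.util.List;
import java.util.function.Supplier;

public class ChannelNameCacheSketch {

    private final Supplier<List<String>> reader; // e.g. a DataReader's channel stream collected to a list
    private volatile List<String> cached;

    public ChannelNameCacheSketch(Supplier<List<String>> reader) {
        this.reader = reader;
    }

    public List<String> getChannels(boolean reload) {
        // reload forces a fresh read; otherwise the cached names are reused
        if (reload || cached == null) {
            cached = reader.get();
        }
        return cached;
    }
}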
@Test
@@ -166,7 +275,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
"Origin, Authorization, Accept, Content-Type"));
}
-
@Test
public void testPulseRangeQuery() throws Exception {
DAQQuery request = new DAQQuery(
@@ -174,7 +282,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
10,
11),
TEST_CHANNEL_NAMES);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -189,19 +296,110 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110));
}
+
+ @Test
+ public void testPulseRangeQueryBackends() throws Exception {
+ DAQQuery request = new DAQQuery(
+ new RequestRangePulseId(
+ 10,
+ 11),
+ new ChannelName(TEST_CHANNEL_01, Backend.databuffer),
+ new ChannelName(TEST_CHANNEL_02, Backend.archiverappliance));
+
+ String content = mapper.writeValueAsString(request);
+ System.out.println(content);
+
+ this.mockMvc
+ .perform(MockMvcRequestBuilders
+ .post(QueryRestController.QUERY)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").isMap())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11))
+// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110))
+ ;
+ }
+
+ @Test
+ public void testPulseRangeQueries() throws Exception {
+ String testChannel3 = "testChannel3";
+ DAQQueries request = new DAQQueries(
+ new DAQQueryElement(
+ new RequestRangePulseId(
+ 10,
+ 11),
+ TEST_CHANNEL_NAMES),
+ new DAQQueryElement(
+ new RequestRangePulseId(
+ 10,
+ 11),
+ testChannel3));
+
+ String content = mapper.writeValueAsString(request);
+ System.out.println(content);
+
+ this.mockMvc
+ .perform(MockMvcRequestBuilders
+ .post(QueryRestController.QUERIES)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(content))
+
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0]").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].channel.name").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[0].pulseId").value(10))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[0].globalMillis").value(100))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[1].pulseId").value(11))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[1].globalMillis").value(110))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][1]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].channel.name").value(TEST_CHANNEL_02))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[0].pulseId").value(10))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[0].globalMillis").value(100))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[1].pulseId").value(11))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[1].globalMillis").value(110))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1]").isArray())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1][0]").exists())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].channel.name").value(testChannel3))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[0].pulseId").value(10))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[0].globalMillis").value(100))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[1].pulseId").value(11))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[1].globalMillis").value(110));
+ }
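
The QUERIES endpoint returns a nested array: one outer entry per DAQQueryElement and, inside it, one entry per channel with its channel descriptor and data events, which is what the $[i][j] paths above address. A small client-side sketch (field names taken from the assertions, everything else illustrative) for walking that structure:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class QueriesResponseWalkSketch {

    public static void printPulseIds(String responseBody) throws Exception {
        JsonNode root = new ObjectMapper().readTree(responseBody);
        for (JsonNode queryResult : root) {              // one entry per DAQQueryElement
            for (JsonNode channelResult : queryResult) { // one entry per channel of that element
                String channel = channelResult.path("channel").path("name").asText();
                for (JsonNode event : channelResult.path("data")) {
                    System.out.println(channel + ": pulseId=" + event.path("pulseId").asLong()
                            + ", globalMillis=" + event.path("globalMillis").asLong());
                }
            }
        }
    }
}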
@Test
public void testTimeRangeQuery() throws Exception {
@@ -210,7 +408,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
100,
110),
TEST_CHANNEL_NAMES);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
@@ -223,14 +420,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").isMap())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
@@ -247,7 +446,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
startDate,
endDate),
TEST_CHANNEL_NAMES);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -263,14 +461,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value("databuffer"))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value("databuffer"))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
@@ -284,11 +484,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
new RequestRangePulseId(
100,
101),
- false,
Ordering.asc,
AggregationType.extrema,
TEST_CHANNEL_NAMES[0]);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
@@ -301,8 +499,8 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_NAMES[0]))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_NAMES[0]))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data.minima").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data.minima.min").exists())
@@ -332,7 +530,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
endDate),
TEST_CHANNEL_01);
request.setNrOfBins(2);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -348,7 +545,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.databuffer.name()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
@@ -360,17 +559,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
@Test
public void testDateRangeQueryBinSizeAggregate() throws Exception {
- long startTime = 1000;
- long endTime = 1999;
- String startDate = RequestRangeDate.format(startTime);
- String endDate = RequestRangeDate.format(endTime);
+ long startTime = 1000;
+ long endTime = 1999;
+ String startDate = RequestRangeDate.format(startTime);
+ String endDate = RequestRangeDate.format(endTime);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
TEST_CHANNEL_01);
request.setBinSize(100);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -386,7 +584,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
- .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
+ .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.databuffer.name()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(1000))
@@ -419,9 +619,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].globalMillis").value(1900))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10));
}
-
+
@Test
- public void testGzipCompression() throws Exception {
+ public void testGzipFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
10,
@@ -432,27 +632,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
String content = mapper.writeValueAsString(request);
System.out.println(content);
- this.mockMvc
- .perform(MockMvcRequestBuilders
- .post(QueryRestController.QUERY)
- .contentType(MediaType.APPLICATION_JSON)
- .content(content))
-
- .andDo(MockMvcResultHandlers.print())
- .andExpect(MockMvcResultMatchers.status().isOk());
- }
-
- @Test
- public void testGzipFileSuffixHeader() throws Exception {
- DAQQuery request = new DAQQuery(
- new RequestRangePulseId(
- 10,
- 11),
- TEST_CHANNEL_NAMES);
- request.setCompression(Compression.GZIP);
-
- String content = mapper.writeValueAsString(request);
-
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.QUERY)
@@ -463,7 +642,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz"));
}
-
+
@Test
public void testJsonFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(
@@ -471,7 +650,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
10,
11),
TEST_CHANNEL_NAMES);
- request.setCompression(Compression.NONE);
String content = mapper.writeValueAsString(request);
@@ -485,5 +663,4 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.json"));
}
-
}
diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java
similarity index 78%
rename from src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java
rename to src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java
index 4aee732..3a30f90 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java
@@ -2,6 +2,7 @@ package ch.psi.daq.test.queryrest.query;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@@ -11,20 +12,30 @@ import com.google.common.collect.Lists;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.cassandra.ChannelEvent;
+import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.reader.DataReader;
-public class DummyDataReader implements DataReader {
+public class DummyArchiverApplianceReader implements DataReader {
+ public static final String ARCHIVER_TEST_CHANNEL = "ArchiverTestChannel_";
- public static final String TEST_CHANNEL_1 = "testChannel1";
- public static final String TEST_CHANNEL_2 = "testChannel2";
- public static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
+ public static final String TEST_CHANNEL_1 = "ArchiverChannel_1";
+ public static final String TEST_CHANNEL_2 = "ArchiverChannel_2";
+ private List<String> channels = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
private final Random random = new Random(0);
+ private AtomicLong channelNameCallCounter = new AtomicLong();
+
+
+ @Override
+ public Backend getBackend() {
+ return Backend.archiverappliance;
+ }
@Override
public Stream<String> getChannelStream(String regex) {
- Stream<String> channelStream = TEST_CHANNEL_NAMES.stream();
+ channels.add(ARCHIVER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet());
+ Stream<String> channelStream = channels.stream();
if (regex != null) {
Pattern pattern = Pattern.compile(regex);
channelStream = channelStream.filter(channel -> pattern.matcher(channel).find());
diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java
index 769e035..8d033c6 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java
@@ -7,6 +7,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -29,23 +30,25 @@ import ch.psi.daq.domain.cassandra.query.TimeRangeQuery;
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent;
import ch.psi.daq.domain.cassandra.querying.EventQuery;
+import ch.psi.daq.domain.reader.Backend;
public class DummyCassandraReader implements CassandraReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
+ public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_";
private static final int KEYSPACE = 1;
private CassandraDataGen dataGen;
- private String[] channels;
+ private List<String> channels;
private Random random = new Random(0);
+ private AtomicLong channelNameCallCounter = new AtomicLong();
/**
*
*/
public DummyCassandraReader() {
this.dataGen = new CassandraDataGen();
- this.channels = new String[] {
- "testChannel1",
- "testChannel2",
+
+ this.channels = Lists.newArrayList(
"BoolScalar",
"BoolWaveform",
"Int8Scalar",
@@ -68,7 +71,12 @@ public class DummyCassandraReader implements CassandraReader {
"Float32Waveform",
"Float64Scalar",
"Float64Waveform",
- "StringScalar"};
+ "StringScalar");
+ }
+
+ @Override
+ public Backend getBackend() {
+ return Backend.databuffer;
}
/**
@@ -76,10 +84,12 @@ public class DummyCassandraReader implements CassandraReader {
*/
@Override
public Stream<String> getChannelStream(String regex) {
- Stream<String> channelStream = Stream.of(channels);
+ channels.add(DATABUFFER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet());
+
+ Stream<String> channelStream = channels.stream();
if (regex != null) {
Pattern pattern = Pattern.compile(regex.toLowerCase());
- channelStream = Stream.of(channels).filter(channel -> pattern.matcher(channel.toLowerCase()).find());
+ channelStream = channelStream.filter(channel -> pattern.matcher(channel.toLowerCase()).find());
}
return channelStream;
}