diff --git a/Readme.md b/Readme.md index f5557b3..bb4a15c 100644 --- a/Readme.md +++ b/Readme.md @@ -60,21 +60,47 @@ POST http://:/channels #### Data ```json -{"regex": "TRFCA|TRFCB", "dbMode": "databuffer"} +{"regex": "TRFCA|TRFCB","backends": ["databuffer"],"ordering":"asc","reload":true} ``` ##### Explanation -- **regex**: Reqular expression used to filter channel names (default: no filtering). Filtering is done using JAVA's [Pattern](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html), more precisely [Matcher.find()](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Matcher.html#find--)). -- **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance) - +- **regex**: Reqular expression used to filter channel names. In case this value is undefined, no filter will be applied. Filtering is done using JAVA's [Pattern](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html), more precisely [Matcher.find()](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Matcher.html#find--)). +- **backends**: Array of backends to access (values: databuffer|archiverappliance|filestorage). In case this value is undefined, all backends will be queried for their channels. +- **ordering**: The ordering of the channel names (values: **none**|asc|desc). +- **reload**: Forces the server to reload cached channel names (values: **false**|true). ### Example +#### Command + ```bash curl -H "Content-Type: application/json" -X POST -d '{"regex": "TRFCA|TRFCB"}' http://data-api.psi.ch/sf/channels ``` +#### Response + +```json +[ + { + "backend":"databuffer", + "channels":[ + "Channel_01", + "Channel_02", + "Channel_03" + ] + }, + { + "backend":"archiverappliance", + "channels":[ + "Channel_01", + "Channel_04", + "Channel_05" + ] + } +] +``` + ## Query Range @@ -127,6 +153,36 @@ Queries are applied to a range. The following types of ranges ranges are support - **endMillis**: The end time of the range. - **[endNanos]**: The optional nanosecond offset. + + +## Query Channel Names + +The simplest way to define channels is to use an array of channel name Strings. + +```json +"channels":[ + "Channel_02", + "Channel_04" +] +``` + +The query interface will automatically select the backend which contains the channel (e.g., *databuffer* for *Channel_02* and *archiverappliance* for *Channel_04*. In case name clashes exist, the query interface will use following order of priority: *databuffer*, *archiverappliance* and *filestorage*. + +It is also possible to explicitly define the backend to overcome name clashes. + +```json +"channels":[ + { + "name":"Channel_01", + "backend":"archiverappliance" + }, + { + "name":"Channel_01", + "backend":"databuffer" + } +] +``` + @@ -144,7 +200,7 @@ A request is performed by sending a valid JSON object in the HTTP request body. The following attributes can be specified: -- **channels**: Array of channel names to be queried. +- **channels**: Array of channels to be queried (see [Query Range](Readme.md#query_channel_names)). - **range**: The range of the query (see [Query Range](Readme.md#query_range)). - **ordering**: The ordering of the data (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.common/blob/master/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values). - **fields**: The requested fields (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values). @@ -152,8 +208,6 @@ The following attributes can be specified: - **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using number of pulses and number of milliseconds makes this binning strategy consistent between channel with different update frequencies). - **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response. - **aggregationType**: Specifies the type of aggregation (see [here](https://git.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]). -- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**) -- **dbMode**: Defines the database to access (values: **databuffer**|archiverappliance) - **compression**: Defines the compression algorithm to use, default value is **none**, see all values [here](https://github.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/Compression.java)) - **responseFormat**: Specifies the format the response of the requested data is in, either in JSON or CSV format, default value **JSON**, see all values [here](https://github.psi.ch/sf_daq/ch.psi.daq.query/blob/master/src/main/java/ch/psi/daq/query/model/ResponseFormat.java)) @@ -165,7 +219,7 @@ If compression is enabled, we have to tell `curl` that the data is compressed so ### `responseFormat`: data is in JSON by default -Responses can be formatted as CSV or JSON using the `responseFormat` field. The returned data is JSON-formatted per default. +Responses can be formatted as CSV or JSON using the `responseFormat` field. The returned data is JSON-formatted by default. CSV export does not support `index` and `extrema` aggregations. @@ -319,7 +373,7 @@ See JSON representation of the data above. ```json { - "dbMode":"archiverappliance", + "backend":"archiverappliance", "range":{ "startMillis":0, "startNanos":0, diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 6857fec..511623b 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -27,9 +27,7 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; import ch.psi.daq.common.statistic.StorelessStatistics; import ch.psi.daq.domain.DataEvent; @@ -44,8 +42,6 @@ import ch.psi.daq.queryrest.filter.CorsFilter; import ch.psi.daq.queryrest.model.PropertyFilterMixin; import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter; -import ch.psi.daq.queryrest.response.json.JsonByteArraySerializer; -import ch.psi.daq.queryrest.response.json.JsonStreamSerializer; @Configuration @PropertySource(value = {"classpath:queryrest.properties"}) @@ -88,16 +84,6 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { objectMapper.addMixIn(DataEvent.class, PropertyFilterMixin.class); objectMapper.addMixIn(StorelessStatistics.class, PropertyFilterMixin.class); objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class); - - // defines how to writer inner Streams (i.e. Stream>> toSerialize) - SimpleModule module = new SimpleModule("Streams API", Version.unknownVersion()); - module.addSerializer(new JsonStreamSerializer()); - objectMapper.registerModule(module); - - // by default, byte[] are written as String - module = new SimpleModule("byte[]", Version.unknownVersion()); - module.addSerializer(new JsonByteArraySerializer()); - objectMapper.registerModule(module); } /** diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index db58502..a2df91e 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -1,17 +1,24 @@ package ch.psi.daq.queryrest.controller; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; import javax.validation.Valid; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -27,28 +34,34 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; + +import ch.psi.daq.cassandra.config.CassandraConfig; import ch.psi.daq.cassandra.request.validate.RequestProviderValidator; -import ch.psi.daq.common.concurrent.singleton.Deferred; import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.AggregationType; -import ch.psi.daq.query.model.DBMode; import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.ResponseFormat; +import ch.psi.daq.query.model.impl.BackendQuery; +import ch.psi.daq.query.model.impl.DAQQueries; import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.query.model.impl.DAQQueryElement; +import ch.psi.daq.query.processor.ChannelNameCache; import ch.psi.daq.query.processor.QueryProcessor; +import ch.psi.daq.query.request.ChannelName; import ch.psi.daq.query.request.ChannelsRequest; +import ch.psi.daq.query.request.ChannelsResponse; import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; - @RestController public class QueryRestController { @@ -56,6 +69,7 @@ public class QueryRestController { public static final String CHANNELS = "/channels"; public static final String QUERY = "/query"; + public static final String QUERIES = "/queries"; @Resource private Validator queryValidator; @@ -69,20 +83,44 @@ public class QueryRestController { @Resource private ApplicationContext appContext; - + @Resource private ObjectMapper objectMapper; - // using Deferred ensures that data-api startup succeeds even if DataBuffer/ArchiverAppliance is - // not reachable at startup - private Deferred cassandraQueryProcessor = new Deferred( - () -> appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class)); - private Deferred archiverApplianceQueryProcessor = new Deferred( - () -> appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class)); - @Resource private Function queryAnalizerFactory; + private Map queryProcessors = new LinkedHashMap<>(); + private ChannelNameCache channelNameCache; + + @PostConstruct + public void afterPropertiesSet() { + try { + QueryProcessor queryProcessor = + appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class); + queryProcessors.put(queryProcessor.getBackend(), queryProcessor); + } catch (Exception e) { + LOGGER.warn("Could not load query processor for cassandra.", e); + } + + try { + QueryProcessor queryProcessor = + appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class); + queryProcessors.put(queryProcessor.getBackend(), queryProcessor); + } catch (Exception e) { + LOGGER.warn("Could not load query processor for archiverappliance.", e); + } + + channelNameCache = + new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT, + Integer.class).longValue()); + } + + @PreDestroy + public void destroy() { + channelNameCache.destroy(); + } + @InitBinder protected void initBinder(WebDataBinder binder) { if (binder.getTarget() != null) { @@ -97,21 +135,14 @@ public class QueryRestController { @RequestMapping(value = CHANNELS, method = {RequestMethod.GET, RequestMethod.POST}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody Collection getChannels(@RequestBody(required = false) ChannelsRequest request) + public @ResponseBody List getChannels(@RequestBody(required = false) ChannelsRequest request) throws Throwable { - // in case not specified use default (e.g. GET) + // in case not specified use defaults (e.g. GET) if (request == null) { request = new ChannelsRequest(); } - try { - // sorted collection - Collection allChannels = getQueryProcessor(request.getDbMode()).getChannels(request.getRegex()); - return allChannels; - } catch (Throwable t) { - LOGGER.error("Failed to query channel names.", t); - throw t; - } + return channelNameCache.getChannels(request); } /** @@ -123,46 +154,11 @@ public class QueryRestController { */ @RequestMapping(value = CHANNELS + "/{channelName}", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody Collection getChannels(@PathVariable(value = "channelName") String channelName) + public @ResponseBody Collection getChannels(@PathVariable(value = "channelName") String channelName) throws Throwable { return getChannels(new ChannelsRequest(channelName)); } - - /** - * Catch-all query method for getting data from the backend for both JSON and CSV requests. - *

- * The {@link DAQQuery} object will be a concrete subclass based on the combination of fields - * defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to - * deserialize the information into and has been configured (see - * QueryRestConfig#afterPropertiesSet) accordingly. - * - * @param query concrete implementation of {@link DAQQuery} - * @param res the {@link HttpServletResponse} instance associated with this request - * @throws IOException thrown if writing to the output stream fails - */ - @RequestMapping( - value = QUERY, - method = RequestMethod.POST, - consumes = {MediaType.APPLICATION_JSON_VALUE}) - public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { - try { - LOGGER.debug("Executing query '{}'", query.toString()); - - if (ResponseFormat.JSON.equals(query.getResponseFormat())) { - // write the response back to the client using java 8 streams - jsonResponseStreamWriter.respond(executeQuery(query), query, res); - } else { - // it's a CSV request - executeQueryCsv(query, res); - } - - } catch (Exception e) { - LOGGER.error("Failed to execute query '{}'.", query, e); - throw e; - } - } - /** * Accepts the properties for the {@link DAQQuery} instance as stringified JSON query string. * @@ -176,59 +172,154 @@ public class QueryRestController { value = QUERY, method = RequestMethod.GET) public void executeQueryBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception { - DAQQuery query = objectMapper.readValue(jsonBody, DAQQuery.class); executeQuery(query, res); - + } + + /** + * Executes a single query. + * + * @param query the {@link DAQQuery} + * @param res the {@link HttpServletResponse} instance associated with this request + * @throws IOException thrown if writing to the output stream fails + */ + @RequestMapping( + value = QUERY, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}) + public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { + executeQueries(new DAQQueries(query), res); + } + + /** + * Accepts the properties for the {@link DAQQueries} instance as stringified JSON query string. + * + * @param jsonBody The {@link DAQQueries} properties sent as a JSON string, i.e. this is the + * stringified body of the POST request method + * @param res the current {@link HttpServletResponse} instance + * @throws Exception if reading the JSON string fails or if the subsequent call to + * {@link #executeQuery(DAQQueries, HttpServletResponse)} fails + */ + @RequestMapping( + value = QUERIES, + method = RequestMethod.GET) + public void executeQueriesBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception { + DAQQueries queries = objectMapper.readValue(jsonBody, DAQQueries.class); + executeQueries(queries, res); + } + + /** + * Catch-all query method for getting data from the backend for both JSON and CSV requests. + *

+ * The {@link DAQQueries} object will contain the concrete subclass based on the combination of + * fields defined in the user's query. The {@link AttributeBasedDeserializer} decides which class + * to deserialize the information into and has been configured (see + * QueryRestConfig#afterPropertiesSet) accordingly. + * + * @param queries the {@link DAQQueryElement}s + * @param res the {@link HttpServletResponse} instance associated with this request + * @throws IOException thrown if writing to the output stream fails + */ + @RequestMapping( + value = QUERIES, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}) + public void executeQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception { + try { + LOGGER.debug("Executing queries '{}'", queries); + + if (ResponseFormat.JSON.equals(queries.getResponseFormat())) { + // write the response back to the client using java 8 streams + jsonResponseStreamWriter.respond(executeQueries(queries), queries, res); + } else if (ResponseFormat.CSV.equals(queries.getResponseFormat())) { + // it's a CSV request + executeQueriesCsv(queries, res); + } else { + String message = String.format("Unsupported response format '%s'", queries.getResponseFormat().name()); + LOGGER.error(message); + throw new RuntimeException(message); + } + + } catch (Exception e) { + LOGGER.error("Failed to execute query '{}'.", queries, e); + throw e; + } } /** * Returns data in CSV format to the client. */ - private void executeQueryCsv(DAQQuery query, HttpServletResponse res) throws Exception { - - if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) { - // We allow only no aggregation or value aggregation as - // extrema: nested structure and not clear how to map it to one line - // index: value is an array of Statistics whose size is not clear at initialization time - String message = "CSV export does not support '" + query.getAggregationType() + "'"; - LOGGER.warn(message); - throw new IllegalArgumentException(message); + private void executeQueriesCsv(DAQQueries queries, HttpServletResponse res) throws Exception { + + for (DAQQueryElement query : queries) { + if (!(query.getAggregationType() == null || AggregationType.value.equals(query.getAggregationType()))) { + // We allow only no aggregation or value aggregation as + // extrema: nested structure and not clear how to map it to one line + // index: value is an array of Statistics whose size is not clear at initialization time + String message = "CSV export does not support '" + query.getAggregationType() + "'"; + LOGGER.warn(message); + throw new IllegalArgumentException(message); + } } - + try { - LOGGER.debug("Executing query '{}'", query.toString()); + LOGGER.debug("Executing query '{}'", queries); // write the response back to the client using java 8 streams - csvResponseStreamWriter.respond(executeQuery(query), query, res); + csvResponseStreamWriter.respond(executeQueries(queries), queries, res); } catch (Exception e) { - LOGGER.error("Failed to execute query '{}'.", query, e); + LOGGER.error("Failed to execute query '{}'.", queries, e); throw e; } - + } - public Stream> executeQuery(DAQQuery query) { - QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); + public List>>> executeQueries(DAQQueries queries) { + // set backends if not defined yet + channelNameCache.setBackends(queries); - // all the magic happens here - Stream>> channelToDataEvents = - getQueryProcessor(query.getDbMode()).process(queryAnalizer); + List>>> results = + new ArrayList<>(queries.getQueries().size()); - // do post-process - Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); + for (DAQQueryElement queryElement : queries) { + Stream> resultStreams = + queryElement + .getBackendQueries() + .stream() + .filter(query -> { + QueryProcessor processor = queryProcessors.get(query.getBackend()); + if (processor != null) { + return true; + } else { + LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend()); + return false; + } + }) + .flatMap(query -> { + QueryProcessor processor = queryProcessors.get(query.getBackend()); + QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); - return channelToData; - } + // all the magic happens here + Stream>> channelToDataEvents = + processor.process(queryAnalizer); + // do post-process + Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); - private QueryProcessor getQueryProcessor(DBMode dbMode) { - if (DBMode.databuffer.equals(dbMode)) { - return cassandraQueryProcessor.get(); - } else if (DBMode.archiverappliance.equals(dbMode)) { - return archiverApplianceQueryProcessor.get(); - } else { - LOGGER.error("Unknown DBMode '{}'!", dbMode); - throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode)); + return channelToData.map(entry -> { + return Triple.of(query, new ChannelName(entry.getKey(), query.getBackend()), + entry.getValue()); + }); + }); + + // Now we have a stream that loads elements sequential BackendQuery by BackendQuery. + // By materializing the outer Stream the elements of all BackendQuery are loaded async + // (speeds things up but requires also more memory - i.e. it relies on Backends not loading + // all elements into memory at once) + resultStreams = resultStreams.collect(Collectors.toList()).stream(); + + results.add(Pair.of(queryElement, resultStreams)); } + + return results; } /** @@ -237,12 +328,8 @@ public class QueryRestController { * @return list of {@link Ordering}s as String array */ @RequestMapping(value = "ordering", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody List getOrderingValues() { - List orderings = Lists.newArrayList(Ordering.values()); - return orderings.stream() - .map((Ordering ord) -> { - return ord.toString(); - }).collect(Collectors.toList()); + public @ResponseBody List getOrderingValues() { + return Lists.newArrayList(Ordering.values()); } /** @@ -251,12 +338,8 @@ public class QueryRestController { * @return list of {@link QueryField}s as String array */ @RequestMapping(value = "queryfields", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody List getQueryFieldValues() { - List orderings = Lists.newArrayList(QueryField.values()); - return orderings.stream() - .map((QueryField qf) -> { - return qf.toString(); - }).collect(Collectors.toList()); + public @ResponseBody List getQueryFieldValues() { + return Lists.newArrayList(QueryField.values()); } /** @@ -265,12 +348,8 @@ public class QueryRestController { * @return list of {@link Aggregation}s as String array */ @RequestMapping(value = "aggregations", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody List getAggregationsValues() { - List orderings = Lists.newArrayList(Aggregation.values()); - return orderings.stream() - .map((Aggregation value) -> { - return value.toString(); - }).collect(Collectors.toList()); + public @ResponseBody List getAggregationsValues() { + return Lists.newArrayList(Aggregation.values()); } /** @@ -280,25 +359,17 @@ public class QueryRestController { */ @RequestMapping(value = "aggregationtypes", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody List getAggregationTypesValues() { - List orderings = Lists.newArrayList(AggregationType.values()); - return orderings.stream() - .map((AggregationType value) -> { - return value.toString(); - }).collect(Collectors.toList()); + public @ResponseBody List getAggregationTypesValues() { + return Lists.newArrayList(AggregationType.values()); } /** - * Returns the current list of {@link DBMode} values. + * Returns the current list of {@link Backend} values. * - * @return list of {@link DBMode}s as String array + * @return list of {@link Backend}s as String array */ - @RequestMapping(value = "dbmodes", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody List getDBModeValues() { - List orderings = Lists.newArrayList(DBMode.values()); - return orderings.stream() - .map((DBMode value) -> { - return value.toString(); - }).collect(Collectors.toList()); + @RequestMapping(value = "backends", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody List getDBModeValues() { + return Lists.newArrayList(Backend.values()); } } diff --git a/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java b/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java index e1c0767..5d433a4 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/validator/QueryValidator.java @@ -1,7 +1,7 @@ package ch.psi.daq.queryrest.controller.validator; +import java.util.ArrayList; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.Set; import javax.annotation.Resource; @@ -10,10 +10,10 @@ import org.springframework.validation.Errors; import org.springframework.validation.Validator; import ch.psi.daq.query.model.Aggregation; -import ch.psi.daq.query.model.DBMode; import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.impl.DAQQueries; import ch.psi.daq.query.model.impl.DAQQuery; -import ch.psi.daq.cassandra.request.Request; +import ch.psi.daq.query.model.impl.DAQQueryElement; import ch.psi.daq.queryrest.config.QueryRestConfig; public class QueryValidator implements Validator { @@ -28,8 +28,8 @@ public class QueryValidator implements Validator { * {@inheritDoc} */ @Override - public boolean supports(Class clazz) { - return DAQQuery.class.isAssignableFrom(clazz); + public boolean supports(Class clazz) { + return DAQQuery.class.isAssignableFrom(clazz) || DAQQueries.class.isAssignableFrom(clazz); } /** @@ -37,25 +37,24 @@ public class QueryValidator implements Validator { */ @Override public void validate(Object target, Errors errors) { - - DAQQuery query = (DAQQuery) target; - - Request request = query.getRequest(); - - if (DBMode.archiverappliance.equals(query.getDbMode())) { - if (!request.getRequestRange().isTimeRangeDefined()) { - errors.reject("dbMode", "ArchiverAppliance supports time range queries only!"); + if (target instanceof DAQQuery) { + this.checkElement((DAQQuery) target); + }else if(target instanceof DAQQueries){ + DAQQueries queries = (DAQQueries) target; + for (DAQQueryElement daqQueryElement : queries) { + this.checkElement(daqQueryElement); } } + } + private void checkElement(DAQQueryElement query) { // set default values (if not set) if (query.getFields() == null || query.getFields().isEmpty()) { query.setFields(new LinkedHashSet<>(defaultResponseFields)); } if (query.getAggregations() == null || query.getAggregations().isEmpty()) { - query.setAggregations(new LinkedList<>(defaultResponseAggregations)); + query.setAggregations(new ArrayList<>(defaultResponseAggregations)); } } - } diff --git a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java index 4e45abb..6108127 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/AbstractResponseStreamWriter.java @@ -10,13 +10,8 @@ import javax.servlet.http.HttpServletResponse; import org.springframework.http.MediaType; import ch.psi.daq.query.model.ResponseFormat; -import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.query.model.ResponseOptions; - -/** - * @author zellweger_c - * - */ public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter { public static final String CONTENT_TYPE_CSV = "text/csv"; @@ -32,24 +27,24 @@ public abstract class AbstractResponseStreamWriter implements ResponseStreamWrit * see http://tools.ietf.org/html/rfc2616#section-14.11 and * see http://tools.ietf.org/html/rfc2616#section-3.5 * - * @param query The query + * @param options The options for the response * @param response The HttpServletResponse * @param contentType The content type * @return OutputStream The OutputStream * @throws Exception Something goes wrong */ - protected OutputStream handleCompressionAndResponseHeaders(DAQQuery query, HttpServletResponse response, + protected OutputStream handleCompressionAndResponseHeaders(ResponseOptions options, HttpServletResponse response, String contentType) throws Exception { OutputStream out = response.getOutputStream(); response.addHeader("Content-Type", contentType); - if (query.isCompressed()) { - String filename = "data." + query.getCompression().getFileSuffix(); + if (options.isCompressed()) { + String filename = "data." + options.getCompression().getFileSuffix(); response.addHeader("Content-Disposition", "attachment; filename=" + filename); - response.addHeader("Content-Encoding", query.getCompression().toString()); - out = query.getCompression().wrapStream(out); + response.addHeader("Content-Encoding", options.getCompression().toString()); + out = options.getCompression().wrapStream(out); } else { - String filename = "data." + (query.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json"); + String filename = "data." + (options.getResponseFormat() == ResponseFormat.CSV ? "csv" : "json"); response.addHeader("Content-Disposition", "attachment; filename=" + filename); } diff --git a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java index 53d6be3..56c16ac 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/ResponseStreamWriter.java @@ -1,13 +1,18 @@ package ch.psi.daq.queryrest.response; -import java.io.IOException; +import java.util.List; import java.util.Map.Entry; import java.util.stream.Stream; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletResponse; -import ch.psi.daq.query.model.impl.DAQQuery; +import org.apache.commons.lang3.tuple.Triple; + +import ch.psi.daq.query.model.ResponseOptions; +import ch.psi.daq.query.model.impl.BackendQuery; +import ch.psi.daq.query.model.impl.DAQQueryElement; +import ch.psi.daq.query.request.ChannelName; public interface ResponseStreamWriter { @@ -15,10 +20,11 @@ public interface ResponseStreamWriter { * Responding with the the contents of the stream by writing into the output stream of the * {@link ServletResponse}. * - * @param stream Mapping from channel name to data - * @param query concrete instance of {@link DAQQuery} + * @param stream The results results + * @param options The options for the response * @param response {@link ServletResponse} instance given by the current HTTP request - * @throws IOException thrown if writing to the output stream fails + * @throws Exception thrown if writing to the output stream fails */ - public void respond(Stream> stream, DAQQuery query, HttpServletResponse response) throws Exception; + public void respond(List>>> results, ResponseOptions options, + HttpServletResponse response) throws Exception; } diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java index 7d762b4..aa764eb 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java @@ -16,6 +16,7 @@ import javax.annotation.Resource; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.supercsv.cellprocessor.constraint.NotNull; @@ -24,16 +25,19 @@ import org.supercsv.io.dozer.CsvDozerBeanWriter; import org.supercsv.io.dozer.ICsvDozerBeanWriter; import org.supercsv.prefs.CsvPreference; +import com.fasterxml.jackson.core.JsonEncoding; + import ch.psi.daq.domain.DataEvent; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; -import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.query.model.ResponseOptions; +import ch.psi.daq.query.model.impl.BackendQuery; +import ch.psi.daq.query.model.impl.DAQQueryElement; +import ch.psi.daq.query.request.ChannelName; import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; -import com.fasterxml.jackson.core.JsonEncoding; - /** * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse} * of the current request. @@ -44,53 +48,103 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter { private static final char DELIMITER_ARRAY = ','; private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class); - + @Resource private Function queryAnalizerFactory; @Override - public void respond(Stream> stream, DAQQuery query, HttpServletResponse response) throws Exception { + public void respond(List>>> results, + ResponseOptions options, + HttpServletResponse response) throws Exception { response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName()); response.setContentType(CONTENT_TYPE_CSV); - respondInternal(stream, query, response); + respondInternal(results, options, response); } - private void respondInternal(Stream> stream, DAQQuery query, HttpServletResponse response) - throws Exception { + private void respondInternal(List>>> results, + ResponseOptions options, HttpServletResponse response) throws Exception { + AtomicReference exception = new AtomicReference<>(); + OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_CSV); + List beanWriters = new ArrayList<>(); - Set queryFields = query.getFields(); - List aggregations = query.getAggregations(); - int aggregationsSize = (aggregations != null ? aggregations.size() : 0); + results.forEach(entry -> { + DAQQueryElement query = entry.getKey(); - Set fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize); - List header = new ArrayList<>(queryFields.size() + aggregationsSize); + Set queryFields = query.getFields(); + List aggregations = query.getAggregations(); + int aggregationsSize = (aggregations != null ? aggregations.size() : 0); - CellProcessor[] processors = setupCellProcessors(query, fieldMapping, header); - - OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_CSV); - ICsvDozerBeanWriter beanWriter = setupBeanWriter(fieldMapping, out); + Set fieldMapping = new LinkedHashSet<>(queryFields.size() + aggregationsSize); + List header = new ArrayList<>(queryFields.size() + aggregationsSize); - writeToOutput(stream, header, processors, beanWriter); - + AtomicReference processorsRef = new AtomicReference<>(); + + entry.getValue() + .sequential() + .forEach(triple -> { + try { + CellProcessor[] processors = processorsRef.get(); + ICsvDozerBeanWriter beanWriter; + + if (processors == null) { + processors = setupCellProcessors(query, triple.getLeft(), fieldMapping, header); + processorsRef.set(processors); + + beanWriter = setupBeanWriter(fieldMapping, out); + beanWriters.add(beanWriter); + } else { + beanWriter = beanWriters.get(beanWriters.size() - 1); + } + + writeToOutput(triple, processors, beanWriter, header); + } catch (Exception e) { + LOGGER.warn("Could not write CSV of '{}'", triple.getMiddle(), e); + exception.compareAndSet(null, e); + } + }); + + if (!beanWriters.isEmpty()) { + try { + beanWriters.get(beanWriters.size() - 1).flush(); + } catch (Exception e) { + LOGGER.error("Could not flush ICsvDozerBeanWriter.", e); + exception.compareAndSet(null, e); + } + } + }); + + for (ICsvDozerBeanWriter beanWriter : beanWriters) { + try { + beanWriter.close(); + } catch (Exception e) { + LOGGER.error("Could not close ICsvDozerBeanWriter.", e); + exception.compareAndSet(null, e); + } + } + + if (exception.get() != null) { + throw exception.get(); + } } /** - * Sets up the bean writer instance. - * @throws Exception - * - */ - private ICsvDozerBeanWriter setupBeanWriter(Set fieldMapping, OutputStream out) throws Exception { - CsvPreference preference = new CsvPreference.Builder( - (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(), - DELIMITER_CVS, - CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build(); - - ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference); - // configure the mapping from the fields to the CSV columns - beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()])); - return beanWriter; - } + * Sets up the bean writer instance. + * + * @throws Exception + * + */ + private ICsvDozerBeanWriter setupBeanWriter(Set fieldMapping, OutputStream out) throws Exception { + CsvPreference preference = new CsvPreference.Builder( + (char) CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getQuoteChar(), + DELIMITER_CVS, + CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE.getEndOfLineSymbols()).build(); + + ICsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(new OutputStreamWriter(out), preference); + // configure the mapping from the fields to the CSV columns + beanWriter.configureBeanMapping(DataEvent.class, fieldMapping.toArray(new String[fieldMapping.size()])); + return beanWriter; + } /** * Sets up the array of {@link CellProcessor}s needed for later configuration of the bean writer. @@ -101,34 +155,36 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter { * with other processors to fully automate all of the required conversions and constraint * validation for a single CSV column. * - * @param query The current {@link DAQQuery} + * @param daqQuery The current {@link DAQQueryElement} + * @param backendQuery One BackendQuery of the current {@link DAQQueryElement} * @return Array of {@link CellProcessor} entries */ - private CellProcessor[] setupCellProcessors(DAQQuery query, Set fieldMapping, List header) { - Set queryFields = query.getFields(); - List aggregations = query.getAggregations(); - + private CellProcessor[] setupCellProcessors(DAQQueryElement daqQuery, BackendQuery backendQuery, + Set fieldMapping, List header) { + Set queryFields = daqQuery.getFields(); + List aggregations = daqQuery.getAggregations(); + List processorSet = new ArrayList<>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)); - + boolean isNewField; - QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(query); - + QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery); + for (QueryField field : queryFields) { - if(!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())){ + if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) { isNewField = fieldMapping.add(field.name()); - + if (isNewField) { header.add(field.name()); processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); } } } - + if (aggregations != null && queryAnalyzer.isAggregationEnabled()) { - for (Aggregation aggregation : query.getAggregations()) { + for (Aggregation aggregation : daqQuery.getAggregations()) { isNewField = fieldMapping.add("value." + aggregation.name()); - + if (isNewField) { header.add(aggregation.name()); processorSet.add(new ArrayProcessor(new NotNull(), DELIMITER_ARRAY)); @@ -139,44 +195,27 @@ public class CSVResponseStreamWriter extends AbstractResponseStreamWriter { } @SuppressWarnings("unchecked") - private void writeToOutput(Stream> stream, List header, CellProcessor[] processors, - ICsvDozerBeanWriter beanWriter) throws IOException, Exception { - AtomicReference exception = new AtomicReference<>(); - try { - + private void writeToOutput(Triple triple, CellProcessor[] processors, + ICsvDozerBeanWriter beanWriter, List header) throws IOException { + if (triple.getRight() instanceof Stream) { + beanWriter.writeComment(""); + beanWriter.writeComment("Start of " + triple.getMiddle()); beanWriter.writeHeader(header.toArray(new String[header.size()])); - stream - /* ensure elements are sequentially written */ - .sequential() + Stream eventStream = (Stream) triple.getRight(); + eventStream .forEach( - entry -> { - if (entry.getValue() instanceof Stream) { - Stream eventStream = (Stream) entry.getValue(); - eventStream - .forEach( - event -> { - try { - beanWriter.write(event, processors); - } catch (Exception e) { - LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(), - event.getPulseId(), e); - exception.compareAndSet(null, e); - } - }); - } else { - String message = "Type '" + entry.getValue().getClass() + "' not supported."; - LOGGER.error(message); - exception.compareAndSet(null, new RuntimeException(message)); - } - } - ); - } finally { - beanWriter.close(); - } - - if (exception.get() != null) { - throw exception.get(); + event -> { + try { + beanWriter.write(event, processors); + } catch (Exception e) { + LOGGER.error("Could not write elements for '{}-{}'", event.getChannel(), + event.getPulseId(), e); + } + }); + } else { + String message = "Type '" + triple.getRight().getClass() + "' not supported."; + LOGGER.error(message); } } diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java index 850c73a..62daa94 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/json/JSONResponseStreamWriter.java @@ -1,6 +1,5 @@ package ch.psi.daq.queryrest.response.json; -import java.io.IOException; import java.io.OutputStream; import java.util.LinkedHashSet; import java.util.List; @@ -13,15 +12,11 @@ import javax.annotation.Resource; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; -import ch.psi.daq.query.model.Aggregation; -import ch.psi.daq.query.model.QueryField; -import ch.psi.daq.query.model.impl.DAQQuery; -import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; - import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -30,6 +25,14 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; +import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.QueryField; +import ch.psi.daq.query.model.ResponseOptions; +import ch.psi.daq.query.model.impl.BackendQuery; +import ch.psi.daq.query.model.impl.DAQQueryElement; +import ch.psi.daq.query.request.ChannelName; +import ch.psi.daq.queryrest.response.AbstractResponseStreamWriter; + /** * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse} * of the current request. @@ -38,7 +41,7 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter { private static final String DATA_RESP_FIELD = "data"; - private static final Logger logger = LoggerFactory.getLogger(JSONResponseStreamWriter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JSONResponseStreamWriter.class); @Resource private JsonFactory jsonFactory; @@ -46,19 +49,78 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter { @Resource private ObjectMapper mapper; + @Override - public void respond(Stream> stream, DAQQuery query, HttpServletResponse response) throws Exception { + public void respond(List>>> results, + ResponseOptions options, HttpServletResponse response) throws Exception { response.setCharacterEncoding(JsonEncoding.UTF8.getJavaName()); response.setContentType(MediaType.APPLICATION_JSON_VALUE); - Set includedFields = getFields(query); - - ObjectWriter writer = configureWriter(includedFields); - - respondInternal(stream, response, writer, query); + respondInternal(results, options, response); } - protected Set getFields(DAQQuery query) { + private void respondInternal(List>>> results, + ResponseOptions options, HttpServletResponse response) throws Exception { + AtomicReference exception = new AtomicReference<>(); + OutputStream out = handleCompressionAndResponseHeaders(options, response, CONTENT_TYPE_JSON); + JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8); + + try { + if (results.size() > 1) { + generator.writeStartArray(); + } + + results + .forEach(entryy -> { + DAQQueryElement daqQuery = entryy.getKey(); + Set includedFields = getFields(daqQuery); + ObjectWriter writer = configureWriter(includedFields); + + try { + generator.writeStartArray(); + + entryy.getValue() + /* ensure elements are sequentially written */ + .sequential() + .forEach( + triple -> { + try { + generator.writeStartObject(); + generator.writeStringField(QueryField.channel.name(), triple.getMiddle() + .getName()); + + generator.writeFieldName(DATA_RESP_FIELD); + writer.writeValue(generator, triple.getRight()); + + generator.writeEndObject(); + } catch (Exception e) { + LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(), + e); + exception.compareAndSet(null, e); + } + }); + + generator.writeEndArray(); + } catch (Exception e) { + LOGGER.error("Exception while writing json for '{}'", daqQuery.getChannels(), e); + exception.compareAndSet(null, e); + } + }); + } finally { + if (results.size() > 1) { + generator.writeEndArray(); + } + + generator.flush(); + generator.close(); + } + + if (exception.get() != null) { + throw exception.get(); + } + } + + protected Set getFields(DAQQueryElement query) { Set queryFields = query.getFields(); List aggregations = query.getAggregations(); @@ -95,52 +157,4 @@ public class JSONResponseStreamWriter extends AbstractResponseStreamWriter { ObjectWriter writer = mapper.writer(propertyFilter); return writer; } - - /** - * Writes the outer Java stream into the output stream. - * - * @param stream Mapping from channel name to data - * @param response {@link ServletResponse} instance given by the current HTTP request - * @param writer configured writer that includes the fields the end user wants to see - * @param query - * @throws IOException thrown if writing to the output stream fails - */ - private void respondInternal(Stream> stream, HttpServletResponse response, ObjectWriter writer, DAQQuery query) - throws Exception { - - OutputStream out = handleCompressionAndResponseHeaders(query, response, CONTENT_TYPE_JSON); - JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8); - - AtomicReference exception = new AtomicReference<>(); - try { - generator.writeStartArray(); - stream - /* ensure elements are sequentially written */ - .sequential() - .forEach( - entry -> { - try { - generator.writeStartObject(); - generator.writeStringField(QueryField.channel.name(), entry.getKey()); - - generator.writeFieldName(DATA_RESP_FIELD); - writer.writeValue(generator, entry.getValue()); - - generator.writeEndObject(); - } catch (Exception e) { - logger.error("Could not write channel name of channel '{}'", entry.getKey(), e); - exception.compareAndSet(null, e); - } - }); - generator.writeEndArray(); - } finally { - generator.flush(); - generator.close(); - } - - if (exception.get() != null) { - throw exception.get(); - } - } - } diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java b/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java deleted file mode 100644 index dee1d8d..0000000 --- a/src/main/java/ch/psi/daq/queryrest/response/json/JsonByteArraySerializer.java +++ /dev/null @@ -1,30 +0,0 @@ -package ch.psi.daq.queryrest.response.json; - -import java.io.IOException; - -import com.fasterxml.jackson.core.JsonGenerationException; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; - -// see: http://stackoverflow.com/a/15037329 -public class JsonByteArraySerializer extends StdSerializer { - private static final long serialVersionUID = -5914688899857435263L; - - public JsonByteArraySerializer() { - super(byte[].class, true); - } - - @Override - public void serialize(byte[] bytes, JsonGenerator jgen, SerializerProvider provider) throws IOException, - JsonGenerationException { - jgen.writeStartArray(); - - for (byte b : bytes) { - // stackoverflow example used a mask -> ? - jgen.writeNumber(b); - } - - jgen.writeEndArray(); - } -} diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java b/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java deleted file mode 100644 index 61fede0..0000000 --- a/src/main/java/ch/psi/daq/queryrest/response/json/JsonStreamSerializer.java +++ /dev/null @@ -1,24 +0,0 @@ -package ch.psi.daq.queryrest.response.json; - -import java.io.IOException; -import java.util.Iterator; -import java.util.stream.Stream; - -import com.fasterxml.jackson.core.JsonGenerationException; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; - -public class JsonStreamSerializer extends StdSerializer>{ - private static final long serialVersionUID = 4695859735299703478L; - - public JsonStreamSerializer() { - super(Stream.class, true); - } - - @Override - public void serialize(Stream stream, JsonGenerator jgen, SerializerProvider provider) throws IOException, - JsonGenerationException { - provider.findValueSerializer(Iterator.class, null).serialize(stream.iterator(), jgen, provider); - } -} diff --git a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java index 50f85b5..f92b2ae 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java @@ -12,11 +12,13 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupp import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.util.test.CassandraDataGen; +import ch.psi.daq.domain.reader.DataReader; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin; import ch.psi.daq.test.cassandra.admin.CassandraTestAdminImpl; import ch.psi.daq.test.query.config.LocalQueryTestConfig; +import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader; import ch.psi.daq.test.queryrest.query.DummyCassandraReader; @Configuration @@ -45,6 +47,18 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { return new DummyCassandraReader(); } +// @Bean +// @Lazy +// public DataReader archiverApplianceReader() { +// return new DummyArchiverApplianceReader(); +// } +// +// @Bean +// @Lazy +// public QueryProcessor archiverApplianceQueryProcessor() { +// return new QueryProcessorLocal(archiverApplianceReader()); +// } + @Bean @Lazy public CassandraTestAdmin cassandraTestAdmin() { diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java index 9700f38..6e3fcec 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerCsvTest.java @@ -34,7 +34,9 @@ import ch.psi.daq.query.model.AggregationType; import ch.psi.daq.query.model.Compression; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.model.ResponseFormat; +import ch.psi.daq.query.model.impl.DAQQueries; import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.query.model.impl.DAQQueryElement; import ch.psi.daq.queryrest.controller.QueryRestController; import ch.psi.daq.queryrest.filter.CorsFilter; import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin; @@ -69,9 +71,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 1), TEST_CHANNEL_NAMES); - request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); - + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -109,10 +110,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + ICsvMapReader mapReader = new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); try { - final String[] header = mapReader.getHeader(true); + // read comment + mapReader.read(""); + String[] header = mapReader.getHeader(false); Map customerMap; long resultsPerChannel = 2; long pulse = 0; @@ -131,13 +136,111 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { pulse = ++pulse % resultsPerChannel; if (pulse == 0) { ++channelCount; + if (channelCount <= TEST_CHANNEL_NAMES.length) { + // read comment (empty rows are skiped + mapReader.read(""); + header = mapReader.getHeader(false); + } } } } finally { mapReader.close(); } } - + + @Test + public void testPulseRangeQueries() throws Exception { + String testChannel3 = "testChannel3"; + DAQQueries request = new DAQQueries( + new DAQQueryElement( + new RequestRangePulseId( + 0, + 1), + TEST_CHANNEL_NAMES), + new DAQQueryElement( + new RequestRangePulseId( + 0, + 1), + testChannel3)); + request.setResponseFormat(ResponseFormat.CSV); + + LinkedHashSet queryFields = new LinkedHashSet<>(); + LinkedHashSet cellProcessors = new LinkedHashSet<>(); + queryFields.add(QueryField.channel); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.pulseId); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.iocNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalMillis); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.globalNanos); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.shape); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.eventCount); + cellProcessors.add(new NotNull()); + queryFields.add(QueryField.value); + cellProcessors.add(new NotNull()); + for (DAQQueryElement element : request) { + element.setFields(queryFields); + } + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + MvcResult result = this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERIES) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); + + String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + + ICsvMapReader mapReader = + new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); + try { + // read comment + mapReader.read(""); + String[] header = mapReader.getHeader(false); + Map customerMap; + long resultsPerChannel = 2; + long pulse = 0; + long channelCount = 1; + while ((customerMap = mapReader.read(header, processors)) != null) { + assertEquals(TEST_CHANNEL + channelCount, customerMap.get(QueryField.channel.name())); + assertEquals("" + pulse, customerMap.get(QueryField.pulseId.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.iocMillis.name())); + assertEquals("0", customerMap.get(QueryField.iocNanos.name())); + assertEquals("" + pulse * 10, customerMap.get(QueryField.globalMillis.name())); + assertEquals("0", customerMap.get(QueryField.globalNanos.name())); + assertEquals("[1]", customerMap.get(QueryField.shape.name())); + assertEquals("1", customerMap.get(QueryField.eventCount.name())); + assertEquals("" + pulse, customerMap.get(QueryField.value.name())); + + pulse = ++pulse % resultsPerChannel; + if (pulse == 0) { + ++channelCount; + if (channelCount <= TEST_CHANNEL_NAMES.length + 1) { + // read comment (empty rows are skiped + mapReader.read(""); + header = mapReader.getHeader(false); + } + } + } + } finally { + mapReader.close(); + } + } + @Test public void testPulseRangeQueryWaveform() throws Exception { String channelName = "XYWaveform"; @@ -145,10 +248,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { new RequestRangePulseId( 0, 1), - channelName); - request.setCompression(Compression.NONE); + channelName); request.setResponseFormat(ResponseFormat.CSV); - + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -186,10 +288,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + ICsvMapReader mapReader = new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); try { - final String[] header = mapReader.getHeader(true); + // read comment + mapReader.read(""); + final String[] header = mapReader.getHeader(false); Map customerMap; long pulse = 0; while ((customerMap = mapReader.read(header, processors)) != null) { @@ -218,9 +324,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { 0, 10), TEST_CHANNEL_NAMES); - request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); - + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -258,10 +363,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); String response = result.getResponse().getContentAsString(); - ICsvMapReader mapReader = + System.out.println("Response: " + response); + + CsvMapReader mapReader = new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); try { - final String[] header = mapReader.getHeader(true); + // read comment + mapReader.read(""); + String[] header = mapReader.getHeader(false); Map customerMap; long resultsPerChannel = 2; long pulse = 0; @@ -280,6 +389,11 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { pulse = ++pulse % resultsPerChannel; if (pulse == 0) { ++channelCount; + if (channelCount <= TEST_CHANNEL_NAMES.length) { + // read comment (empty rows are skiped + mapReader.read(""); + header = mapReader.getHeader(false); + } } } } finally { @@ -296,9 +410,8 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { startDate, endDate), TEST_CHANNEL_NAMES); - request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); - + LinkedHashSet queryFields = new LinkedHashSet<>(); LinkedHashSet cellProcessors = new LinkedHashSet<>(); queryFields.add(QueryField.channel); @@ -336,10 +449,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + ICsvMapReader mapReader = new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); try { - final String[] header = mapReader.getHeader(true); + // read comment + mapReader.read(""); + String[] header = mapReader.getHeader(false); Map customerMap; long resultsPerChannel = 2; long pulse = 0; @@ -358,6 +475,11 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { pulse = ++pulse % resultsPerChannel; if (pulse == 0) { ++channelCount; + if (channelCount <= TEST_CHANNEL_NAMES.length) { + // read comment (empty rows are skiped + mapReader.read(""); + header = mapReader.getHeader(false); + } } } } finally { @@ -371,11 +493,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { new RequestRangePulseId( 0, 1), - false, Ordering.asc, AggregationType.extrema, TEST_CHANNEL_NAMES[0]); - request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); String content = mapper.writeValueAsString(request); @@ -403,11 +523,9 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { new RequestRangePulseId( 0, 1), - false, Ordering.asc, AggregationType.index, TEST_CHANNEL_NAMES[0]); - request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); String content = mapper.writeValueAsString(request); @@ -441,7 +559,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setNrOfBins(2); - request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); @@ -473,7 +590,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { cellProcessors.add(new NotNull()); request.setAggregations(aggregations); - String content = mapper.writeValueAsString(request); System.out.println(content); @@ -489,10 +605,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + ICsvMapReader mapReader = new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); try { - final String[] header = mapReader.getHeader(true); + // read comment + mapReader.read(""); + String[] header = mapReader.getHeader(false); Map customerMap; long pulse = 0; while ((customerMap = mapReader.read(header, processors)) != null) { @@ -527,7 +647,6 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setBinSize(100); - request.setCompression(Compression.NONE); request.setResponseFormat(ResponseFormat.CSV); LinkedHashSet queryFields = new LinkedHashSet<>(); @@ -575,10 +694,14 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { CellProcessor[] processors = cellProcessors.toArray(new CellProcessor[cellProcessors.size()]); String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + ICsvMapReader mapReader = new CsvMapReader(new StringReader(response), CsvPreference.EXCEL_NORTH_EUROPE_PREFERENCE); try { - final String[] header = mapReader.getHeader(true); + // read comment + mapReader.read(""); + String[] header = mapReader.getHeader(false); Map customerMap; long pulse = 0; while ((customerMap = mapReader.read(header, processors)) != null) { @@ -600,7 +723,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { mapReader.close(); } } - + @Test public void testGzipFileSuffixHeader() throws Exception { DAQQuery request = new DAQQuery( @@ -623,16 +746,15 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz")); } - + @Test - public void testJsonFileSuffixHeader() throws Exception { + public void testCvsFileSuffixHeader() throws Exception { DAQQuery request = new DAQQuery( new RequestRangePulseId( 10, 11), TEST_CHANNEL_NAMES); request.setResponseFormat(ResponseFormat.CSV); - request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java index 051fb3f..cec165a 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java @@ -15,13 +15,19 @@ import ch.psi.daq.cassandra.request.range.RequestRangePulseId; import ch.psi.daq.cassandra.request.range.RequestRangeTime; import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.query.model.AggregationType; import ch.psi.daq.query.model.Compression; +import ch.psi.daq.query.model.impl.DAQQueries; import ch.psi.daq.query.model.impl.DAQQuery; +import ch.psi.daq.query.model.impl.DAQQueryElement; +import ch.psi.daq.query.request.ChannelName; +import ch.psi.daq.query.request.ChannelsRequest; import ch.psi.daq.queryrest.controller.QueryRestController; import ch.psi.daq.queryrest.filter.CorsFilter; import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin; import ch.psi.daq.test.queryrest.AbstractDaqRestTest; +import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader; /** * Tests the {@link DaqController} implementation. @@ -47,17 +53,30 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { @Test public void testChannelNameQuery() throws Exception { - this.mockMvc.perform( - MockMvcRequestBuilders - .get(QueryRestController.CHANNELS) - .contentType(MediaType.APPLICATION_JSON)) + this.mockMvc + .perform( + MockMvcRequestBuilders + .get(QueryRestController.CHANNELS) + .contentType(MediaType.APPLICATION_JSON)) .andDo(MockMvcResultHandlers.print()) .andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("BoolScalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("BoolScalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("BoolWaveform")) .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("BoolWaveform")); + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].channels[0]").value(DummyArchiverApplianceReader.TEST_CHANNEL_1)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[1]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].channels[1]").value(DummyArchiverApplianceReader.TEST_CHANNEL_2)); } @@ -71,14 +90,101 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[0]").value("Int32Scalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("Int32Scalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("Int32Waveform")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("UInt32Scalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("UInt32Waveform")) .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[1]").value("Int32Waveform")) - .andExpect(MockMvcResultMatchers.jsonPath("$[2]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[2]").value("UInt32Scalar")) - .andExpect(MockMvcResultMatchers.jsonPath("$[3]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[3]").value("UInt32Waveform")) - .andExpect(MockMvcResultMatchers.jsonPath("$[4]").doesNotExist()); + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").doesNotExist()); + } + + @Test + public void testChannelNameQueryBackendOrder() throws Exception { + ChannelsRequest request = new ChannelsRequest("int64", Ordering.desc, Backend.databuffer); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("UInt64Waveform")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("UInt64Scalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("Int64Waveform")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("Int64Scalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").doesNotExist()); + } + + @Test + public void testChannelNameQueryReload() throws Exception { + ChannelsRequest request = new ChannelsRequest(); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[23]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").doesNotExist()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[2]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").doesNotExist()); + + // each reload add another channel + request.setReload(true); + + content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.databuffer.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[25]").doesNotExist()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[4]").doesNotExist()); } @Test @@ -166,7 +272,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { "Origin, Authorization, Accept, Content-Type")); } - @Test public void testPulseRangeQuery() throws Exception { DAQQuery request = new DAQQuery( @@ -174,7 +279,42 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { 10, 11), TEST_CHANNEL_NAMES); - request.setCompression(Compression.NONE); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERY) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110)); + } + + @Test + public void testPulseRangeQueryBackends() throws Exception { + DAQQuery request = new DAQQuery( + new RequestRangePulseId( + 10, + 11), + new ChannelName(TEST_CHANNEL_01, Backend.databuffer), + new ChannelName(TEST_CHANNEL_02, Backend.archiverappliance)); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -203,6 +343,58 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110)); } + @Test + public void testPulseRangeQueries() throws Exception { + String testChannel3 = "testChannel3"; + DAQQueries request = new DAQQueries( + new DAQQueryElement( + new RequestRangePulseId( + 10, + 11), + TEST_CHANNEL_NAMES), + new DAQQueryElement( + new RequestRangePulseId( + 10, + 11), + testChannel3)); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.QUERIES) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].channel").value(TEST_CHANNEL_01)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[0].pulseId").value(10)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[0].globalMillis").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[1].pulseId").value(11)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][0].data[1].globalMillis").value(110)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].channel").value(TEST_CHANNEL_02)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[0].pulseId").value(10)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[0].globalMillis").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[1].pulseId").value(11)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0][1].data[1].globalMillis").value(110)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1][0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].channel").value(testChannel3)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[0].pulseId").value(10)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[0].globalMillis").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[1].pulseId").value(11)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1][0].data[1].globalMillis").value(110)); + } + @Test public void testTimeRangeQuery() throws Exception { DAQQuery request = new DAQQuery( @@ -210,7 +402,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { 100, 110), TEST_CHANNEL_NAMES); - request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); @@ -247,7 +438,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { startDate, endDate), TEST_CHANNEL_NAMES); - request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -284,11 +474,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { new RequestRangePulseId( 100, 101), - false, Ordering.asc, AggregationType.extrema, TEST_CHANNEL_NAMES[0]); - request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); @@ -332,7 +520,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { endDate), TEST_CHANNEL_01); request.setNrOfBins(2); - request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -360,17 +547,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { @Test public void testDateRangeQueryBinSizeAggregate() throws Exception { - long startTime = 1000; - long endTime = 1999; - String startDate = RequestRangeDate.format(startTime); - String endDate = RequestRangeDate.format(endTime); + long startTime = 1000; + long endTime = 1999; + String startDate = RequestRangeDate.format(startTime); + String endDate = RequestRangeDate.format(endTime); DAQQuery request = new DAQQuery( new RequestRangeDate( startDate, endDate), TEST_CHANNEL_01); request.setBinSize(100); - request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -419,9 +605,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].globalMillis").value(1900)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10)); } - + @Test - public void testGzipCompression() throws Exception { + public void testGzipFileSuffixHeader() throws Exception { DAQQuery request = new DAQQuery( new RequestRangePulseId( 10, @@ -432,27 +618,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { String content = mapper.writeValueAsString(request); System.out.println(content); - this.mockMvc - .perform(MockMvcRequestBuilders - .post(QueryRestController.QUERY) - .contentType(MediaType.APPLICATION_JSON) - .content(content)) - - .andDo(MockMvcResultHandlers.print()) - .andExpect(MockMvcResultMatchers.status().isOk()); - } - - @Test - public void testGzipFileSuffixHeader() throws Exception { - DAQQuery request = new DAQQuery( - new RequestRangePulseId( - 10, - 11), - TEST_CHANNEL_NAMES); - request.setCompression(Compression.GZIP); - - String content = mapper.writeValueAsString(request); - this.mockMvc .perform(MockMvcRequestBuilders .post(QueryRestController.QUERY) @@ -463,7 +628,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.gz")); } - + @Test public void testJsonFileSuffixHeader() throws Exception { DAQQuery request = new DAQQuery( @@ -471,7 +636,6 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { 10, 11), TEST_CHANNEL_NAMES); - request.setCompression(Compression.NONE); String content = mapper.writeValueAsString(request); @@ -485,5 +649,4 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.header().string("Content-Disposition", "attachment; filename=data.json")); } - } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java similarity index 78% rename from src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java rename to src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java index 4aee732..3a30f90 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyDataReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java @@ -2,6 +2,7 @@ package ch.psi.daq.test.queryrest.query; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -11,20 +12,30 @@ import com.google.common.collect.Lists; import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.cassandra.ChannelEvent; +import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.reader.DataReader; -public class DummyDataReader implements DataReader { +public class DummyArchiverApplianceReader implements DataReader { + public static final String ARCHIVER_TEST_CHANNEL = "ArchiverTestChannel_"; - public static final String TEST_CHANNEL_1 = "testChannel1"; - public static final String TEST_CHANNEL_2 = "testChannel2"; - public static final List TEST_CHANNEL_NAMES = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2); + public static final String TEST_CHANNEL_1 = "ArchiverChannel_1"; + public static final String TEST_CHANNEL_2 = "ArchiverChannel_2"; + private List channels = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2); private final Random random = new Random(0); + private AtomicLong channelNameCallCounter = new AtomicLong(); + + + @Override + public Backend getBackend() { + return Backend.archiverappliance; + } @Override public Stream getChannelStream(String regex) { - Stream channelStream = TEST_CHANNEL_NAMES.stream(); + channels.add(ARCHIVER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet()); + Stream channelStream = channels.stream(); if (regex != null) { Pattern pattern = Pattern.compile(regex); channelStream = channelStream.filter(channel -> pattern.matcher(channel).find()); diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 769e035..8d033c6 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -29,23 +30,25 @@ import ch.psi.daq.domain.cassandra.query.TimeRangeQuery; import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery; import ch.psi.daq.domain.cassandra.querying.MetaChannelEvent; import ch.psi.daq.domain.cassandra.querying.EventQuery; +import ch.psi.daq.domain.reader.Backend; public class DummyCassandraReader implements CassandraReader { private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class); + public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_"; private static final int KEYSPACE = 1; private CassandraDataGen dataGen; - private String[] channels; + private List channels; private Random random = new Random(0); + private AtomicLong channelNameCallCounter = new AtomicLong(); /** * */ public DummyCassandraReader() { this.dataGen = new CassandraDataGen(); - this.channels = new String[] { - "testChannel1", - "testChannel2", + + this.channels = Lists.newArrayList( "BoolScalar", "BoolWaveform", "Int8Scalar", @@ -68,7 +71,12 @@ public class DummyCassandraReader implements CassandraReader { "Float32Waveform", "Float64Scalar", "Float64Waveform", - "StringScalar"}; + "StringScalar"); + } + + @Override + public Backend getBackend() { + return Backend.databuffer; } /** @@ -76,10 +84,12 @@ public class DummyCassandraReader implements CassandraReader { */ @Override public Stream getChannelStream(String regex) { - Stream channelStream = Stream.of(channels); + channels.add(DATABUFFER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet()); + + Stream channelStream = channels.stream(); if (regex != null) { Pattern pattern = Pattern.compile(regex.toLowerCase()); - channelStream = Stream.of(channels).filter(channel -> pattern.matcher(channel.toLowerCase()).find()); + channelStream = channelStream.filter(channel -> pattern.matcher(channel.toLowerCase()).find()); } return channelStream; }