From ebb904009da43b40d93bad1c578c046b5265bce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Fri, 31 Jul 2015 10:12:52 +0200 Subject: [PATCH] ATEST-123 --- Readme.md | 37 ++++++++----- .../daq/queryrest/config/QueryRestConfig.java | 13 +---- .../controller/QueryRestController.java | 52 +++++++++++-------- .../daq/queryrest/model/ChannelsRequest.java | 37 +++++++++++++ src/main/resources/queryrest.properties | 13 ++++- .../daq/test/queryrest/DaqWebMvcConfig.java | 14 ++--- 6 files changed, 110 insertions(+), 56 deletions(-) create mode 100644 src/main/java/ch/psi/daq/queryrest/model/ChannelsRequest.java diff --git a/Readme.md b/Readme.md index 543e57d..3594c55 100644 --- a/Readme.md +++ b/Readme.md @@ -31,24 +31,36 @@ It is possible to overwrite properties by defining new values in `${HOME}/.confi ### Request ``` -GET http://:/channels - -or - -GET http://:/channels/{regex} +POST http://:/channels ``` ### Example ``` -curl -H "Content-Type: application/json" -X GET http://sf-nube-14.psi.ch:8080/channels +curl -H "Content-Type: application/json" -X POST -d '{"dbMode":"databuffer"}' http://sf-nube-14.psi.ch:8080/channels or -curl -H "Content-Type: application/json" -X GET http://sf-nube-14.psi.ch:8080/channels/TRFCB +curl -H "Content-Type: application/json" -X POST -d '{"dbMode":"archiverappliance","regex":"TRFCA|TRFCB"}' http://sf-nube-14.psi.ch:8080/channels ``` +### Response example + +The response is a JSON array of channel names. + +``` +["channel1","channel2"] +``` + +### Channels Request + +Requests are defined using JSON. +There exist following fields: + +- **dbMode**: Specifies the database to query (values: **databuffer**|archiverappliance). +- **regex**: Specifies a filter (default is **no regex**). Filtering is done using regular expressions (see: [Pattern](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html). + ## Query Data @@ -56,7 +68,7 @@ curl -H "Content-Type: application/json" -X GET http://sf-nube-14.psi.ch:8080/ch ### Request ``` -GET http://:/query +POST http://:/query ``` ### Example @@ -64,7 +76,7 @@ GET http://:/query A request is performed using JSON. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended). ``` -curl -H "Content-Type: application/json" -X GET -d '{"channels":["channel1","channel2"],"startPulseId":0,"endPulseId":4}' http://sf-nube-14.psi.ch:8080/channels +curl -H "Content-Type: application/json" -X POST -d '{"channels":["channel1","channel2"],"startPulseId":0,"endPulseId":4}' http://sf-nube-14.psi.ch:8080/channels ``` ### Response example @@ -126,7 +138,7 @@ The response is in JSON. ] ``` -### JSON Query +### Query Request Queries are defined using JSON. There exist following fields: @@ -141,9 +153,10 @@ There exist following fields: - **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries. - **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response. - **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]). -- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**) +- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: **false**|true) +- **dbMode**: Specifies the database to query (values: **databuffer**|archiverappliance). -### Example JSON Queries +### Query Examples **TODO:** \ No newline at end of file diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index f49b8ea..3adb70e 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -26,11 +26,9 @@ import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; import ch.psi.daq.common.statistic.StorelessStatistics; import ch.psi.daq.domain.DataEvent; -import ch.psi.daq.query.analyzer.ArchiverApplianceQueryAnalyzer; -import ch.psi.daq.query.analyzer.CassandraQueryAnalyzer; +import ch.psi.daq.query.analyzer.QueryAnalyzerImpl; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.config.QueryConfig; -import ch.psi.daq.query.config.QueryRunMode; import ch.psi.daq.query.model.AbstractQuery; import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.Query; @@ -65,9 +63,6 @@ public class QueryRestConfig { @Resource private Environment env; - @Resource - private QueryRunMode queryRunMode; - @Bean public ObjectMapper objectMapper() { ObjectMapper mapper = new ObjectMapper(); @@ -103,11 +98,7 @@ public class QueryRestConfig { @Bean public Function queryAnalizerFactory() { - if (QueryRunMode.archiverappliance.equals(queryRunMode)) { - return (query) -> new ArchiverApplianceQueryAnalyzer(query); - } else { - return (query) -> new CassandraQueryAnalyzer(query); - } + return (query) -> new QueryAnalyzerImpl(query); } @Bean diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 1ae6532..17edf41 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -26,10 +26,12 @@ import ch.psi.daq.domain.DataEvent; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.model.AbstractQuery; import ch.psi.daq.query.model.Aggregation; +import ch.psi.daq.query.model.DBMode; import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.QueryField; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.queryrest.config.QueryRestConfig; +import ch.psi.daq.queryrest.model.ChannelsRequest; import ch.psi.daq.queryrest.response.ResponseStreamWriter; @RestController @@ -38,51 +40,45 @@ public class QueryRestController { private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class); public static final String CHANNELS = "channels"; - public static final String CHANNELS_REGEX = CHANNELS + "/{regex}"; public static final String QUERY = "query"; @Resource private ResponseStreamWriter responseStreamWriter; @Resource - private QueryProcessor queryProcessor; + private QueryProcessor cassandraQueryProcessor; + + @Resource + private QueryProcessor archiverApplianceQueryProcessor; @Resource private Function queryAnalizerFactory; @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS) private Set defaultResponseFields; - + @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS) private Set defaultResponseAggregations; @RequestMapping( value = CHANNELS, - method = RequestMethod.GET, + method = {RequestMethod.GET, RequestMethod.POST}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody Collection getChannels() throws Throwable { + public @ResponseBody Collection getChannels(@RequestBody(required = false) ChannelsRequest request) + throws Throwable { + // in case not specified use default (e.g. GET) + if (request == null) { + request = new ChannelsRequest(); + } + try { - return queryProcessor.getChannels(); + return getQueryProcessor(request.getDbMode()).getChannels(request.getRegex()); } catch (Throwable t) { LOGGER.error("Failed to query channel names.", t); throw t; } } - @RequestMapping( - value = CHANNELS_REGEX, - method = RequestMethod.POST, - consumes = {MediaType.APPLICATION_JSON_VALUE}, - produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody Collection getChannels(@RequestBody String regex) throws Throwable { - try { - return queryProcessor.getChannels(regex); - } catch (Throwable t) { - LOGGER.error("Failed to query channel names with regex '{}'.", regex, t); - throw t; - } - } - @RequestMapping( value = QUERY, method = RequestMethod.POST, @@ -98,7 +94,8 @@ public class QueryRestController { extendQuery(query); // all the magic happens here - Stream>> channelToDataEvents = queryProcessor.process(queryAnalizer); + Stream>> channelToDataEvents = + getQueryProcessor(query.getDBMode()).process(queryAnalizer); // do post-process Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); @@ -111,11 +108,22 @@ public class QueryRestController { } } + private QueryProcessor getQueryProcessor(DBMode dbMode) { + if (DBMode.databuffer.equals(dbMode)) { + return cassandraQueryProcessor; + } else if (DBMode.archiverappliance.equals(dbMode)) { + return archiverApplianceQueryProcessor; + } else { + LOGGER.error("Unknown DBMode '{}'!", dbMode); + throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode)); + } + } + private void extendQuery(AbstractQuery query) { if (query.getFields() == null || query.getFields().isEmpty()) { query.setFields(new LinkedHashSet<>(defaultResponseFields)); } - if(query.getAggregations() == null || query.getAggregations().isEmpty()){ + if (query.getAggregations() == null || query.getAggregations().isEmpty()) { query.setAggregations(new LinkedList<>(defaultResponseAggregations)); } } diff --git a/src/main/java/ch/psi/daq/queryrest/model/ChannelsRequest.java b/src/main/java/ch/psi/daq/queryrest/model/ChannelsRequest.java new file mode 100644 index 0000000..7474428 --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/model/ChannelsRequest.java @@ -0,0 +1,37 @@ +package ch.psi.daq.queryrest.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; + +import ch.psi.daq.query.model.DBMode; + +// as RequestBody due to special chars in regex +@JsonInclude(Include.NON_DEFAULT) +public class ChannelsRequest { + private DBMode dbMode = DBMode.databuffer; + // null for no regex + private String regex = null; + + public ChannelsRequest() {} + + public ChannelsRequest(DBMode dbMode, String regex) { + this.regex = regex; + this.dbMode = dbMode; + } + + public DBMode getDbMode() { + return dbMode; + } + + public void setDbMode(DBMode dbMode) { + this.dbMode = dbMode; + } + + public String getRegex() { + return regex; + } + + public void setRegex(String regex) { + this.regex = regex; + } +} diff --git a/src/main/resources/queryrest.properties b/src/main/resources/queryrest.properties index 1050546..09a1bb6 100644 --- a/src/main/resources/queryrest.properties +++ b/src/main/resources/queryrest.properties @@ -4,4 +4,15 @@ server.port=8080 # defines the fields that are included in the response # if no fields have been specified by the user queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,value -queryrest.default.response.aggregations=min,max,sum \ No newline at end of file +queryrest.default.response.aggregations=min,max,sum + +# defines the list of hosts who are tried for an initial connection to the cluster +hazelcast.query.initialcandidates=localhost + + + +cassandra.basekeyspace=daq_query_test + +# defines the cluster group and its password +hazelcast.query.group.name=QueryClusterTest +hazelcast.query.group.password=a3&PvvHh7f#6HjAx5Da$ \ No newline at end of file diff --git a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java index 1693604..baf858f 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java @@ -8,9 +8,8 @@ import org.springframework.context.annotation.PropertySources; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; -import ch.psi.daq.domain.reader.DataReader; +import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.query.processor.QueryProcessor; -import ch.psi.daq.query.processor.cassandra.CassandraQueryProcessorLocal; import ch.psi.daq.test.query.config.LocalQueryTestConfig; import ch.psi.daq.test.queryrest.query.DummyDataReader; @@ -26,14 +25,9 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { @Import(value = {LocalQueryTestConfig.class}) static class InnerConfiguration { } - + @Bean public QueryProcessor queryProcessor() { - return new CassandraQueryProcessorLocal(); + return new QueryProcessorLocal(new DummyDataReader()); } - - @Bean - public DataReader dataReader() { - return new DummyDataReader(); - } -} \ No newline at end of file +}