ATEST-123
This commit is contained in:
37
Readme.md
37
Readme.md
@ -31,24 +31,36 @@ It is possible to overwrite properties by defining new values in `${HOME}/.confi
|
|||||||
### Request
|
### Request
|
||||||
|
|
||||||
```
|
```
|
||||||
GET http://<host>:<port>/channels
|
POST http://<host>:<port>/channels
|
||||||
|
|
||||||
or
|
|
||||||
|
|
||||||
GET http://<host>:<port>/channels/{regex}
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
### Example
|
### Example
|
||||||
|
|
||||||
```
|
```
|
||||||
curl -H "Content-Type: application/json" -X GET http://sf-nube-14.psi.ch:8080/channels
|
curl -H "Content-Type: application/json" -X POST -d '{"dbMode":"databuffer"}' http://sf-nube-14.psi.ch:8080/channels
|
||||||
|
|
||||||
or
|
or
|
||||||
|
|
||||||
curl -H "Content-Type: application/json" -X GET http://sf-nube-14.psi.ch:8080/channels/TRFCB
|
curl -H "Content-Type: application/json" -X POST -d '{"dbMode":"archiverappliance","regex":"TRFCA|TRFCB"}' http://sf-nube-14.psi.ch:8080/channels
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Response example
|
||||||
|
|
||||||
|
The response is a JSON array of channel names.
|
||||||
|
|
||||||
|
```
|
||||||
|
["channel1","channel2"]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Channels Request
|
||||||
|
|
||||||
|
Requests are defined using JSON.
|
||||||
|
There exist following fields:
|
||||||
|
|
||||||
|
- **dbMode**: Specifies the database to query (values: **databuffer**|archiverappliance).
|
||||||
|
- **regex**: Specifies a filter (default is **no regex**). Filtering is done using regular expressions (see: [Pattern](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html).
|
||||||
|
|
||||||
<a name="query_data"/>
|
<a name="query_data"/>
|
||||||
|
|
||||||
## Query Data
|
## Query Data
|
||||||
@ -56,7 +68,7 @@ curl -H "Content-Type: application/json" -X GET http://sf-nube-14.psi.ch:8080/ch
|
|||||||
### Request
|
### Request
|
||||||
|
|
||||||
```
|
```
|
||||||
GET http://<host>:<port>/query
|
POST http://<host>:<port>/query
|
||||||
```
|
```
|
||||||
|
|
||||||
### Example
|
### Example
|
||||||
@ -64,7 +76,7 @@ GET http://<host>:<port>/query
|
|||||||
A request is performed using JSON. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended).
|
A request is performed using JSON. The JSON query defines the channels to be queried, the range, and how the data should be aggregated (this is optional but highly recommended).
|
||||||
|
|
||||||
```
|
```
|
||||||
curl -H "Content-Type: application/json" -X GET -d '{"channels":["channel1","channel2"],"startPulseId":0,"endPulseId":4}' http://sf-nube-14.psi.ch:8080/channels
|
curl -H "Content-Type: application/json" -X POST -d '{"channels":["channel1","channel2"],"startPulseId":0,"endPulseId":4}' http://sf-nube-14.psi.ch:8080/channels
|
||||||
```
|
```
|
||||||
|
|
||||||
### Response example
|
### Response example
|
||||||
@ -126,7 +138,7 @@ The response is in JSON.
|
|||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
### JSON Query
|
### Query Request
|
||||||
|
|
||||||
Queries are defined using JSON.
|
Queries are defined using JSON.
|
||||||
There exist following fields:
|
There exist following fields:
|
||||||
@ -141,9 +153,10 @@ There exist following fields:
|
|||||||
- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries.
|
- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries.
|
||||||
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response.
|
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response.
|
||||||
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
|
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
|
||||||
- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: true|**false**)
|
- **aggregateChannels**: Specifies whether the data of the requested channels should be combined together using the defined aggregation (values: **false**|true)
|
||||||
|
- **dbMode**: Specifies the database to query (values: **databuffer**|archiverappliance).
|
||||||
|
|
||||||
|
|
||||||
### Example JSON Queries
|
### Query Examples
|
||||||
|
|
||||||
**TODO:**
|
**TODO:**
|
@ -26,11 +26,9 @@ import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
|||||||
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
||||||
import ch.psi.daq.common.statistic.StorelessStatistics;
|
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||||
import ch.psi.daq.domain.DataEvent;
|
import ch.psi.daq.domain.DataEvent;
|
||||||
import ch.psi.daq.query.analyzer.ArchiverApplianceQueryAnalyzer;
|
import ch.psi.daq.query.analyzer.QueryAnalyzerImpl;
|
||||||
import ch.psi.daq.query.analyzer.CassandraQueryAnalyzer;
|
|
||||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||||
import ch.psi.daq.query.config.QueryConfig;
|
import ch.psi.daq.query.config.QueryConfig;
|
||||||
import ch.psi.daq.query.config.QueryRunMode;
|
|
||||||
import ch.psi.daq.query.model.AbstractQuery;
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
import ch.psi.daq.query.model.Aggregation;
|
import ch.psi.daq.query.model.Aggregation;
|
||||||
import ch.psi.daq.query.model.Query;
|
import ch.psi.daq.query.model.Query;
|
||||||
@ -65,9 +63,6 @@ public class QueryRestConfig {
|
|||||||
@Resource
|
@Resource
|
||||||
private Environment env;
|
private Environment env;
|
||||||
|
|
||||||
@Resource
|
|
||||||
private QueryRunMode queryRunMode;
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ObjectMapper objectMapper() {
|
public ObjectMapper objectMapper() {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
@ -103,11 +98,7 @@ public class QueryRestConfig {
|
|||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Function<Query, QueryAnalyzer> queryAnalizerFactory() {
|
public Function<Query, QueryAnalyzer> queryAnalizerFactory() {
|
||||||
if (QueryRunMode.archiverappliance.equals(queryRunMode)) {
|
return (query) -> new QueryAnalyzerImpl(query);
|
||||||
return (query) -> new ArchiverApplianceQueryAnalyzer(query);
|
|
||||||
} else {
|
|
||||||
return (query) -> new CassandraQueryAnalyzer(query);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@ -26,10 +26,12 @@ import ch.psi.daq.domain.DataEvent;
|
|||||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||||
import ch.psi.daq.query.model.AbstractQuery;
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
import ch.psi.daq.query.model.Aggregation;
|
import ch.psi.daq.query.model.Aggregation;
|
||||||
|
import ch.psi.daq.query.model.DBMode;
|
||||||
import ch.psi.daq.query.model.Query;
|
import ch.psi.daq.query.model.Query;
|
||||||
import ch.psi.daq.query.model.QueryField;
|
import ch.psi.daq.query.model.QueryField;
|
||||||
import ch.psi.daq.query.processor.QueryProcessor;
|
import ch.psi.daq.query.processor.QueryProcessor;
|
||||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||||
|
import ch.psi.daq.queryrest.model.ChannelsRequest;
|
||||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@ -38,51 +40,45 @@ public class QueryRestController {
|
|||||||
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
|
||||||
|
|
||||||
public static final String CHANNELS = "channels";
|
public static final String CHANNELS = "channels";
|
||||||
public static final String CHANNELS_REGEX = CHANNELS + "/{regex}";
|
|
||||||
public static final String QUERY = "query";
|
public static final String QUERY = "query";
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private ResponseStreamWriter responseStreamWriter;
|
private ResponseStreamWriter responseStreamWriter;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private QueryProcessor queryProcessor;
|
private QueryProcessor cassandraQueryProcessor;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private QueryProcessor archiverApplianceQueryProcessor;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||||
|
|
||||||
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
|
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
|
||||||
private Set<QueryField> defaultResponseFields;
|
private Set<QueryField> defaultResponseFields;
|
||||||
|
|
||||||
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
|
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
|
||||||
private Set<Aggregation> defaultResponseAggregations;
|
private Set<Aggregation> defaultResponseAggregations;
|
||||||
|
|
||||||
@RequestMapping(
|
@RequestMapping(
|
||||||
value = CHANNELS,
|
value = CHANNELS,
|
||||||
method = RequestMethod.GET,
|
method = {RequestMethod.GET, RequestMethod.POST},
|
||||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
public @ResponseBody Collection<String> getChannels() throws Throwable {
|
public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
||||||
|
throws Throwable {
|
||||||
|
// in case not specified use default (e.g. GET)
|
||||||
|
if (request == null) {
|
||||||
|
request = new ChannelsRequest();
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return queryProcessor.getChannels();
|
return getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOGGER.error("Failed to query channel names.", t);
|
LOGGER.error("Failed to query channel names.", t);
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(
|
|
||||||
value = CHANNELS_REGEX,
|
|
||||||
method = RequestMethod.POST,
|
|
||||||
consumes = {MediaType.APPLICATION_JSON_VALUE},
|
|
||||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
|
||||||
public @ResponseBody Collection<String> getChannels(@RequestBody String regex) throws Throwable {
|
|
||||||
try {
|
|
||||||
return queryProcessor.getChannels(regex);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
LOGGER.error("Failed to query channel names with regex '{}'.", regex, t);
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@RequestMapping(
|
@RequestMapping(
|
||||||
value = QUERY,
|
value = QUERY,
|
||||||
method = RequestMethod.POST,
|
method = RequestMethod.POST,
|
||||||
@ -98,7 +94,8 @@ public class QueryRestController {
|
|||||||
extendQuery(query);
|
extendQuery(query);
|
||||||
|
|
||||||
// all the magic happens here
|
// all the magic happens here
|
||||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = queryProcessor.process(queryAnalizer);
|
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||||
|
getQueryProcessor(query.getDBMode()).process(queryAnalizer);
|
||||||
|
|
||||||
// do post-process
|
// do post-process
|
||||||
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||||
@ -111,11 +108,22 @@ public class QueryRestController {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private QueryProcessor getQueryProcessor(DBMode dbMode) {
|
||||||
|
if (DBMode.databuffer.equals(dbMode)) {
|
||||||
|
return cassandraQueryProcessor;
|
||||||
|
} else if (DBMode.archiverappliance.equals(dbMode)) {
|
||||||
|
return archiverApplianceQueryProcessor;
|
||||||
|
} else {
|
||||||
|
LOGGER.error("Unknown DBMode '{}'!", dbMode);
|
||||||
|
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void extendQuery(AbstractQuery query) {
|
private void extendQuery(AbstractQuery query) {
|
||||||
if (query.getFields() == null || query.getFields().isEmpty()) {
|
if (query.getFields() == null || query.getFields().isEmpty()) {
|
||||||
query.setFields(new LinkedHashSet<>(defaultResponseFields));
|
query.setFields(new LinkedHashSet<>(defaultResponseFields));
|
||||||
}
|
}
|
||||||
if(query.getAggregations() == null || query.getAggregations().isEmpty()){
|
if (query.getAggregations() == null || query.getAggregations().isEmpty()) {
|
||||||
query.setAggregations(new LinkedList<>(defaultResponseAggregations));
|
query.setAggregations(new LinkedList<>(defaultResponseAggregations));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,37 @@
|
|||||||
|
package ch.psi.daq.queryrest.model;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||||
|
|
||||||
|
import ch.psi.daq.query.model.DBMode;
|
||||||
|
|
||||||
|
// as RequestBody due to special chars in regex
|
||||||
|
@JsonInclude(Include.NON_DEFAULT)
|
||||||
|
public class ChannelsRequest {
|
||||||
|
private DBMode dbMode = DBMode.databuffer;
|
||||||
|
// null for no regex
|
||||||
|
private String regex = null;
|
||||||
|
|
||||||
|
public ChannelsRequest() {}
|
||||||
|
|
||||||
|
public ChannelsRequest(DBMode dbMode, String regex) {
|
||||||
|
this.regex = regex;
|
||||||
|
this.dbMode = dbMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DBMode getDbMode() {
|
||||||
|
return dbMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDbMode(DBMode dbMode) {
|
||||||
|
this.dbMode = dbMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRegex() {
|
||||||
|
return regex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRegex(String regex) {
|
||||||
|
this.regex = regex;
|
||||||
|
}
|
||||||
|
}
|
@ -4,4 +4,15 @@ server.port=8080
|
|||||||
# defines the fields that are included in the response
|
# defines the fields that are included in the response
|
||||||
# if no fields have been specified by the user
|
# if no fields have been specified by the user
|
||||||
queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,value
|
queryrest.default.response.fields=channel,pulseId,globalMillis,globalNanos,value
|
||||||
queryrest.default.response.aggregations=min,max,sum
|
queryrest.default.response.aggregations=min,max,sum
|
||||||
|
|
||||||
|
# defines the list of hosts who are tried for an initial connection to the cluster
|
||||||
|
hazelcast.query.initialcandidates=localhost
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
cassandra.basekeyspace=daq_query_test
|
||||||
|
|
||||||
|
# defines the cluster group and its password
|
||||||
|
hazelcast.query.group.name=QueryClusterTest
|
||||||
|
hazelcast.query.group.password=a3&PvvHh7f#6HjAx5Da$
|
@ -8,9 +8,8 @@ import org.springframework.context.annotation.PropertySources;
|
|||||||
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
||||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
||||||
|
|
||||||
import ch.psi.daq.domain.reader.DataReader;
|
import ch.psi.daq.query.processor.QueryProcessorLocal;
|
||||||
import ch.psi.daq.query.processor.QueryProcessor;
|
import ch.psi.daq.query.processor.QueryProcessor;
|
||||||
import ch.psi.daq.query.processor.cassandra.CassandraQueryProcessorLocal;
|
|
||||||
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
|
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
|
||||||
import ch.psi.daq.test.queryrest.query.DummyDataReader;
|
import ch.psi.daq.test.queryrest.query.DummyDataReader;
|
||||||
|
|
||||||
@ -26,14 +25,9 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
|||||||
@Import(value = {LocalQueryTestConfig.class})
|
@Import(value = {LocalQueryTestConfig.class})
|
||||||
static class InnerConfiguration {
|
static class InnerConfiguration {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public QueryProcessor queryProcessor() {
|
public QueryProcessor queryProcessor() {
|
||||||
return new CassandraQueryProcessorLocal();
|
return new QueryProcessorLocal(new DummyDataReader());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
@Bean
|
|
||||||
public DataReader dataReader() {
|
|
||||||
return new DummyDataReader();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Reference in New Issue
Block a user