Reformatting.
This commit is contained in:
@ -42,152 +42,145 @@ import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
|||||||
@RestController
|
@RestController
|
||||||
public class QueryRestController {
|
public class QueryRestController {
|
||||||
|
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
|
||||||
|
|
||||||
public static final String CHANNELS = "/channels";
|
public static final String CHANNELS = "/channels";
|
||||||
public static final String QUERY = "/query";
|
public static final String QUERY = "/query";
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private Validator queryValidator;
|
private Validator queryValidator;
|
||||||
private Validator requestProviderValidator = new RequestProviderValidator();
|
private Validator requestProviderValidator = new RequestProviderValidator();
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private ResponseStreamWriter responseStreamWriter;
|
private ResponseStreamWriter responseStreamWriter;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private QueryProcessor cassandraQueryProcessor;
|
private QueryProcessor cassandraQueryProcessor;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private QueryProcessor archiverApplianceQueryProcessor;
|
private QueryProcessor archiverApplianceQueryProcessor;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||||
|
|
||||||
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
|
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
|
||||||
private Set<QueryField> defaultResponseFields;
|
private Set<QueryField> defaultResponseFields;
|
||||||
|
|
||||||
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
|
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
|
||||||
private Set<Aggregation> defaultResponseAggregations;
|
private Set<Aggregation> defaultResponseAggregations;
|
||||||
|
|
||||||
@InitBinder
|
@InitBinder
|
||||||
protected void initBinder(WebDataBinder binder) {
|
protected void initBinder(WebDataBinder binder) {
|
||||||
if (binder.getTarget() != null) {
|
if (binder.getTarget() != null) {
|
||||||
if (requestProviderValidator.supports(binder.getTarget().getClass())) {
|
if (requestProviderValidator.supports(binder.getTarget().getClass())) {
|
||||||
binder.addValidators(requestProviderValidator);
|
binder.addValidators(requestProviderValidator);
|
||||||
}
|
}
|
||||||
if (queryValidator.supports(binder.getTarget().getClass())) {
|
if (queryValidator.supports(binder.getTarget().getClass())) {
|
||||||
binder.addValidators(queryValidator);
|
binder.addValidators(queryValidator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(value = CHANNELS, method = { RequestMethod.GET, RequestMethod.POST }, produces = { MediaType.APPLICATION_JSON_VALUE })
|
@RequestMapping(value = CHANNELS, method = {RequestMethod.GET, RequestMethod.POST},
|
||||||
public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
throws Throwable {
|
public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
||||||
// in case not specified use default (e.g. GET)
|
throws Throwable {
|
||||||
if (request == null) {
|
// in case not specified use default (e.g. GET)
|
||||||
request = new ChannelsRequest();
|
if (request == null) {
|
||||||
}
|
request = new ChannelsRequest();
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// sorted collection
|
// sorted collection
|
||||||
Collection<String> allChannels = getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
|
Collection<String> allChannels = getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
|
||||||
return allChannels;
|
return allChannels;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOGGER.error("Failed to query channel names.", t);
|
LOGGER.error("Failed to query channel names.", t);
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Query specific channel names, and return only those.
|
* Query specific channel names, and return only those.
|
||||||
*
|
*
|
||||||
* @param channelName
|
* @param channelName part of (or full) channel name
|
||||||
* part of (or full) channel name
|
* @return Collection of channel names matching the specified input channel name
|
||||||
* @return Collection of channel names matching the specified input channel
|
*/
|
||||||
* name
|
@RequestMapping(value = CHANNELS + "/{channelName}", method = {RequestMethod.GET},
|
||||||
*/
|
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
@RequestMapping(value = CHANNELS + "/{channelName}", method = { RequestMethod.GET }, produces = { MediaType.APPLICATION_JSON_VALUE })
|
public @ResponseBody Collection<String> getChannels(@PathVariable(value = "channelName") String channelName)
|
||||||
public @ResponseBody Collection<String> getChannels(@PathVariable(value = "channelName") String channelName) {
|
throws Throwable {
|
||||||
|
return getChannels(new ChannelsRequest(channelName));
|
||||||
|
}
|
||||||
|
|
||||||
ChannelsRequest request = new ChannelsRequest(channelName);
|
/**
|
||||||
Collection<String> specificChannels = getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
|
* Catch-all query method for getting data from the backend.
|
||||||
return specificChannels;
|
* <p>
|
||||||
}
|
* The {@link AbstractQuery} object will be a concrete subclass based on the combination of
|
||||||
|
* fields defined in the user's query. The {@link AttributeBasedDeserializer} decides which class
|
||||||
|
* to deserialize the information into and has been configured (see
|
||||||
|
* QueryRestConfig#afterPropertiesSet) accordingly.
|
||||||
|
*
|
||||||
|
* @param query concrete implementation of {@link AbstractQuery}
|
||||||
|
* @param res the {@link HttpServletResponse} instance associated with this request
|
||||||
|
* @throws IOException thrown if writing to the output stream fails
|
||||||
|
*/
|
||||||
|
@RequestMapping(value = QUERY, method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
|
public void executeQuery(@RequestBody @Valid AbstractQuery query, HttpServletResponse res) throws IOException {
|
||||||
|
try {
|
||||||
|
LOGGER.debug("Executing query '{}'", query.toString());
|
||||||
|
|
||||||
/**
|
// write the response back to the client using java 8 streams
|
||||||
* Catch-all query method for getting data from the backend.
|
responseStreamWriter.respond(executeQuery(query), query, res);
|
||||||
* <p>
|
} catch (IOException t) {
|
||||||
* The {@link AbstractQuery} object will be a concrete subclass based on the
|
LOGGER.error("Failed to execute query '{}'.", query, t);
|
||||||
* combination of fields defined in the user's query. The
|
throw t;
|
||||||
* {@link AttributeBasedDeserializer} decides which class to deserialize the
|
}
|
||||||
* information into and has been configured (see
|
}
|
||||||
* QueryRestConfig#afterPropertiesSet) accordingly.
|
|
||||||
*
|
|
||||||
* @param query
|
|
||||||
* concrete implementation of {@link AbstractQuery}
|
|
||||||
* @param res
|
|
||||||
* the {@link HttpServletResponse} instance associated with this
|
|
||||||
* request
|
|
||||||
* @throws IOException
|
|
||||||
* thrown if writing to the output stream fails
|
|
||||||
*/
|
|
||||||
@RequestMapping(value = QUERY, method = RequestMethod.POST, consumes = { MediaType.APPLICATION_JSON_VALUE })
|
|
||||||
public void executeQuery(@RequestBody @Valid AbstractQuery query, HttpServletResponse res) throws IOException {
|
|
||||||
try {
|
|
||||||
LOGGER.debug("Executing query '{}'", query.toString());
|
|
||||||
|
|
||||||
// write the response back to the client using java 8 streams
|
public Stream<Entry<String, ?>> executeQuery(AbstractQuery query) {
|
||||||
responseStreamWriter.respond(executeQuery(query), query, res);
|
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||||
} catch (IOException t) {
|
|
||||||
LOGGER.error("Failed to execute query '{}'.", query, t);
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Stream<Entry<String, ?>> executeQuery(AbstractQuery query) {
|
// all the magic happens here
|
||||||
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = getQueryProcessor(query.getDbMode())
|
||||||
|
.process(queryAnalizer);
|
||||||
|
|
||||||
// all the magic happens here
|
// do post-process
|
||||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = getQueryProcessor(query.getDbMode())
|
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||||
.process(queryAnalizer);
|
|
||||||
|
|
||||||
// do post-process
|
return channelToData;
|
||||||
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
}
|
||||||
|
|
||||||
return channelToData;
|
private QueryProcessor getQueryProcessor(DBMode dbMode) {
|
||||||
}
|
if (DBMode.databuffer.equals(dbMode)) {
|
||||||
|
return cassandraQueryProcessor;
|
||||||
|
} else if (DBMode.archiverappliance.equals(dbMode)) {
|
||||||
|
return archiverApplianceQueryProcessor;
|
||||||
|
} else {
|
||||||
|
LOGGER.error("Unknown DBMode '{}'!", dbMode);
|
||||||
|
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private QueryProcessor getQueryProcessor(DBMode dbMode) {
|
// ==========================================================================================
|
||||||
if (DBMode.databuffer.equals(dbMode)) {
|
// TODO: This is simply for initial / rudimentary testing - remove once
|
||||||
return cassandraQueryProcessor;
|
// further evolved
|
||||||
} else if (DBMode.archiverappliance.equals(dbMode)) {
|
|
||||||
return archiverApplianceQueryProcessor;
|
|
||||||
} else {
|
|
||||||
LOGGER.error("Unknown DBMode '{}'!", dbMode);
|
|
||||||
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==========================================================================================
|
@Resource
|
||||||
// TODO: This is simply for initial / rudimentary testing - remove once
|
private CassandraDataGen cassandraDataGen;
|
||||||
// further evolved
|
|
||||||
|
|
||||||
@Resource
|
@RequestMapping(value = "/write")
|
||||||
private CassandraDataGen cassandraDataGen;
|
public void writeDummyEntry() {
|
||||||
|
long nrOfElements = 4;
|
||||||
|
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> new long[] {i, i, i, i},
|
||||||
|
"TRFCA-channel1");
|
||||||
|
|
||||||
@RequestMapping(value = "/write")
|
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> new long[] {nrOfElements - i,
|
||||||
public void writeDummyEntry() {
|
nrOfElements - i, nrOfElements - i, nrOfElements - i}, "TRFCA-channel2");
|
||||||
long nrOfElements = 4;
|
|
||||||
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> new long[] { i, i, i, i },
|
|
||||||
"TRFCA-channel1");
|
|
||||||
|
|
||||||
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> new long[] { nrOfElements - i,
|
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> i, "TRFCB-channel3");
|
||||||
nrOfElements - i, nrOfElements - i, nrOfElements - i }, "TRFCA-channel2");
|
|
||||||
|
|
||||||
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> i, "TRFCB-channel3");
|
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> nrOfElements - i,
|
||||||
|
"TRFCB-channel4");
|
||||||
cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> nrOfElements - i,
|
}
|
||||||
"TRFCB-channel4");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user