ATEST-249:

- provide a channel search method with with the user can define all or
part of the channel name
This commit is contained in:
Zellweger Christof Ralf
2015-10-20 11:49:12 +02:00
parent 9030776f61
commit c894bb5939
3 changed files with 137 additions and 121 deletions

View File

@ -17,6 +17,7 @@ import org.springframework.http.MediaType;
import org.springframework.validation.Validator; import org.springframework.validation.Validator;
import org.springframework.web.bind.WebDataBinder; import org.springframework.web.bind.WebDataBinder;
import org.springframework.web.bind.annotation.InitBinder; import org.springframework.web.bind.annotation.InitBinder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestMethod;
@ -25,6 +26,7 @@ import org.springframework.web.bind.annotation.RestController;
import ch.psi.daq.cassandra.request.validate.RequestProviderValidator; import ch.psi.daq.cassandra.request.validate.RequestProviderValidator;
import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.cassandra.util.test.CassandraDataGen;
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.model.Aggregation; import ch.psi.daq.query.model.Aggregation;
@ -40,148 +42,152 @@ import ch.psi.daq.queryrest.response.ResponseStreamWriter;
@RestController @RestController
public class QueryRestController { public class QueryRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class); private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
public static final String CHANNELS = "/channels"; public static final String CHANNELS = "/channels";
public static final String QUERY = "/query"; public static final String QUERY = "/query";
@Resource @Resource
private Validator queryValidator; private Validator queryValidator;
private Validator requestProviderValidator = new RequestProviderValidator(); private Validator requestProviderValidator = new RequestProviderValidator();
@Resource @Resource
private ResponseStreamWriter responseStreamWriter; private ResponseStreamWriter responseStreamWriter;
@Resource @Resource
private QueryProcessor cassandraQueryProcessor; private QueryProcessor cassandraQueryProcessor;
@Resource @Resource
private QueryProcessor archiverApplianceQueryProcessor; private QueryProcessor archiverApplianceQueryProcessor;
@Resource @Resource
private Function<Query, QueryAnalyzer> queryAnalizerFactory; private Function<Query, QueryAnalyzer> queryAnalizerFactory;
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS) @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_FIELDS)
private Set<QueryField> defaultResponseFields; private Set<QueryField> defaultResponseFields;
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS) @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
private Set<Aggregation> defaultResponseAggregations; private Set<Aggregation> defaultResponseAggregations;
@InitBinder
protected void initBinder(WebDataBinder binder) {
if (binder.getTarget() != null) {
if (requestProviderValidator.supports(binder.getTarget().getClass())) {
binder.addValidators(requestProviderValidator);
}
if (queryValidator.supports(binder.getTarget().getClass())) {
binder.addValidators(queryValidator);
}
}
}
@InitBinder @RequestMapping(value = CHANNELS, method = { RequestMethod.GET, RequestMethod.POST }, produces = { MediaType.APPLICATION_JSON_VALUE })
protected void initBinder(WebDataBinder binder) { public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request)
if (binder.getTarget() != null) { throws Throwable {
if (requestProviderValidator.supports(binder.getTarget().getClass())) { // in case not specified use default (e.g. GET)
binder.addValidators(requestProviderValidator); if (request == null) {
} request = new ChannelsRequest();
if (queryValidator.supports(binder.getTarget().getClass())) { }
binder.addValidators(queryValidator);
}
}
}
@RequestMapping( try {
value = CHANNELS, // sorted collection
method = {RequestMethod.GET, RequestMethod.POST}, Collection<String> allChannels = getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
produces = {MediaType.APPLICATION_JSON_VALUE}) return allChannels;
public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request) } catch (Throwable t) {
throws Throwable { LOGGER.error("Failed to query channel names.", t);
// in case not specified use default (e.g. GET) throw t;
if (request == null) { }
request = new ChannelsRequest(); }
}
try { /**
return getQueryProcessor(request.getDbMode()).getChannels(request.getRegex()); * Query specific channel names, and return only those.
} catch (Throwable t) { *
LOGGER.error("Failed to query channel names.", t); * @param channelName
throw t; * part of (or full) channel name
} * @return Collection of channel names matching the specified input channel
} * name
*/
@RequestMapping(value = CHANNELS + "/{channelName}", method = { RequestMethod.GET }, produces = { MediaType.APPLICATION_JSON_VALUE })
public @ResponseBody Collection<String> getChannels(@PathVariable(value = "channelName") String channelName) {
@RequestMapping( ChannelsRequest request = new ChannelsRequest(channelName);
value = QUERY, Collection<String> specificChannels = getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
method = RequestMethod.POST, return specificChannels;
consumes = {MediaType.APPLICATION_JSON_VALUE}) }
public void executeQuery(@RequestBody @Valid AbstractQuery query, HttpServletResponse res) throws IOException {
try {
LOGGER.debug("Executing query '{}'", query.getClass().getSimpleName());
// write the response back to the client using java 8 streams /**
responseStreamWriter.respond(executeQuery(query), query, res); * Catch-all query method for getting data from the backend.
} catch (Throwable t) { * <p>
LOGGER.error("Failed execute query '{}'.", query, t); * The {@link AbstractQuery} object will be a concrete subclass based on the
throw t; * combination of fields defined in the user's query. The
} * {@link AttributeBasedDeserializer} decides which class to deserialize the
} * information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param query
* concrete implementation of {@link AbstractQuery}
* @param res
* the {@link HttpServletResponse} instance associated with this
* request
* @throws IOException
* thrown if writing to the output stream fails
*/
@RequestMapping(value = QUERY, method = RequestMethod.POST, consumes = { MediaType.APPLICATION_JSON_VALUE })
public void executeQuery(@RequestBody @Valid AbstractQuery query, HttpServletResponse res) throws IOException {
try {
LOGGER.debug("Executing query '{}'", query.toString());
public Stream<Entry<String, ?>> executeQuery(AbstractQuery query) { // write the response back to the client using java 8 streams
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); responseStreamWriter.respond(executeQuery(query), query, res);
} catch (IOException t) {
LOGGER.error("Failed to execute query '{}'.", query, t);
throw t;
}
}
// all the magic happens here public Stream<Entry<String, ?>> executeQuery(AbstractQuery query) {
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
// do post-process // all the magic happens here
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents); Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = getQueryProcessor(query.getDbMode())
.process(queryAnalizer);
return channelToData; // do post-process
} Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
private QueryProcessor getQueryProcessor(DBMode dbMode) { return channelToData;
if (DBMode.databuffer.equals(dbMode)) { }
return cassandraQueryProcessor;
} else if (DBMode.archiverappliance.equals(dbMode)) {
return archiverApplianceQueryProcessor;
} else {
LOGGER.error("Unknown DBMode '{}'!", dbMode);
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
}
}
// ========================================================================================== private QueryProcessor getQueryProcessor(DBMode dbMode) {
// TODO: This is simply for initial / rudimentary testing - remove once further evolved if (DBMode.databuffer.equals(dbMode)) {
return cassandraQueryProcessor;
} else if (DBMode.archiverappliance.equals(dbMode)) {
return archiverApplianceQueryProcessor;
} else {
LOGGER.error("Unknown DBMode '{}'!", dbMode);
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
}
}
@Resource // ==========================================================================================
private CassandraDataGen cassandraDataGen; // TODO: This is simply for initial / rudimentary testing - remove once
// further evolved
@RequestMapping(value = "/write") @Resource
public void writeDummyEntry() { private CassandraDataGen cassandraDataGen;
long nrOfElements = 4;
cassandraDataGen.writeData(3, 0, 4,
i -> i,
i -> 0,
i -> i,
i -> i,
i -> 0,
i -> new long[] {i, i, i, i},
"TRFCA-channel1");
cassandraDataGen.writeData(3, 0, 4, @RequestMapping(value = "/write")
i -> i, public void writeDummyEntry() {
i -> 0, long nrOfElements = 4;
i -> i, cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> new long[] { i, i, i, i },
i -> i, "TRFCA-channel1");
i -> 0,
i -> new long[] {nrOfElements - i, nrOfElements - i, nrOfElements - i, nrOfElements - i},
"TRFCA-channel2");
cassandraDataGen.writeData(3, 0, 4, cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> new long[] { nrOfElements - i,
i -> i, nrOfElements - i, nrOfElements - i, nrOfElements - i }, "TRFCA-channel2");
i -> 0,
i -> i,
i -> i,
i -> 0,
i -> i,
"TRFCB-channel3");
cassandraDataGen.writeData(3, 0, 4, cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> i, "TRFCB-channel3");
i -> i,
i -> 0, cassandraDataGen.writeData(3, 0, 4, i -> i, i -> 0, i -> i, i -> i, i -> 0, i -> nrOfElements - i,
i -> i, "TRFCB-channel4");
i -> i, }
i -> 0,
i -> nrOfElements - i,
"TRFCB-channel4");
}
} }

View File

@ -14,6 +14,10 @@ public class ChannelsRequest {
public ChannelsRequest() {} public ChannelsRequest() {}
public ChannelsRequest(String regex) {
this(DBMode.databuffer, regex);
}
public ChannelsRequest(DBMode dbMode, String regex) { public ChannelsRequest(DBMode dbMode, String regex) {
this.regex = regex; this.regex = regex;
this.dbMode = dbMode; this.dbMode = dbMode;

View File

@ -74,3 +74,9 @@ curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"timerange"
curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"pulserange","ordering":"ASC","channels":["dummy-test"],"fields":["channel","pulseId","globalMillis","globalNanos","dbValueBytes"],"binningStrategy":"bincount","binDuration":100,"aggregateChannels":"false","aggregationType":"index","aggregations":[{"fieldRef":"e_val","type":"max","resultFieldName":"maximum"},{"fieldRef":"e_val","type":"min","resultFieldName":"minimum"}], "queryRange":{"startPulseId":100,"endPulseId":100}}' http://localhost:8080/pulserange curl -v -X POST -H 'Content-Type: application/json' -d '{"queryType":"pulserange","ordering":"ASC","channels":["dummy-test"],"fields":["channel","pulseId","globalMillis","globalNanos","dbValueBytes"],"binningStrategy":"bincount","binDuration":100,"aggregateChannels":"false","aggregationType":"index","aggregations":[{"fieldRef":"e_val","type":"max","resultFieldName":"maximum"},{"fieldRef":"e_val","type":"min","resultFieldName":"minimum"}], "queryRange":{"startPulseId":100,"endPulseId":100}}' http://localhost:8080/pulserange
===============================================================================================================================================
curl -H "Content-Type: application/json" -X POST -d '{"channels":["BooleanScalar"],"startPulseId":144404537,"endPulseId":144404547}' http://sf-nube-14.psi.ch:8080/query