ATEST-285
This commit is contained in:
@ -15,6 +15,7 @@ import javax.validation.Valid;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.validation.Validator;
|
||||
import org.springframework.web.bind.WebDataBinder;
|
||||
@ -28,10 +29,12 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import ch.psi.daq.cassandra.request.validate.RequestProviderValidator;
|
||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||
import ch.psi.daq.common.concurrent.singleton.Deferred;
|
||||
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.config.QueryConfig;
|
||||
import ch.psi.daq.query.model.Aggregation;
|
||||
import ch.psi.daq.query.model.AggregationType;
|
||||
import ch.psi.daq.query.model.DBMode;
|
||||
@ -61,10 +64,14 @@ public class QueryRestController {
|
||||
private ResponseStreamWriter responseStreamWriter;
|
||||
|
||||
@Resource
|
||||
private QueryProcessor cassandraQueryProcessor;
|
||||
private ApplicationContext appContext;
|
||||
|
||||
@Resource
|
||||
private QueryProcessor archiverApplianceQueryProcessor;
|
||||
// using Deferred ensures that data-api startup succeeds even if DataBuffer/ArchiverAppliance is
|
||||
// not reachable at startup
|
||||
private Deferred<QueryProcessor> cassandraQueryProcessor = new Deferred<QueryProcessor>(
|
||||
() -> appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class));
|
||||
private Deferred<QueryProcessor> archiverApplianceQueryProcessor = new Deferred<QueryProcessor>(
|
||||
() -> appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class));
|
||||
|
||||
@Resource
|
||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||
@ -87,7 +94,8 @@ public class QueryRestController {
|
||||
}
|
||||
}
|
||||
|
||||
@RequestMapping(value = CHANNELS, method = {RequestMethod.GET, RequestMethod.POST}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
@RequestMapping(value = CHANNELS, method = {RequestMethod.GET, RequestMethod.POST},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
||||
throws Throwable {
|
||||
// in case not specified use default (e.g. GET)
|
||||
@ -112,13 +120,14 @@ public class QueryRestController {
|
||||
* @return Collection of channel names matching the specified input channel name
|
||||
* @throws Throwable in case something goes wrong
|
||||
*/
|
||||
@RequestMapping(value = CHANNELS + "/{channelName}", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
@RequestMapping(value = CHANNELS + "/{channelName}", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody Collection<String> getChannels(@PathVariable(value = "channelName") String channelName)
|
||||
throws Throwable {
|
||||
return getChannels(new ChannelsRequest(channelName));
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link Ordering}s available.
|
||||
*
|
||||
@ -132,7 +141,7 @@ public class QueryRestController {
|
||||
return ord.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link QueryField}s available.
|
||||
*
|
||||
@ -146,7 +155,7 @@ public class QueryRestController {
|
||||
return qf.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link Aggregation}s available.
|
||||
*
|
||||
@ -160,13 +169,14 @@ public class QueryRestController {
|
||||
return value.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link AggregationType}s available.
|
||||
*
|
||||
* @return list of {@link AggregationType}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "aggregationtypes", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
@RequestMapping(value = "aggregationtypes", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getAggregationTypesValues() {
|
||||
List<AggregationType> orderings = Lists.newArrayList(AggregationType.values());
|
||||
return orderings.stream()
|
||||
@ -188,14 +198,14 @@ public class QueryRestController {
|
||||
return value.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Catch-all query method for getting data from the backend.
|
||||
* <p>
|
||||
* The {@link DAQQuery} object will be a concrete subclass based on the combination of
|
||||
* fields defined in the user's query. The {@link AttributeBasedDeserializer} decides which class
|
||||
* to deserialize the information into and has been configured (see
|
||||
* The {@link DAQQuery} object will be a concrete subclass based on the combination of fields
|
||||
* defined in the user's query. The {@link AttributeBasedDeserializer} decides which class to
|
||||
* deserialize the information into and has been configured (see
|
||||
* QueryRestConfig#afterPropertiesSet) accordingly.
|
||||
*
|
||||
* @param query concrete implementation of {@link DAQQuery}
|
||||
@ -219,7 +229,7 @@ public class QueryRestController {
|
||||
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
|
||||
// all the magic happens here
|
||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
|
||||
|
||||
// do post-process
|
||||
@ -230,9 +240,9 @@ public class QueryRestController {
|
||||
|
||||
private QueryProcessor getQueryProcessor(DBMode dbMode) {
|
||||
if (DBMode.databuffer.equals(dbMode)) {
|
||||
return cassandraQueryProcessor;
|
||||
return cassandraQueryProcessor.get();
|
||||
} else if (DBMode.archiverappliance.equals(dbMode)) {
|
||||
return archiverApplianceQueryProcessor;
|
||||
return archiverApplianceQueryProcessor.get();
|
||||
} else {
|
||||
LOGGER.error("Unknown DBMode '{}'!", dbMode);
|
||||
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
|
||||
|
Reference in New Issue
Block a user