ATEST-344

This commit is contained in:
Fabian Märki
2016-07-05 09:40:14 +02:00
parent 907637ad32
commit 596e2617a8
7 changed files with 273 additions and 0 deletions
@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.http.converter.HttpMessageConverter;
@@ -51,6 +52,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
private static final String QUERYREST_DEFAULT_RESPONSE_FIELDS = "queryrest.default.response.fields";
private static final String QUERYREST_CORS_ALLOWEDORIGINS = "queryrest.cors.allowedorigins";
private static final String QUERYREST_CORS_FORCEALLHEADERS = "queryrest.cors.forceallheaders";
private static final String QUERYREST_READTIMOUT = "queryrest.readtimeout";
// a nested configuration
// this guarantees that the ordering of the properties file is as expected
@@ -67,6 +69,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
public static final String BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS = "defaultResponseAggregations";
public static final String BEAN_NAME_CORS_ALLOWEDORIGINS = "corsAllowedorigins";
public static final String BEAN_NAME_CORS_FORCEALLHEADERS = "corsForceallheaders";
public static final String BEAN_NAME_READ_TIMOUT = "queryRestReadTimout";
@Resource
private Environment env;
@@ -178,4 +181,12 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
LOGGER.debug("Load '{}={}'", QUERYREST_CORS_FORCEALLHEADERS, value);
return value;
}
@Bean(name = BEAN_NAME_READ_TIMOUT)
@Lazy
public Integer readTimeout() {
Integer value = env.getProperty(QUERYREST_READTIMOUT, Integer.class, 10);
LOGGER.debug("Load '{}={}'", QUERYREST_READTIMOUT, value);
return value;
}
}
@@ -7,6 +7,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -42,6 +44,7 @@ import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.FieldNames;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.status.channel.ChannelStatus;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
@@ -51,14 +54,17 @@ import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.request.validate.RequestProviderValidator;
import ch.psi.daq.domain.status.StatusReader;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.query.processor.ChannelNameCache;
import ch.psi.daq.query.processor.QueryProcessor;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
@@ -73,6 +79,7 @@ public class QueryRestController {
public static final String PATH_CHANNELS = DomainConfig.PATH_CHANNELS;
public static final String PATH_QUERY = DomainConfig.PATH_QUERY;
public static final String PATH_QUERIES = DomainConfig.PATH_QUERIES;
public static final String PATH_STATUS_CHANNELS = DomainConfig.PATH_STATUS_CHANNELS;
@Resource
private Validator queryValidator;
@@ -93,9 +100,14 @@ public class QueryRestController {
@Resource
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
@Resource(name = QueryRestConfig.BEAN_NAME_READ_TIMOUT)
private Integer readTimeout;
private Map<Backend, QueryProcessor> queryProcessors = new LinkedHashMap<>();
private ChannelNameCache channelNameCache;
private Map<Backend, StatusReader> statusReaders = new LinkedHashMap<>();
@PostConstruct
public void afterPropertiesSet() {
List<Exception> exceptions = new ArrayList<>();
@@ -135,6 +147,32 @@ public class QueryRestController {
throw new RuntimeException("No Backends available!");
}
try {
StatusReader statusReader =
appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_STATUS_READER, StatusReader.class);
statusReaders.put(statusReader.getBackend(), statusReader);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load status reader for cassandra.");
LOGGER.warn("##########");
LOGGER.warn("");
}
try {
StatusReader statusReader =
appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_STATUS_READER, StatusReader.class);
statusReaders.put(statusReader.getBackend(), statusReader);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load status reader for archiverappliance.");
LOGGER.warn("##########");
LOGGER.warn("");
}
channelNameCache =
new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT,
Integer.class).longValue());
@@ -183,6 +221,46 @@ public class QueryRestController {
return getChannels(new ChannelsRequest(channelName));
}
/**
* Queries for channels status
*
* @param query the {@link ChannelStatusQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @throws IOException thrown if writing to the output stream fails
*/
@RequestMapping(
value = PATH_STATUS_CHANNELS,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
public @ResponseBody Collection<ChannelStatus> executeChannelStatusQuery(@RequestBody ChannelStatusQuery query,
HttpServletResponse res) throws Exception {
// set backends if not defined yet
channelNameCache.setBackends(query.getChannels());
List<CompletableFuture<ChannelStatus>> futures = new ArrayList<>(query.getChannels().size());
for (ChannelName channel : query.getChannels()) {
StatusReader statusReader = statusReaders.get(channel.getBackend());
if (statusReader != null) {
futures.add(statusReader.getChannelStatusAsync(channel.getName()));
} else {
LOGGER.warn("There is no StatusReader available for '{}'.", channel.getBackend());
}
}
try {
List<ChannelStatus> retStatus = new ArrayList<>(futures.size());
for (CompletableFuture<ChannelStatus> completableFuture : futures) {
retStatus.add(completableFuture.get(readTimeout, TimeUnit.SECONDS));
}
return retStatus;
} catch (Exception e) {
String message = "Could not extract ChannelStatus within timelimits.";
LOGGER.error(message, e);
throw new IllegalStateException(message, e);
}
}
/**
* Accepts the properties for the {@link DAQQuery} instance as stringified JSON query string.
*
+3
View File
@@ -16,3 +16,6 @@ queryrest.cors.forceallheaders=false
# If this is set to '*', then all requests are allowed, from any source. If it's set to, say, http://ui-data-api.psi.ch, then only requests
# originating from that domain (Origin header set to that value) will be allowed. Otherwise a 403 error will be returned.
queryrest.cors.allowedorigins=*
# Timeout for read operations in seconds
queryrest.readtimeout=30