Remote query server

This commit is contained in:
Fabian Märki
2019-02-07 13:26:49 +01:00
parent b6703e8e24
commit 8cdfcb5b82
7 changed files with 125 additions and 80 deletions

View File

@@ -5,7 +5,9 @@ import java.util.Arrays;
import java.util.EnumMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.PostConstruct;
@@ -35,6 +37,8 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import ch.psi.daq.common.statistic.Statistics;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.backend.BackendType;
import ch.psi.daq.domain.backend.DomainBackendType;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.config.DomainConfigCORS;
import ch.psi.daq.domain.events.ChannelConfiguration;
@@ -49,6 +53,7 @@ import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.operation.aggregation.extrema.AbstractExtremaMeta;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.request.validate.RequestProviderValidator;
import ch.psi.daq.domain.rest.RestHelper;
import ch.psi.daq.query.analyzer.BackendQueryAnalyzerImpl;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.controller.validator.ConfigQueryValidator;
@@ -118,7 +123,8 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
public static final String BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION =
"formatterHistoricChannelConfiguration";
public static final String BEAN_NAME_FORMATTER_RAW_EVENT = "formatterRawEvent";
public static final String BEAN_NAME_QUERY_SERVER_ADDRESSES = "queryBackends";
public static final String BEAN_NAME_QUERY_SERVER_ADDRESSES = "queryServerAddesses";
public static final String BEAN_NAME_QUERY_SERVER_BACKENDS = "queryServerBackends";
private static final String QUERY_SERVER_TYPE = "query.server.type";
private static final String QUERY_SERVER_ADDRESSES = "query.server.addresses";
@@ -438,4 +444,15 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
return queryBackendStrs;
}
@Bean(name = BEAN_NAME_QUERY_SERVER_BACKENDS)
@Lazy
public Map<Backend, String> queryServerBackends() {
final BiFunction<String, String, BackendType> typeCreator =
(name, queryServer) -> new DomainBackendType(name, queryServer);
@SuppressWarnings("unchecked")
final List<String> queryServerAddresses =
context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_ADDRESSES, List.class);
return RestHelper.getBackends(context, queryServerAddresses, typeCreator);
}
}

View File

@@ -8,6 +8,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.validation.DirectFieldBindingResult;
@@ -41,6 +43,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.string.StringUtils;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
@@ -236,13 +239,24 @@ public class QueryRestController implements ApplicationContextAware {
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
public void executeDAQConfigQuery(
@RequestBody @Valid final DAQConfigQuery query,
@RequestBody @Valid final DAQConfigQuery query,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse)
throws Throwable {
final String redirect = queryManager.getRedirection(query.getChannels());
// Do a redirection if only one backend is requested
//
final Supplier<Collection<ChannelName>> channelsSupplier = () -> query.getChannels();
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
httpResponse.sendRedirect(redirect + DomainConfig.PATH_QUERY_CONFIG);
final String redirectURL = redirect
+ httpRequest.getRequestURI()
+ httpRequest.getQueryString() != null ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY;
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return;
}
@@ -252,16 +266,16 @@ public class QueryRestController implements ApplicationContextAware {
final Response response = query.getResponseOrDefault(defaultResponse);
if (response instanceof AbstractHTTPResponse) {
LOGGER.debug("Executing config query '{}'", query);
final AbstractHTTPResponse httpResponse = ((AbstractHTTPResponse) response);
final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response);
httpResponse.validateQuery(query);
httpRes.validateQuery(query);
// execute query
final Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> result =
queryManager.queryConfigs(query);
httpResponse.respond(
httpRes.respond(
context,
res,
httpResponse,
query,
result);
} else {
@@ -284,13 +298,17 @@ public class QueryRestController implements ApplicationContextAware {
*
* @param jsonBody The {@link DAQQuery} properties sent as a JSON string, i.e. this is the
* stringified body of the POST request method
* @param res the current {@link HttpServletResponse} instance
* @param httpRequest the {@link HttpServletRequest} instance associated with this request
* @param httpResponse the {@link HttpServletResponse} instance associated with this request
* @throws Exception if reading the JSON string fails or if the subsequent call fails
*/
@RequestMapping(
value = DomainConfig.PATH_QUERY,
method = RequestMethod.GET)
public void executeDAQQueryBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
public void executeDAQQueryBodyAsString(
@RequestParam final String jsonBody,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
DAQQuery query;
try {
query = objectMapper.readValue(jsonBody, DAQQuery.class);
@@ -311,13 +329,7 @@ public class QueryRestController implements ApplicationContextAware {
final List<ObjectError> allErrors = errors.getAllErrors();
if (allErrors.isEmpty()) {
final String redirect = queryManager.getRedirection(query.getChannels());
if (redirect != null) {
res.sendRedirect(redirect + DomainConfig.PATH_QUERY);
return;
}
executeDAQQuery(query, res);
executeDAQQuery(query, httpRequest, httpResponse);
} else {
final String message = String.format("Could not parse '%s' due to '%s'.", jsonBody, errors.toString());
LOGGER.error(message);
@@ -329,21 +341,19 @@ public class QueryRestController implements ApplicationContextAware {
* Executes a single query.
*
* @param query the {@link DAQQuery}
* @param res the {@link HttpServletResponse} instance associated with this request
* @param httpRequest the {@link HttpServletRequest} instance associated with this request
* @param httpResponse the {@link HttpServletResponse} instance associated with this request
* @throws Exception thrown if writing to the output stream fails
*/
@RequestMapping(
value = DomainConfig.PATH_QUERY,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
public void executeDAQQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
final String redirect = queryManager.getRedirection(query.getChannels());
if (redirect != null) {
res.sendRedirect(redirect + DomainConfig.PATH_QUERY);
return;
}
executeDAQQueries(new DAQQueries(query), res);
public void executeDAQQuery(
@RequestBody @Valid final DAQQuery query,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
executeDAQQueries(new DAQQueries(query), httpRequest, httpResponse);
}
/**
@@ -351,13 +361,17 @@ public class QueryRestController implements ApplicationContextAware {
*
* @param jsonBody The {@link DAQQueries} properties sent as a JSON string, i.e. this is the
* stringified body of the POST request method
* @param res the current {@link HttpServletResponse} instance
* @param httpRequest the {@link HttpServletRequest} instance associated with this request
* @param httpResponse the {@link HttpServletResponse} instance associated with this request
* @throws Exception if reading the JSON string fails or if the subsequent call fails
*/
@RequestMapping(
value = DomainConfig.PATH_QUERIES,
method = RequestMethod.GET)
public void executeDAQQueriesBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
public void executeDAQQueriesBodyAsString(
@RequestParam final String jsonBody,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
DAQQueries queries;
try {
queries = objectMapper.readValue(jsonBody, DAQQueries.class);
@@ -378,16 +392,7 @@ public class QueryRestController implements ApplicationContextAware {
final List<ObjectError> allErrors = errors.getAllErrors();
if (allErrors.isEmpty()) {
final Collection<ChannelName> channels = queries.getQueries().stream()
.flatMap(daqQuery -> daqQuery.getChannels().stream())
.collect(Collectors.toList());
final String redirect = queryManager.getRedirection(channels);
if (redirect != null) {
res.sendRedirect(redirect + DomainConfig.PATH_QUERIES);
return;
}
executeDAQQueries(queries, res);
executeDAQQueries(queries, httpRequest, httpResponse);
} else {
final String message = String.format("Could not parse '%s' due to '%s'.", jsonBody, errors.toString());
LOGGER.error(message);
@@ -404,20 +409,34 @@ public class QueryRestController implements ApplicationContextAware {
* QueryRestConfig#afterPropertiesSet) accordingly.
*
* @param queries the {@link DAQQueries}
* @param res the {@link HttpServletResponse} instance associated with this request
* @param httpRequest the {@link HttpServletRequest} instance associated with this request
* @param httpResponse the {@link HttpServletResponse} instance associated with this request
* @throws Exception thrown if writing to the output stream fails
*/
@RequestMapping(
value = DomainConfig.PATH_QUERIES,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
public void executeDAQQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception {
final Collection<ChannelName> channels = queries.getQueries().stream()
public void executeDAQQueries(
@RequestBody @Valid final DAQQueries queries,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
// Do a redirection if only one backend is requested
//
final Supplier<Collection<ChannelName>> channelsSupplier = () -> queries.getQueries().stream()
.flatMap(daqQuery -> daqQuery.getChannels().stream())
.collect(Collectors.toList());
final String redirect = queryManager.getRedirection(channels);
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
res.sendRedirect(redirect + DomainConfig.PATH_QUERIES);
final String redirectURL = redirect
+ httpRequest.getRequestURI()
+ httpRequest.getQueryString() != null ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY;
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return;
}
@@ -427,16 +446,16 @@ public class QueryRestController implements ApplicationContextAware {
final Response response = queries.getResponseOrDefault(defaultResponse);
if (response instanceof AbstractHTTPResponse) {
LOGGER.debug("Executing query '{}'", queries);
final AbstractHTTPResponse httpResponse = ((AbstractHTTPResponse) response);
final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response);
httpResponse.validateQuery(queries);
httpRes.validateQuery(queries);
// execute query
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> result =
queryManager.queryEvents(queries);
httpResponse.respond(
httpRes.respond(
context,
res,
httpResponse,
queries,
result);
} else {

View File

@@ -3,6 +3,7 @@ package ch.psi.daq.queryrest.query;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Triple;
@@ -26,13 +27,13 @@ public interface QueryManager {
Stream<Backend> getBackends();
/**
/**
* Provides the root path of the redirection (or <tt>null</tt> if no redirection is possible).
*
*
* @param channels The channels
* @return String The root of the redirection or <tt>null</tt> for none.
*/
String getRedirection(final Collection<ChannelName> channels);
String getRedirection(final Supplier<Collection<ChannelName>> channels);
LongHash getChannelsHash();

View File

@@ -7,11 +7,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
@@ -68,14 +68,12 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
return Backend.getBackends().stream()
.filter(backend -> activeBackends.contains(backend));
}
@Override
public String getRedirection(final Collection<ChannelName> channels){
// set backends if not defined yet
channelsCache.configureBackends(channels);
public String getRedirection(final Supplier<Collection<ChannelName>> channels) {
return null;
}
@Override
public LongHash getChannelsHash() {
return channelsCache.getChannelsHash();
@@ -168,7 +166,8 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
queryAnalizer.postProcess(channelToDataEvents);
// ChannelConfig query
final BackendQuery configQuery = new BackendQueryImpl(query, queryElement.getConfigFields());
final BackendQuery configQuery =
new BackendQueryImpl(query, queryElement.getConfigFields());
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
configQuery.getChannelConfigurations();

View File

@@ -2,19 +2,16 @@ package ch.psi.daq.queryrest.query;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
@@ -25,8 +22,6 @@ import org.springframework.context.ApplicationContextAware;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.backend.BackendType;
import ch.psi.daq.domain.backend.DomainBackendType;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
@@ -37,13 +32,14 @@ import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
import ch.psi.daq.domain.query.channels.ChannelConfigurationLoader;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.domain.rest.RestHelper;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.config.QueryRestConfig;
public class QueryManagerRemote implements QueryManager, ApplicationContextAware {
@@ -55,16 +51,21 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware
@SuppressWarnings("unchecked")
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
final BiFunction<String, String, BackendType> typeCreator =
(name, queryServer) -> new DomainBackendType(name, queryServer);
final List<String> queryServerAddresses =
context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_ADDRESSES, List.class);
backendToServerAddresses = RestHelper.getBackends(context, queryServerAddresses, typeCreator);
backendToServerAddresses = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_BACKENDS, Map.class);
// channelsCache = new BackendsChannelConfigurationCache(backendToServerAddresses.keySet());
// channelsCache =
// context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE,
// BackendsChannelConfigurationCache.class);
final long reloadPeriodMillis = context.getBean(QueryConfig.BEAN_NAME_CHANNELS_CACHE_RELOAD_PERIOD, Long.class);
final Function<Backend, ChannelConfigurationLoader> loaderProvider = backend -> {
if (backend.getBackendAccess().hasHistoricChannelConfigurationLoader()) {
return backend.getBackendAccess().getHistoricChannelConfigurationLoader();
} else {
return null;
}
};
channelsCache = new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis);
channelsCache.init(
context,
backendToServerAddresses.keySet().iterator().next(),
backendToServerAddresses.keySet());
queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
}
@@ -77,8 +78,9 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware
}
@Override
public String getRedirection(final Collection<ChannelName> channels) {
public String getRedirection(final Supplier<Collection<ChannelName>> channelsSupplier) {
// set backends if not defined yet
final Collection<ChannelName> channels = channelsSupplier.get();
channelsCache.configureBackends(channels);
final Set<String> backendRoots = channels.stream()

View File

@@ -1,9 +1,11 @@
package ch.psi.daq.test.queryrest.query;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.junit.After;
@@ -11,9 +13,8 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import ch.psi.daq.common.serialization.SerializationHelper;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
@@ -34,6 +35,12 @@ public class QueryManagerRemoteTest extends AbstractDaqRestTest {
@Test
public void testQueryManager_01() throws Exception {
assertTrue(false);
final List<Backend> backends = queryManager.getBackends().collect(Collectors.toList());
assertEquals(7, backends.size());
final List<String> channels = queryManager.getChannels(new ChannelsRequest())
.flatMap(response -> response.getChannels().stream())
.collect(Collectors.toList());
assertTrue("Size was " + channels.size(), channels.size() > 400000);
}
}

View File

@@ -10,7 +10,7 @@ domain.keyspace.base=daq_query_test
channels.cache.reload.period=-1
query.server.type=remote
#query.server.type=remote
query.min.time=1970-01-01T00:00:00.000000000+00:00