From 8cdfcb5b8223fdc6062457975e5592d07f4ed6ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Thu, 7 Feb 2019 13:26:49 +0100 Subject: [PATCH] Remote query server --- .../daq/queryrest/config/QueryRestConfig.java | 19 ++- .../controller/QueryRestController.java | 111 ++++++++++-------- .../psi/daq/queryrest/query/QueryManager.java | 7 +- .../daq/queryrest/query/QueryManagerImpl.java | 13 +- .../queryrest/query/QueryManagerRemote.java | 36 +++--- .../query/QueryManagerRemoteTest.java | 17 ++- src/test/resources/queryrest-test.properties | 2 +- 7 files changed, 125 insertions(+), 80 deletions(-) diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index b8d6606..9bab83a 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -5,7 +5,9 @@ import java.util.Arrays; import java.util.EnumMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import javax.annotation.PostConstruct; @@ -35,6 +37,8 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import ch.psi.daq.common.statistic.Statistics; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.backend.Backend; +import ch.psi.daq.domain.backend.BackendType; +import ch.psi.daq.domain.backend.DomainBackendType; import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.config.DomainConfigCORS; import ch.psi.daq.domain.events.ChannelConfiguration; @@ -49,6 +53,7 @@ import ch.psi.daq.domain.query.operation.EventField; import ch.psi.daq.domain.query.operation.aggregation.extrema.AbstractExtremaMeta; import ch.psi.daq.domain.query.response.Response; import ch.psi.daq.domain.request.validate.RequestProviderValidator; +import ch.psi.daq.domain.rest.RestHelper; import ch.psi.daq.query.analyzer.BackendQueryAnalyzerImpl; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.queryrest.controller.validator.ConfigQueryValidator; @@ -118,7 +123,8 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { public static final String BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION = "formatterHistoricChannelConfiguration"; public static final String BEAN_NAME_FORMATTER_RAW_EVENT = "formatterRawEvent"; - public static final String BEAN_NAME_QUERY_SERVER_ADDRESSES = "queryBackends"; + public static final String BEAN_NAME_QUERY_SERVER_ADDRESSES = "queryServerAddesses"; + public static final String BEAN_NAME_QUERY_SERVER_BACKENDS = "queryServerBackends"; private static final String QUERY_SERVER_TYPE = "query.server.type"; private static final String QUERY_SERVER_ADDRESSES = "query.server.addresses"; @@ -438,4 +444,15 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { return queryBackendStrs; } + + @Bean(name = BEAN_NAME_QUERY_SERVER_BACKENDS) + @Lazy + public Map queryServerBackends() { + final BiFunction typeCreator = + (name, queryServer) -> new DomainBackendType(name, queryServer); + @SuppressWarnings("unchecked") + final List queryServerAddresses = + context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_ADDRESSES, List.class); + return RestHelper.getBackends(context, queryServerAddresses, typeCreator); + } } diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 89c2964..40c3121 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.validation.DirectFieldBindingResult; @@ -41,6 +43,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.common.string.StringUtils; import ch.psi.daq.common.tuple.Quadruple; import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.config.DomainConfig; @@ -236,13 +239,24 @@ public class QueryRestController implements ApplicationContextAware { method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE}) public void executeDAQConfigQuery( - @RequestBody @Valid final DAQConfigQuery query, + @RequestBody @Valid final DAQConfigQuery query, final HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws Throwable { - final String redirect = queryManager.getRedirection(query.getChannels()); + // Do a redirection if only one backend is requested + // + final Supplier> channelsSupplier = () -> query.getChannels(); + final String redirect = queryManager.getRedirection(channelsSupplier); if (redirect != null) { - httpResponse.sendRedirect(redirect + DomainConfig.PATH_QUERY_CONFIG); + final String redirectURL = redirect + + httpRequest.getRequestURI() + + httpRequest.getQueryString() != null ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() + : StringUtils.EMPTY; + + LOGGER.info("Send redirect to '{}'.", redirectURL); + httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); + // use 307 - works for POST too + httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); return; } @@ -252,16 +266,16 @@ public class QueryRestController implements ApplicationContextAware { final Response response = query.getResponseOrDefault(defaultResponse); if (response instanceof AbstractHTTPResponse) { LOGGER.debug("Executing config query '{}'", query); - final AbstractHTTPResponse httpResponse = ((AbstractHTTPResponse) response); + final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response); - httpResponse.validateQuery(query); + httpRes.validateQuery(query); // execute query final Entry>> result = queryManager.queryConfigs(query); - httpResponse.respond( + httpRes.respond( context, - res, + httpResponse, query, result); } else { @@ -284,13 +298,17 @@ public class QueryRestController implements ApplicationContextAware { * * @param jsonBody The {@link DAQQuery} properties sent as a JSON string, i.e. this is the * stringified body of the POST request method - * @param res the current {@link HttpServletResponse} instance + * @param httpRequest the {@link HttpServletRequest} instance associated with this request + * @param httpResponse the {@link HttpServletResponse} instance associated with this request * @throws Exception if reading the JSON string fails or if the subsequent call fails */ @RequestMapping( value = DomainConfig.PATH_QUERY, method = RequestMethod.GET) - public void executeDAQQueryBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception { + public void executeDAQQueryBodyAsString( + @RequestParam final String jsonBody, + final HttpServletRequest httpRequest, + final HttpServletResponse httpResponse) throws Exception { DAQQuery query; try { query = objectMapper.readValue(jsonBody, DAQQuery.class); @@ -311,13 +329,7 @@ public class QueryRestController implements ApplicationContextAware { final List allErrors = errors.getAllErrors(); if (allErrors.isEmpty()) { - final String redirect = queryManager.getRedirection(query.getChannels()); - if (redirect != null) { - res.sendRedirect(redirect + DomainConfig.PATH_QUERY); - return; - } - - executeDAQQuery(query, res); + executeDAQQuery(query, httpRequest, httpResponse); } else { final String message = String.format("Could not parse '%s' due to '%s'.", jsonBody, errors.toString()); LOGGER.error(message); @@ -329,21 +341,19 @@ public class QueryRestController implements ApplicationContextAware { * Executes a single query. * * @param query the {@link DAQQuery} - * @param res the {@link HttpServletResponse} instance associated with this request + * @param httpRequest the {@link HttpServletRequest} instance associated with this request + * @param httpResponse the {@link HttpServletResponse} instance associated with this request * @throws Exception thrown if writing to the output stream fails */ @RequestMapping( value = DomainConfig.PATH_QUERY, method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE}) - public void executeDAQQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { - final String redirect = queryManager.getRedirection(query.getChannels()); - if (redirect != null) { - res.sendRedirect(redirect + DomainConfig.PATH_QUERY); - return; - } - - executeDAQQueries(new DAQQueries(query), res); + public void executeDAQQuery( + @RequestBody @Valid final DAQQuery query, + final HttpServletRequest httpRequest, + final HttpServletResponse httpResponse) throws Exception { + executeDAQQueries(new DAQQueries(query), httpRequest, httpResponse); } /** @@ -351,13 +361,17 @@ public class QueryRestController implements ApplicationContextAware { * * @param jsonBody The {@link DAQQueries} properties sent as a JSON string, i.e. this is the * stringified body of the POST request method - * @param res the current {@link HttpServletResponse} instance + * @param httpRequest the {@link HttpServletRequest} instance associated with this request + * @param httpResponse the {@link HttpServletResponse} instance associated with this request * @throws Exception if reading the JSON string fails or if the subsequent call fails */ @RequestMapping( value = DomainConfig.PATH_QUERIES, method = RequestMethod.GET) - public void executeDAQQueriesBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception { + public void executeDAQQueriesBodyAsString( + @RequestParam final String jsonBody, + final HttpServletRequest httpRequest, + final HttpServletResponse httpResponse) throws Exception { DAQQueries queries; try { queries = objectMapper.readValue(jsonBody, DAQQueries.class); @@ -378,16 +392,7 @@ public class QueryRestController implements ApplicationContextAware { final List allErrors = errors.getAllErrors(); if (allErrors.isEmpty()) { - final Collection channels = queries.getQueries().stream() - .flatMap(daqQuery -> daqQuery.getChannels().stream()) - .collect(Collectors.toList()); - final String redirect = queryManager.getRedirection(channels); - if (redirect != null) { - res.sendRedirect(redirect + DomainConfig.PATH_QUERIES); - return; - } - - executeDAQQueries(queries, res); + executeDAQQueries(queries, httpRequest, httpResponse); } else { final String message = String.format("Could not parse '%s' due to '%s'.", jsonBody, errors.toString()); LOGGER.error(message); @@ -404,20 +409,34 @@ public class QueryRestController implements ApplicationContextAware { * QueryRestConfig#afterPropertiesSet) accordingly. * * @param queries the {@link DAQQueries} - * @param res the {@link HttpServletResponse} instance associated with this request + * @param httpRequest the {@link HttpServletRequest} instance associated with this request + * @param httpResponse the {@link HttpServletResponse} instance associated with this request * @throws Exception thrown if writing to the output stream fails */ @RequestMapping( value = DomainConfig.PATH_QUERIES, method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE}) - public void executeDAQQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception { - final Collection channels = queries.getQueries().stream() + public void executeDAQQueries( + @RequestBody @Valid final DAQQueries queries, + final HttpServletRequest httpRequest, + final HttpServletResponse httpResponse) throws Exception { + // Do a redirection if only one backend is requested + // + final Supplier> channelsSupplier = () -> queries.getQueries().stream() .flatMap(daqQuery -> daqQuery.getChannels().stream()) .collect(Collectors.toList()); - final String redirect = queryManager.getRedirection(channels); + final String redirect = queryManager.getRedirection(channelsSupplier); if (redirect != null) { - res.sendRedirect(redirect + DomainConfig.PATH_QUERIES); + final String redirectURL = redirect + + httpRequest.getRequestURI() + + httpRequest.getQueryString() != null ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() + : StringUtils.EMPTY; + + LOGGER.info("Send redirect to '{}'.", redirectURL); + httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); + // use 307 - works for POST too + httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); return; } @@ -427,16 +446,16 @@ public class QueryRestController implements ApplicationContextAware { final Response response = queries.getResponseOrDefault(defaultResponse); if (response instanceof AbstractHTTPResponse) { LOGGER.debug("Executing query '{}'", queries); - final AbstractHTTPResponse httpResponse = ((AbstractHTTPResponse) response); + final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response); - httpResponse.validateQuery(queries); + httpRes.validateQuery(queries); // execute query final List>>> result = queryManager.queryEvents(queries); - httpResponse.respond( + httpRes.respond( context, - res, + httpResponse, queries, result); } else { diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java index a8e90c4..f08a0f1 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java @@ -3,6 +3,7 @@ package ch.psi.daq.queryrest.query; import java.util.Collection; import java.util.List; import java.util.Map.Entry; +import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Triple; @@ -26,13 +27,13 @@ public interface QueryManager { Stream getBackends(); - /** + /** * Provides the root path of the redirection (or null if no redirection is possible). - * + * * @param channels The channels * @return String The root of the redirection or null for none. */ - String getRedirection(final Collection channels); + String getRedirection(final Supplier> channels); LongHash getChannelsHash(); diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java index 8552fdb..2d9e852 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java @@ -7,11 +7,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.PreDestroy; -import javax.servlet.http.HttpServletResponse; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; @@ -68,14 +68,12 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware { return Backend.getBackends().stream() .filter(backend -> activeBackends.contains(backend)); } - + @Override - public String getRedirection(final Collection channels){ - // set backends if not defined yet - channelsCache.configureBackends(channels); + public String getRedirection(final Supplier> channels) { return null; } - + @Override public LongHash getChannelsHash() { return channelsCache.getChannelsHash(); @@ -168,7 +166,8 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware { queryAnalizer.postProcess(channelToDataEvents); // ChannelConfig query - final BackendQuery configQuery = new BackendQueryImpl(query, queryElement.getConfigFields()); + final BackendQuery configQuery = + new BackendQueryImpl(query, queryElement.getConfigFields()); final Map> channelToConfig = configQuery.getChannelConfigurations(); diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java index b156010..f807213 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java @@ -2,19 +2,16 @@ package ch.psi.daq.queryrest.query; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.TreeMap; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.PreDestroy; -import javax.servlet.http.HttpServletResponse; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; @@ -25,8 +22,6 @@ import org.springframework.context.ApplicationContextAware; import ch.psi.daq.common.tuple.Quadruple; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.backend.Backend; -import ch.psi.daq.domain.backend.BackendType; -import ch.psi.daq.domain.backend.DomainBackendType; import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.query.DAQConfigQuery; @@ -37,13 +32,14 @@ import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.backend.BackendQueryImpl; import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache; +import ch.psi.daq.domain.query.channels.ChannelConfigurationLoader; import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsResponse; import ch.psi.daq.domain.query.channels.LongHash; import ch.psi.daq.domain.query.processor.QueryProcessor; -import ch.psi.daq.domain.rest.RestHelper; +import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.queryrest.config.QueryRestConfig; public class QueryManagerRemote implements QueryManager, ApplicationContextAware { @@ -55,16 +51,21 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware @SuppressWarnings("unchecked") @Override public void setApplicationContext(ApplicationContext context) throws BeansException { - final BiFunction typeCreator = - (name, queryServer) -> new DomainBackendType(name, queryServer); - final List queryServerAddresses = - context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_ADDRESSES, List.class); - backendToServerAddresses = RestHelper.getBackends(context, queryServerAddresses, typeCreator); + backendToServerAddresses = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_BACKENDS, Map.class); - // channelsCache = new BackendsChannelConfigurationCache(backendToServerAddresses.keySet()); - // channelsCache = - // context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, - // BackendsChannelConfigurationCache.class); + final long reloadPeriodMillis = context.getBean(QueryConfig.BEAN_NAME_CHANNELS_CACHE_RELOAD_PERIOD, Long.class); + final Function loaderProvider = backend -> { + if (backend.getBackendAccess().hasHistoricChannelConfigurationLoader()) { + return backend.getBackendAccess().getHistoricChannelConfigurationLoader(); + } else { + return null; + } + }; + channelsCache = new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis); + channelsCache.init( + context, + backendToServerAddresses.keySet().iterator().next(), + backendToServerAddresses.keySet()); queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); } @@ -77,8 +78,9 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware } @Override - public String getRedirection(final Collection channels) { + public String getRedirection(final Supplier> channelsSupplier) { // set backends if not defined yet + final Collection channels = channelsSupplier.get(); channelsCache.configureBackends(channels); final Set backendRoots = channels.stream() diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java index 15bffef..9d26da9 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java @@ -1,9 +1,11 @@ package ch.psi.daq.test.queryrest.query; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.List; +import java.util.stream.Collectors; + import javax.annotation.Resource; import org.junit.After; @@ -11,9 +13,8 @@ import org.junit.Before; import org.junit.Test; import org.springframework.context.ApplicationContext; -import ch.psi.daq.common.serialization.SerializationHelper; import ch.psi.daq.domain.backend.Backend; -import ch.psi.daq.domain.config.DomainConfig; +import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.queryrest.config.QueryRestConfig; import ch.psi.daq.queryrest.query.QueryManager; import ch.psi.daq.test.queryrest.AbstractDaqRestTest; @@ -34,6 +35,12 @@ public class QueryManagerRemoteTest extends AbstractDaqRestTest { @Test public void testQueryManager_01() throws Exception { - assertTrue(false); + final List backends = queryManager.getBackends().collect(Collectors.toList()); + assertEquals(7, backends.size()); + + final List channels = queryManager.getChannels(new ChannelsRequest()) + .flatMap(response -> response.getChannels().stream()) + .collect(Collectors.toList()); + assertTrue("Size was " + channels.size(), channels.size() > 400000); } } diff --git a/src/test/resources/queryrest-test.properties b/src/test/resources/queryrest-test.properties index bda14c2..ca37d55 100644 --- a/src/test/resources/queryrest-test.properties +++ b/src/test/resources/queryrest-test.properties @@ -10,7 +10,7 @@ domain.keyspace.base=daq_query_test channels.cache.reload.period=-1 -query.server.type=remote +#query.server.type=remote query.min.time=1970-01-01T00:00:00.000000000+00:00