From 0cfa4ac8da66d991994d2a5948b295742d7559ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Tue, 19 Feb 2019 16:15:18 +0100 Subject: [PATCH] Central Dispatcher --- .../daq/queryrest/config/QueryRestConfig.java | 13 ++++----- .../queryrest/query/AbstractQueryManager.java | 27 +++++++------------ .../psi/daq/queryrest/query/QueryManager.java | 9 +++++-- .../daq/queryrest/query/QueryManagerImpl.java | 4 +-- .../queryrest/query/QueryManagerRemote.java | 9 ++++--- .../DAQQueriesResponseFormatter.java | 4 +-- src/main/resources/queryrest-all.properties | 3 ++- .../query/QueryManagerRemoteTest.java | 2 +- 8 files changed, 33 insertions(+), 38 deletions(-) diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 2cd2048..8c9f43e 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -46,7 +46,7 @@ import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.config.DomainConfigCORS; import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ServerAddress; -import ch.psi.daq.domain.json.mixin.HistoricChannelConfigurationPropertyFilterMixin; +import ch.psi.daq.domain.json.mixin.ChannelConfigurationPropertyFilterMixin; import ch.psi.daq.domain.json.mixin.PropertyFilterMixin; import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; @@ -164,7 +164,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class); objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class); - objectMapper.addMixIn(ChannelConfiguration.class, HistoricChannelConfigurationPropertyFilterMixin.class); + objectMapper.addMixIn(ChannelConfiguration.class, ChannelConfigurationPropertyFilterMixin.class); objectMapper.addMixIn(Response.class, PolymorphicResponseMixIn.class); } @@ -251,7 +251,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { @Bean(name = BEAN_NAME_QUERY_MANAGER) @Lazy public QueryManager queryManager() { - final String value = env.getProperty(QUERY_SERVER_TYPE, "local"); + final String value = env.getProperty(QUERY_SERVER_TYPE, QueryManagerImpl.QUERY_SERVER_TYPE); LOGGER.debug("Load '{}={}'", QUERY_SERVER_TYPE, value); if (QueryManagerImpl.QUERY_SERVER_TYPE.equals(value)) { @@ -263,7 +263,6 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { LOGGER.error(message); throw new IllegalStateException(message); } - } @Bean(name = BEAN_NAME_QUERY_MANAGER_LOCAL) @@ -432,10 +431,8 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { final String newDefaultProtocol = "https"; final String newDefaultHost = "data-api.psi.ch"; - - - LOGGER.warn("\nSet back to 443!\n#####"); - final int newDefaultPort = 8080; + // make sure it is different to Integer.valueOf() + final Integer newDefaultPort = new Integer(443); // this should be overwritten final String newDefaultPath = ""; final List queryBackendStrs = new ArrayList<>(queryBackends.size()); diff --git a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java index 4fcc1f1..96ac39c 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java +++ b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java @@ -4,7 +4,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -33,23 +32,16 @@ import ch.psi.daq.domain.query.channels.LongHash; import ch.psi.daq.domain.query.processor.QueryProcessor; public abstract class AbstractQueryManager implements QueryManager { - private Set activeBackends; private BackendsChannelConfigurationCache channelsCache; private Function queryAnalizerFactory; protected void init( - final Set activeBackends, final BackendsChannelConfigurationCache channelsCache, final Function queryAnalizerFactory) { - this.activeBackends = activeBackends; this.channelsCache = channelsCache; this.queryAnalizerFactory = queryAnalizerFactory; } - protected Set getActiveBackends() { - return activeBackends; - } - protected BackendsChannelConfigurationCache getConfigurationCache() { return channelsCache; } @@ -61,42 +53,42 @@ public abstract class AbstractQueryManager implements QueryManager { @Override public Stream getBackends() { return Backend.getBackends().stream() - .filter(backend -> activeBackends.contains(backend)); + .filter(backend -> getConfigurationCache().getActiveBackends().contains(backend)); } @Override public LongHash getChannelsHash() { - return channelsCache.getChannelsHash(); + return getConfigurationCache().getChannelsHash(); } @Override public Stream getChannels( final ChannelsRequest request) { - return channelsCache.getChannels(request); + return getConfigurationCache().getChannels(request); } @Override public LongHash getChannelConfigurationsHash() { - return channelsCache.getChannelConfigurationsHash(); + return getConfigurationCache().getChannelConfigurationsHash(); } @Override public Stream getChannelConfigurations( final ChannelConfigurationsRequest request) { - return channelsCache.getChannelConfigurations(request); + return getConfigurationCache().getChannelConfigurations(request); } @Override public ChannelConfiguration getChannelConfiguration( final ChannelName channel) { - return channelsCache.getChannelConfiguration(channel); + return getConfigurationCache().getChannelConfiguration(channel); } @Override public Entry>> queryConfigs( final DAQConfigQuery daqQuery) { // set backends if not defined yet - channelsCache.configureBackends(daqQuery.getChannels()); + getConfigurationCache().configureBackends(daqQuery.getChannels()); Stream> resultStreams = BackendQueryImpl @@ -127,7 +119,7 @@ public abstract class AbstractQueryManager implements QueryManager { final DAQQueries queries) { // set backends if not defined yet for (DAQQueryElement daqQuery : queries) { - channelsCache.configureBackends(daqQuery.getChannels()); + getConfigurationCache().configureBackends(daqQuery.getChannels()); } final List>>> results = @@ -141,7 +133,8 @@ public abstract class AbstractQueryManager implements QueryManager { .filter( query -> { return query.getBackend().getBackendAccess().hasDataReader() - && query.getBackend().getBackendAccess().hasQueryProcessor();}) + && query.getBackend().getBackendAccess().hasQueryProcessor(); + }) .flatMap( query -> { final QueryProcessor processor = diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java index f08a0f1..c4691f7 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java @@ -25,15 +25,20 @@ import ch.psi.daq.domain.query.channels.LongHash; public interface QueryManager { + /** + * Provides the available/active Backends. + * + * @return Stream The Backends + */ Stream getBackends(); - /** + /** * Provides the root path of the redirection (or null if no redirection is possible). * * @param channels The channels * @return String The root of the redirection or null for none. */ - String getRedirection(final Supplier> channels); + String getRedirection(final Supplier> channels); LongHash getChannelsHash(); diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java index 6553802..088e596 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java @@ -1,7 +1,6 @@ package ch.psi.daq.queryrest.query; import java.util.Collection; -import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; @@ -29,13 +28,12 @@ public class QueryManagerImpl extends AbstractQueryManager implements Applicatio final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class); context = backend.getApplicationContext(); - final Set activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class); final BackendsChannelConfigurationCache channelsCache = context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, BackendsChannelConfigurationCache.class); final Function queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); - init(activeBackends, channelsCache, queryAnalizerFactory); + init(channelsCache, queryAnalizerFactory); } @PreDestroy diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java index 031aba4..d12fbd8 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java @@ -24,6 +24,7 @@ import ch.psi.daq.common.tuple.Quadruple; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.StreamEvent; import ch.psi.daq.domain.backend.Backend; +import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.query.DAQQueries; @@ -39,7 +40,6 @@ import ch.psi.daq.domain.query.response.Response; import ch.psi.daq.domain.query.response.ResponseFormat; import ch.psi.daq.domain.query.response.ResponseImpl; import ch.psi.daq.domain.rest.RestHelper; -import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.queryrest.config.QueryRestConfig; import reactor.core.publisher.Flux; @@ -56,7 +56,7 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat public void setApplicationContext(ApplicationContext context) throws BeansException { backendToServerAddresses = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_BACKENDS, Map.class); - final long reloadPeriodMillis = context.getBean(QueryConfig.BEAN_NAME_CHANNELS_CACHE_RELOAD_PERIOD, Long.class); + final long reloadPeriodMillis = context.getBean(DomainConfig.BEAN_NAME_CHANNELS_CACHE_RELOAD_PERIOD, Long.class); final Function loaderProvider = backend -> { if (backend.getBackendAccess().hasHistoricChannelConfigurationLoader()) { return backend.getBackendAccess().getHistoricChannelConfigurationLoader(); @@ -64,16 +64,17 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat return null; } }; + final Backend defaultBackend = backendToServerAddresses.keySet().iterator().next(); final BackendsChannelConfigurationCache channelsCache = new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis); channelsCache.init( context, - backendToServerAddresses.keySet().iterator().next(), + defaultBackend, backendToServerAddresses.keySet()); final Function queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); - init(backendToServerAddresses.keySet(), channelsCache, queryAnalizerFactory); + init(channelsCache, queryAnalizerFactory); } @PreDestroy diff --git a/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java b/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java index 0594efe..858d990 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java @@ -37,7 +37,7 @@ import ch.psi.daq.common.tuple.Quadruple; import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; -import ch.psi.daq.domain.json.mixin.HistoricChannelConfigurationPropertyFilterMixin; +import ch.psi.daq.domain.json.mixin.ChannelConfigurationPropertyFilterMixin; import ch.psi.daq.domain.json.mixin.PropertyFilterMixin; import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.backend.BackendQuery; @@ -318,7 +318,7 @@ public class DAQQueriesResponseFormatter SimpleBeanPropertyFilter.filterOutAllExcept(includedEventFields)); } if (includedConfigFields != null) { - propertyFilter.addFilter(HistoricChannelConfigurationPropertyFilterMixin.FILTER_NAME, + propertyFilter.addFilter(ChannelConfigurationPropertyFilterMixin.FILTER_NAME, SimpleBeanPropertyFilter.filterOutAllExcept(includedConfigFields)); } // only write the properties not excluded in the filter diff --git a/src/main/resources/queryrest-all.properties b/src/main/resources/queryrest-all.properties index a4908d2..65befb9 100644 --- a/src/main/resources/queryrest-all.properties +++ b/src/main/resources/queryrest-all.properties @@ -1,3 +1,4 @@ query.server.type=remote query.processor.type=local -query.server.addresses=[{"protocol":"http","host":"localhost","port":8081,"path":""}] \ No newline at end of file + +# query.server.addresses=[{"protocol":"http","host":"localhost","port":8081,"path":""}] \ No newline at end of file diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java index e90382e..23db0f0 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java @@ -53,7 +53,7 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest; import ch.psi.data.converters.ByteConverter; public class QueryManagerRemoteTest extends AbstractDaqRestTest { - private final static String DATA_BUFFER = "daqlocal";// "sf-databuffer"; + private final static String DATA_BUFFER = "sf-databuffer"; @Resource private ApplicationContext context;