Central Dispatcher

This commit is contained in:
Fabian Märki
2019-02-19 16:15:18 +01:00
parent 911fe46d28
commit 0cfa4ac8da
8 changed files with 33 additions and 38 deletions

View File

@ -46,7 +46,7 @@ import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.config.DomainConfigCORS; import ch.psi.daq.domain.config.DomainConfigCORS;
import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ServerAddress; import ch.psi.daq.domain.json.ServerAddress;
import ch.psi.daq.domain.json.mixin.HistoricChannelConfigurationPropertyFilterMixin; import ch.psi.daq.domain.json.mixin.ChannelConfigurationPropertyFilterMixin;
import ch.psi.daq.domain.json.mixin.PropertyFilterMixin; import ch.psi.daq.domain.json.mixin.PropertyFilterMixin;
import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
@ -164,7 +164,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class); objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class);
objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class); objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);
objectMapper.addMixIn(ChannelConfiguration.class, HistoricChannelConfigurationPropertyFilterMixin.class); objectMapper.addMixIn(ChannelConfiguration.class, ChannelConfigurationPropertyFilterMixin.class);
objectMapper.addMixIn(Response.class, PolymorphicResponseMixIn.class); objectMapper.addMixIn(Response.class, PolymorphicResponseMixIn.class);
} }
@ -251,7 +251,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
@Bean(name = BEAN_NAME_QUERY_MANAGER) @Bean(name = BEAN_NAME_QUERY_MANAGER)
@Lazy @Lazy
public QueryManager queryManager() { public QueryManager queryManager() {
final String value = env.getProperty(QUERY_SERVER_TYPE, "local"); final String value = env.getProperty(QUERY_SERVER_TYPE, QueryManagerImpl.QUERY_SERVER_TYPE);
LOGGER.debug("Load '{}={}'", QUERY_SERVER_TYPE, value); LOGGER.debug("Load '{}={}'", QUERY_SERVER_TYPE, value);
if (QueryManagerImpl.QUERY_SERVER_TYPE.equals(value)) { if (QueryManagerImpl.QUERY_SERVER_TYPE.equals(value)) {
@ -263,7 +263,6 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
LOGGER.error(message); LOGGER.error(message);
throw new IllegalStateException(message); throw new IllegalStateException(message);
} }
} }
@Bean(name = BEAN_NAME_QUERY_MANAGER_LOCAL) @Bean(name = BEAN_NAME_QUERY_MANAGER_LOCAL)
@ -432,10 +431,8 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
final String newDefaultProtocol = "https"; final String newDefaultProtocol = "https";
final String newDefaultHost = "data-api.psi.ch"; final String newDefaultHost = "data-api.psi.ch";
// make sure it is different to Integer.valueOf()
final Integer newDefaultPort = new Integer(443);
LOGGER.warn("\nSet back to 443!\n#####");
final int newDefaultPort = 8080;
// this should be overwritten // this should be overwritten
final String newDefaultPath = ""; final String newDefaultPath = "";
final List<String> queryBackendStrs = new ArrayList<>(queryBackends.size()); final List<String> queryBackendStrs = new ArrayList<>(queryBackends.size());

View File

@ -4,7 +4,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -33,23 +32,16 @@ import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.processor.QueryProcessor; import ch.psi.daq.domain.query.processor.QueryProcessor;
public abstract class AbstractQueryManager implements QueryManager { public abstract class AbstractQueryManager implements QueryManager {
private Set<Backend> activeBackends;
private BackendsChannelConfigurationCache channelsCache; private BackendsChannelConfigurationCache channelsCache;
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory; private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
protected void init( protected void init(
final Set<Backend> activeBackends,
final BackendsChannelConfigurationCache channelsCache, final BackendsChannelConfigurationCache channelsCache,
final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory) { final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory) {
this.activeBackends = activeBackends;
this.channelsCache = channelsCache; this.channelsCache = channelsCache;
this.queryAnalizerFactory = queryAnalizerFactory; this.queryAnalizerFactory = queryAnalizerFactory;
} }
protected Set<Backend> getActiveBackends() {
return activeBackends;
}
protected BackendsChannelConfigurationCache getConfigurationCache() { protected BackendsChannelConfigurationCache getConfigurationCache() {
return channelsCache; return channelsCache;
} }
@ -61,42 +53,42 @@ public abstract class AbstractQueryManager implements QueryManager {
@Override @Override
public Stream<Backend> getBackends() { public Stream<Backend> getBackends() {
return Backend.getBackends().stream() return Backend.getBackends().stream()
.filter(backend -> activeBackends.contains(backend)); .filter(backend -> getConfigurationCache().getActiveBackends().contains(backend));
} }
@Override @Override
public LongHash getChannelsHash() { public LongHash getChannelsHash() {
return channelsCache.getChannelsHash(); return getConfigurationCache().getChannelsHash();
} }
@Override @Override
public Stream<ChannelsResponse> getChannels( public Stream<ChannelsResponse> getChannels(
final ChannelsRequest request) { final ChannelsRequest request) {
return channelsCache.getChannels(request); return getConfigurationCache().getChannels(request);
} }
@Override @Override
public LongHash getChannelConfigurationsHash() { public LongHash getChannelConfigurationsHash() {
return channelsCache.getChannelConfigurationsHash(); return getConfigurationCache().getChannelConfigurationsHash();
} }
@Override @Override
public Stream<ChannelConfigurationsResponse> getChannelConfigurations( public Stream<ChannelConfigurationsResponse> getChannelConfigurations(
final ChannelConfigurationsRequest request) { final ChannelConfigurationsRequest request) {
return channelsCache.getChannelConfigurations(request); return getConfigurationCache().getChannelConfigurations(request);
} }
@Override @Override
public ChannelConfiguration getChannelConfiguration( public ChannelConfiguration getChannelConfiguration(
final ChannelName channel) { final ChannelName channel) {
return channelsCache.getChannelConfiguration(channel); return getConfigurationCache().getChannelConfiguration(channel);
} }
@Override @Override
public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs( public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
final DAQConfigQuery daqQuery) { final DAQConfigQuery daqQuery) {
// set backends if not defined yet // set backends if not defined yet
channelsCache.configureBackends(daqQuery.getChannels()); getConfigurationCache().configureBackends(daqQuery.getChannels());
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams = Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
BackendQueryImpl BackendQueryImpl
@ -127,7 +119,7 @@ public abstract class AbstractQueryManager implements QueryManager {
final DAQQueries queries) { final DAQQueries queries) {
// set backends if not defined yet // set backends if not defined yet
for (DAQQueryElement daqQuery : queries) { for (DAQQueryElement daqQuery : queries) {
channelsCache.configureBackends(daqQuery.getChannels()); getConfigurationCache().configureBackends(daqQuery.getChannels());
} }
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results = final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results =
@ -141,7 +133,8 @@ public abstract class AbstractQueryManager implements QueryManager {
.filter( .filter(
query -> { query -> {
return query.getBackend().getBackendAccess().hasDataReader() return query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor();}) && query.getBackend().getBackendAccess().hasQueryProcessor();
})
.flatMap( .flatMap(
query -> { query -> {
final QueryProcessor processor = final QueryProcessor processor =

View File

@ -25,6 +25,11 @@ import ch.psi.daq.domain.query.channels.LongHash;
public interface QueryManager { public interface QueryManager {
/**
* Provides the available/active Backends.
*
* @return Stream The Backends
*/
Stream<Backend> getBackends(); Stream<Backend> getBackends();
/** /**

View File

@ -1,7 +1,6 @@
package ch.psi.daq.queryrest.query; package ch.psi.daq.queryrest.query;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -29,13 +28,12 @@ public class QueryManagerImpl extends AbstractQueryManager implements Applicatio
final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class); final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class);
context = backend.getApplicationContext(); context = backend.getApplicationContext();
final Set<Backend> activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class);
final BackendsChannelConfigurationCache channelsCache = final BackendsChannelConfigurationCache channelsCache =
context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, BackendsChannelConfigurationCache.class); context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, BackendsChannelConfigurationCache.class);
final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory = final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory =
context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
init(activeBackends, channelsCache, queryAnalizerFactory); init(channelsCache, queryAnalizerFactory);
} }
@PreDestroy @PreDestroy

View File

@ -24,6 +24,7 @@ import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.StreamEvent; import ch.psi.daq.domain.StreamEvent;
import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
@ -39,7 +40,6 @@ import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.query.response.ResponseFormat; import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.domain.query.response.ResponseImpl; import ch.psi.daq.domain.query.response.ResponseImpl;
import ch.psi.daq.domain.rest.RestHelper; import ch.psi.daq.domain.rest.RestHelper;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.config.QueryRestConfig; import ch.psi.daq.queryrest.config.QueryRestConfig;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -56,7 +56,7 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat
public void setApplicationContext(ApplicationContext context) throws BeansException { public void setApplicationContext(ApplicationContext context) throws BeansException {
backendToServerAddresses = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_BACKENDS, Map.class); backendToServerAddresses = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_BACKENDS, Map.class);
final long reloadPeriodMillis = context.getBean(QueryConfig.BEAN_NAME_CHANNELS_CACHE_RELOAD_PERIOD, Long.class); final long reloadPeriodMillis = context.getBean(DomainConfig.BEAN_NAME_CHANNELS_CACHE_RELOAD_PERIOD, Long.class);
final Function<Backend, ChannelConfigurationLoader> loaderProvider = backend -> { final Function<Backend, ChannelConfigurationLoader> loaderProvider = backend -> {
if (backend.getBackendAccess().hasHistoricChannelConfigurationLoader()) { if (backend.getBackendAccess().hasHistoricChannelConfigurationLoader()) {
return backend.getBackendAccess().getHistoricChannelConfigurationLoader(); return backend.getBackendAccess().getHistoricChannelConfigurationLoader();
@ -64,16 +64,17 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat
return null; return null;
} }
}; };
final Backend defaultBackend = backendToServerAddresses.keySet().iterator().next();
final BackendsChannelConfigurationCache channelsCache = final BackendsChannelConfigurationCache channelsCache =
new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis); new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis);
channelsCache.init( channelsCache.init(
context, context,
backendToServerAddresses.keySet().iterator().next(), defaultBackend,
backendToServerAddresses.keySet()); backendToServerAddresses.keySet());
final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory = final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory =
context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
init(backendToServerAddresses.keySet(), channelsCache, queryAnalizerFactory); init(channelsCache, queryAnalizerFactory);
} }
@PreDestroy @PreDestroy

View File

@ -37,7 +37,7 @@ import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.mixin.HistoricChannelConfigurationPropertyFilterMixin; import ch.psi.daq.domain.json.mixin.ChannelConfigurationPropertyFilterMixin;
import ch.psi.daq.domain.json.mixin.PropertyFilterMixin; import ch.psi.daq.domain.json.mixin.PropertyFilterMixin;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.backend.BackendQuery;
@ -318,7 +318,7 @@ public class DAQQueriesResponseFormatter
SimpleBeanPropertyFilter.filterOutAllExcept(includedEventFields)); SimpleBeanPropertyFilter.filterOutAllExcept(includedEventFields));
} }
if (includedConfigFields != null) { if (includedConfigFields != null) {
propertyFilter.addFilter(HistoricChannelConfigurationPropertyFilterMixin.FILTER_NAME, propertyFilter.addFilter(ChannelConfigurationPropertyFilterMixin.FILTER_NAME,
SimpleBeanPropertyFilter.filterOutAllExcept(includedConfigFields)); SimpleBeanPropertyFilter.filterOutAllExcept(includedConfigFields));
} }
// only write the properties not excluded in the filter // only write the properties not excluded in the filter

View File

@ -1,3 +1,4 @@
query.server.type=remote query.server.type=remote
query.processor.type=local query.processor.type=local
query.server.addresses=[{"protocol":"http","host":"localhost","port":8081,"path":""}]
# query.server.addresses=[{"protocol":"http","host":"localhost","port":8081,"path":""}]

View File

@ -53,7 +53,7 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
import ch.psi.data.converters.ByteConverter; import ch.psi.data.converters.ByteConverter;
public class QueryManagerRemoteTest extends AbstractDaqRestTest { public class QueryManagerRemoteTest extends AbstractDaqRestTest {
private final static String DATA_BUFFER = "daqlocal";// "sf-databuffer"; private final static String DATA_BUFFER = "sf-databuffer";
@Resource @Resource
private ApplicationContext context; private ApplicationContext context;