New way of defining server addresses.

This commit is contained in:
Fabian Märki
2019-01-24 11:22:28 +01:00
parent 5c70edf49c
commit 3da444d564
6 changed files with 271 additions and 14 deletions

View File

@ -1,5 +1,6 @@
package ch.psi.daq.queryrest.config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.LinkedHashSet;
@ -27,6 +28,7 @@ import org.springframework.validation.Validator;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
@ -36,6 +38,7 @@ import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.config.DomainConfigCORS;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ServerAddress;
import ch.psi.daq.domain.json.mixin.HistoricChannelConfigurationPropertyFilterMixin;
import ch.psi.daq.domain.json.mixin.PropertyFilterMixin;
import ch.psi.daq.domain.query.backend.BackendQuery;
@ -112,6 +115,9 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
public static final String BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION =
"formatterHistoricChannelConfiguration";
public static final String BEAN_NAME_FORMATTER_RAW_EVENT = "formatterRawEvent";
public static final String BEAN_NAME_QUERY_BACKENDS = "queryBackends";
private static final String QUERY_BACKENDS = "query.backens";
@Resource
private ApplicationContext context;
@ -299,7 +305,8 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
@Lazy
public Set<Aggregation> defaultResponseAggregations() {
String[] responseAggregations =
StringUtils.commaDelimitedListToStringArray(env.getProperty(QUERYREST_RESPONSE_FIELDS_EVENT_QUERY_AGGREGATIONS));
StringUtils
.commaDelimitedListToStringArray(env.getProperty(QUERYREST_RESPONSE_FIELDS_EVENT_QUERY_AGGREGATIONS));
LOGGER.debug("Load '{}={}'", QUERYREST_RESPONSE_FIELDS_EVENT_QUERY_AGGREGATIONS,
Arrays.toString(responseAggregations));
@ -370,4 +377,37 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
public RawEventResponseFormatter idRawResponseFormatter() {
return new RawEventResponseFormatter();
}
@Bean(name = BEAN_NAME_QUERY_BACKENDS)
@Lazy
public List<String> queryBackends() {
final String value = env.getProperty(QUERY_BACKENDS, "[{}]");
LOGGER.debug("Load '{}={}'", QUERY_BACKENDS, value);
List<ServerAddress> queryBackends;
try {
queryBackends = objectMapper.readValue(value, new TypeReference<ArrayList<ServerAddress>>() {});
} catch (final Exception e) {
LOGGER.warn("Could not load '{}={}'. Use default", QUERY_BACKENDS, value, e);
ServerAddress defaultAddress = new ServerAddress();
queryBackends = Arrays.asList(defaultAddress);
}
final String newDefaultProtocol = "https";
final String newDefaultHost = "data-api.psi.ch";
final int newDefaultPort = 443;
// this should be overwritten
final String newDefaultPath = "";
final List<String> queryBackendStrs = new ArrayList<>(queryBackends.size());
for (final ServerAddress serverAddress : queryBackends) {
serverAddress.replaceDefaults(
newDefaultProtocol,
newDefaultHost,
newDefaultPort,
newDefaultPath);
queryBackendStrs.add(serverAddress.getAddress());
}
return queryBackendStrs;
}
}

View File

@ -3,9 +3,11 @@ package ch.psi.daq.queryrest.controller;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -72,7 +74,6 @@ public class QueryRestController implements ApplicationContextAware {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
private Set<Backend> activeBackends;
private ApplicationContext context;
private ObjectMapper objectMapper;
private QueryManager queryManager;
@ -81,14 +82,12 @@ public class QueryRestController implements ApplicationContextAware {
private Validator requestProviderValidator;
private Response defaultResponse = new JSONHTTPResponse();
@SuppressWarnings("unchecked")
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class);
context = backend.getApplicationContext();
this.context = context;
activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class);
objectMapper = context.getBean(DomainConfig.BEAN_NAME_OBJECT_MAPPER, ObjectMapper.class);
queryManager = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_MANAGER, QueryManager.class);
eventQueryValidator = context.getBean(QueryRestConfig.BEAN_NAME_EVENT_QUERY_VALIDATOR, Validator.class);
@ -511,12 +510,28 @@ public class QueryRestController implements ApplicationContextAware {
value = DomainConfig.PATH_BACKENDS,
method = {RequestMethod.GET},
produces = {MediaType.APPLICATION_JSON_VALUE})
public @ResponseBody List<Backend> getBackendValues() {
return Backend.getBackends().stream()
.filter(backend -> activeBackends.contains(backend))
public @ResponseBody Collection<Backend> getBackendValues() {
return queryManager.getBackends()
.collect(Collectors.toList());
}
/**
* Returns the current list of {@link Backend} values.
*
* @return list of {@link Backend}s as String array
*/
@RequestMapping(
value = DomainConfig.PATH_BACKENDS_BY_ID,
method = {RequestMethod.GET},
produces = {MediaType.APPLICATION_JSON_VALUE})
public @ResponseBody Map<Integer, Backend> getBackendValuesById() {
return queryManager.getBackends()
.collect(
Collectors.toMap(
Backend::getId,
Function.identity()));
}
/**
* Returns the current list of {@link Compression}s available.
*

View File

@ -7,6 +7,7 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Triple;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
@ -22,6 +23,8 @@ import ch.psi.daq.domain.query.channels.LongHash;
public interface QueryManager {
Stream<Backend> getBackends();
LongHash getChannelsHash();
Stream<ChannelsResponse> getChannels(final ChannelsRequest request) throws Exception;

View File

@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -40,6 +41,7 @@ import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.config.QueryRestConfig;
public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
private Set<Backend> activeBackends;
private BackendsChannelConfigurationCache channelsCache;
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@ -49,6 +51,7 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class);
context = backend.getApplicationContext();
activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class);
channelsCache =
context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, BackendsChannelConfigurationCache.class);
queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
@ -57,6 +60,12 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
@PreDestroy
public void destroy() {}
@Override
public Stream<Backend> getBackends() {
return Backend.getBackends().stream()
.filter(backend -> activeBackends.contains(backend));
}
@Override
public LongHash getChannelsHash() {
return channelsCache.getChannelsHash();

View File

@ -0,0 +1,186 @@
package ch.psi.daq.queryrest.query;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.reactive.function.client.WebClient;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.config.QueryRestConfig;
public class QueryManagerRemote implements QueryManager, ApplicationContextAware {
private WebClient client;
private Map<Backend, String> backendToServerAddresses;
// private BackendsChannelConfigurationCache channelsCache;
// private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@SuppressWarnings("unchecked")
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
backendToServerAddresses = new TreeMap<>(Comparator.comparingInt(backend -> backend.getId()));
}
@PreDestroy
public void destroy() {}
@Override
public Stream<Backend> getBackends() {
return Backend.getBackends().stream()
.filter(backend -> activeBackends.contains(backend))
.collect(Collectors.toList());
}
@Override
public LongHash getChannelsHash() {
return channelsCache.getChannelsHash();
}
@Override
public Stream<ChannelsResponse> getChannels(ChannelsRequest request) {
return channelsCache.getChannels(request);
}
@Override
public LongHash getChannelConfigurationsHash() {
return channelsCache.getChannelConfigurationsHash();
}
@Override
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(ChannelConfigurationsRequest request) {
return channelsCache.getChannelConfigurations(request);
}
@Override
public ChannelConfiguration getChannelConfiguration(ChannelName channel) {
return channelsCache.getChannelConfiguration(channel);
}
@Override
public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
final DAQConfigQuery daqQuery) {
// set backends if not defined yet
channelsCache.configureBackends(daqQuery.getChannels());
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(daqQuery)
.stream()
.filter(
query -> query.getBackend().getBackendAccess().hasStreamEventReader())
.flatMap(
query -> {
/* all the magic happens here */
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
query.getChannelConfigurations();
return channelToConfig.entrySet().stream()
.map(entry -> {
return Triple.of(
query,
new ChannelName(entry.getKey(), query.getBackend()),
entry.getValue());
});
});
return Pair.of(daqQuery, resultStreams);
}
@Override
public List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
final DAQQueries queries) {
// set backends if not defined yet
for (DAQQueryElement daqQuery : queries) {
channelsCache.configureBackends(daqQuery.getChannels());
}
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results =
new ArrayList<>(queries.getQueries().size());
for (final DAQQueryElement queryElement : queries) {
Stream<Quadruple<BackendQuery, ChannelName, ?, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(queryElement)
.stream()
.filter(
query -> query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor())
.flatMap(
query -> {
final QueryProcessor processor =
query.getBackend().getBackendAccess().getQueryProcessor();
final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// ChannelEvent query
/* all the magic happens here */
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
processor.process(queryAnalizer);
/* do post-process */
final Stream<Entry<ChannelName, ?>> channelToData =
queryAnalizer.postProcess(channelToDataEvents);
// ChannelConfig query
final BackendQuery configQuery = new BackendQueryImpl(query, queryElement.getConfigFields());
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
configQuery.getChannelConfigurations();
return channelToData.map(entry -> {
return Quadruple.of(
query,
entry.getKey(),
channelToConfig.get(entry.getKey().getName()),
entry.getValue());
});
});
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
// By materializing the outer Stream the elements of all BackendQuery are loaded async
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
// all elements into memory at once)
resultStreams = resultStreams.collect(Collectors.toList()).stream();
results.add(Pair.of(queryElement, resultStreams));
}
return results;
}
}

View File

@ -9,6 +9,10 @@ queryrest.response.fields.event.query.aggregations=min,mean,max
queryrest.response.fields.config.query=name,backend,pulseId,globalSeconds,type,shape,source,unit
queryrest.response.fields.config.historic=name,backend,type,shape,source,description,unit
# Central Query Configs
#######################
query.backens=[{"path":"/sf"},{"path":"/gls"},{"path":"/hipa"},{"path":"/saresa"},{"path":"/saresb"}]
# defines if the writer is a local writer (can write data to filesystem)
filestorage.writer.local=false
# defines if the writer is a local reader (can read data from filesystem)