Towards central query
This commit is contained in:
@ -55,6 +55,7 @@ import ch.psi.daq.queryrest.controller.validator.ConfigQueryValidator;
|
||||
import ch.psi.daq.queryrest.controller.validator.EventQueryValidator;
|
||||
import ch.psi.daq.queryrest.query.QueryManager;
|
||||
import ch.psi.daq.queryrest.query.QueryManagerImpl;
|
||||
import ch.psi.daq.queryrest.query.QueryManagerRemote;
|
||||
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.formatter.AnyResponseFormatter;
|
||||
@ -98,6 +99,8 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestConfig.class);
|
||||
|
||||
public static final String BEAN_NAME_QUERY_MANAGER = "queryManager";
|
||||
public static final String BEAN_NAME_QUERY_MANAGER_LOCAL = "queryManagerLocal";
|
||||
public static final String BEAN_NAME_QUERY_MANAGER_REMOTE = "queryManagerRemote";
|
||||
public static final String BEAN_NAME_QUERY_ANALIZER_FACTORY = "queryAnalizerFactory";
|
||||
public static final String BEAN_NAME_EVENT_QUERY_VALIDATOR = "eventQueryValidator";
|
||||
public static final String BEAN_NAME_CONFIG_QUERY_VALIDATOR = "configQueryValidator";
|
||||
@ -115,9 +118,10 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
|
||||
public static final String BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION =
|
||||
"formatterHistoricChannelConfiguration";
|
||||
public static final String BEAN_NAME_FORMATTER_RAW_EVENT = "formatterRawEvent";
|
||||
public static final String BEAN_NAME_QUERY_BACKENDS = "queryBackends";
|
||||
private static final String QUERY_BACKENDS = "query.backens";
|
||||
public static final String BEAN_NAME_QUERY_SERVER_ADDRESSES = "queryBackends";
|
||||
|
||||
private static final String QUERY_SERVER_TYPE = "query.server.type";
|
||||
private static final String QUERY_SERVER_ADDRESSES = "query.server.addresses";
|
||||
|
||||
@Resource
|
||||
private ApplicationContext context;
|
||||
@ -238,9 +242,33 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
|
||||
@Bean(name = BEAN_NAME_QUERY_MANAGER)
|
||||
@Lazy
|
||||
public QueryManager queryManager() {
|
||||
final String value = env.getProperty(QUERY_SERVER_TYPE, "local");
|
||||
LOGGER.debug("Load '{}={}'", QUERY_SERVER_TYPE, value);
|
||||
|
||||
if (QueryManagerImpl.QUERY_SERVER_TYPE.equals(value)) {
|
||||
return context.getBean(BEAN_NAME_QUERY_MANAGER_LOCAL, QueryManager.class);
|
||||
} else if (QueryManagerRemote.QUERY_SERVER_TYPE.equals(value)) {
|
||||
return context.getBean(BEAN_NAME_QUERY_MANAGER_REMOTE, QueryManager.class);
|
||||
} else {
|
||||
final String message = String.format("Unknown value for '%s=%s'", QUERY_SERVER_TYPE, value);
|
||||
LOGGER.error(message);
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Bean(name = BEAN_NAME_QUERY_MANAGER_LOCAL)
|
||||
@Lazy
|
||||
public QueryManagerImpl queryManagerLocal() {
|
||||
return new QueryManagerImpl();
|
||||
}
|
||||
|
||||
@Bean(name = BEAN_NAME_QUERY_MANAGER_REMOTE)
|
||||
@Lazy
|
||||
public QueryManagerRemote queryManagerRemote() {
|
||||
return new QueryManagerRemote();
|
||||
}
|
||||
|
||||
@Bean(name = BEAN_NAME_DEFAULT_EVENT_RESPONSE_FIELDS)
|
||||
@Lazy
|
||||
public Set<EventField> defaultEventResponseFields() {
|
||||
@ -378,17 +406,17 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
|
||||
return new RawEventResponseFormatter();
|
||||
}
|
||||
|
||||
@Bean(name = BEAN_NAME_QUERY_BACKENDS)
|
||||
@Bean(name = BEAN_NAME_QUERY_SERVER_ADDRESSES)
|
||||
@Lazy
|
||||
public List<String> queryBackends() {
|
||||
final String value = env.getProperty(QUERY_BACKENDS, "[{}]");
|
||||
LOGGER.debug("Load '{}={}'", QUERY_BACKENDS, value);
|
||||
public List<String> queryServerAddresses() {
|
||||
final String value = env.getProperty(QUERY_SERVER_ADDRESSES, "[{}]");
|
||||
LOGGER.debug("Load '{}={}'", QUERY_SERVER_ADDRESSES, value);
|
||||
|
||||
List<ServerAddress> queryBackends;
|
||||
try {
|
||||
queryBackends = objectMapper.readValue(value, new TypeReference<ArrayList<ServerAddress>>() {});
|
||||
} catch (final Exception e) {
|
||||
LOGGER.warn("Could not load '{}={}'. Use default", QUERY_BACKENDS, value, e);
|
||||
LOGGER.warn("Could not load '{}={}'. Use default", QUERY_SERVER_ADDRESSES, value, e);
|
||||
ServerAddress defaultAddress = new ServerAddress();
|
||||
queryBackends = Arrays.asList(defaultAddress);
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.validation.Valid;
|
||||
|
||||
@ -122,8 +123,10 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
value = DomainConfig.PATH_CHANNELS,
|
||||
method = {RequestMethod.GET, RequestMethod.POST},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void getChannels(@RequestBody(required = false) ChannelsRequest request,
|
||||
final HttpServletResponse res)
|
||||
public void getChannels(
|
||||
@RequestBody(required = false) ChannelsRequest request,
|
||||
final HttpServletRequest httpRequest,
|
||||
final HttpServletResponse httpResponse)
|
||||
throws Throwable {
|
||||
// in case not specified use defaults (e.g. GET)
|
||||
if (request == null) {
|
||||
@ -132,7 +135,7 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
|
||||
((AbstractHTTPResponse) defaultResponse).respond(
|
||||
context,
|
||||
res,
|
||||
httpResponse,
|
||||
request,
|
||||
queryManager.getChannels(request));
|
||||
}
|
||||
@ -142,9 +145,11 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void getChannels(
|
||||
@PathVariable(value = "channel") final String channelName, final HttpServletResponse res)
|
||||
@PathVariable(value = "channel") final String channelName,
|
||||
final HttpServletRequest httpRequest,
|
||||
final HttpServletResponse httpResponse)
|
||||
throws Throwable {
|
||||
getChannels(new ChannelsRequest(channelName), res);
|
||||
getChannels(new ChannelsRequest(channelName), httpRequest, httpResponse);
|
||||
}
|
||||
|
||||
@RequestMapping(
|
||||
@ -159,8 +164,10 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
value = DomainConfig.PATH_CHANNELS_CONFIG,
|
||||
method = {RequestMethod.GET, RequestMethod.POST},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void getChannelConfigurations(@RequestBody(required = false) ChannelConfigurationsRequest request,
|
||||
final HttpServletResponse res) throws Throwable {
|
||||
public void getChannelConfigurations(
|
||||
@RequestBody(required = false) ChannelConfigurationsRequest request,
|
||||
final HttpServletRequest httpRequest,
|
||||
final HttpServletResponse httpResponse) throws Throwable {
|
||||
// in case not specified use defaults (e.g. GET)
|
||||
if (request == null) {
|
||||
request = new ChannelConfigurationsRequest();
|
||||
@ -168,7 +175,7 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
|
||||
((AbstractHTTPResponse) defaultResponse).respond(
|
||||
context,
|
||||
res,
|
||||
httpResponse,
|
||||
request,
|
||||
queryManager.getChannelConfigurations(request));
|
||||
}
|
||||
@ -178,16 +185,21 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void getChannelConfigurations(
|
||||
@PathVariable(value = "channel") final String channelName, final HttpServletResponse res)
|
||||
@PathVariable(value = "channel") final String channelName,
|
||||
final HttpServletRequest httpRequest,
|
||||
final HttpServletResponse httpResponse)
|
||||
throws Throwable {
|
||||
getChannelConfigurations(new ChannelConfigurationsRequest(channelName), res);
|
||||
getChannelConfigurations(new ChannelConfigurationsRequest(channelName), httpRequest, httpResponse);
|
||||
}
|
||||
|
||||
@RequestMapping(
|
||||
value = DomainConfig.PATH_CHANNEL_CONFIG,
|
||||
method = {RequestMethod.POST},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void getChannelConfiguration(@RequestBody final ChannelName channelName, final HttpServletResponse res)
|
||||
public void getChannelConfiguration(
|
||||
@RequestBody final ChannelName channelName,
|
||||
final HttpServletRequest httpRequest,
|
||||
final HttpServletResponse httpResponse)
|
||||
throws Throwable {
|
||||
try {
|
||||
final ChannelConfiguration config = queryManager.getChannelConfiguration(channelName);
|
||||
@ -195,11 +207,11 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
if (config != null) {
|
||||
((AbstractHTTPResponse) defaultResponse).respond(
|
||||
context,
|
||||
res,
|
||||
httpResponse,
|
||||
channelName,
|
||||
config);
|
||||
} else {
|
||||
res.setStatus(HttpStatus.NOT_FOUND.value());
|
||||
httpResponse.setStatus(HttpStatus.NOT_FOUND.value());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("Failed to get channel configuration.", t);
|
||||
@ -212,19 +224,30 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void getChannelConfiguration(
|
||||
@PathVariable(value = "channel") final String channelName, final HttpServletResponse res)
|
||||
@PathVariable(value = "channel") final String channelName,
|
||||
final HttpServletRequest httpRequest,
|
||||
final HttpServletResponse httpResponse)
|
||||
throws Throwable {
|
||||
getChannelConfiguration(new ChannelName(channelName), res);
|
||||
getChannelConfiguration(new ChannelName(channelName), httpRequest, httpResponse);
|
||||
}
|
||||
|
||||
@RequestMapping(
|
||||
value = DomainConfig.PATH_QUERY_CONFIG,
|
||||
method = RequestMethod.POST,
|
||||
consumes = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void executeDAQConfigQuery(@RequestBody @Valid final DAQConfigQuery query, final HttpServletResponse res)
|
||||
public void executeDAQConfigQuery(
|
||||
@RequestBody @Valid final DAQConfigQuery query,
|
||||
final HttpServletRequest httpRequest,
|
||||
final HttpServletResponse httpResponse)
|
||||
throws Throwable {
|
||||
final String redirect = queryManager.getRedirection(query.getChannels());
|
||||
if (redirect != null) {
|
||||
httpResponse.sendRedirect(redirect + DomainConfig.PATH_QUERY_CONFIG);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
LOGGER.debug("Executing queries '{}'", query);
|
||||
LOGGER.debug("Executing query '{}'", query);
|
||||
|
||||
final Response response = query.getResponseOrDefault(defaultResponse);
|
||||
if (response instanceof AbstractHTTPResponse) {
|
||||
@ -288,6 +311,12 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
|
||||
final List<ObjectError> allErrors = errors.getAllErrors();
|
||||
if (allErrors.isEmpty()) {
|
||||
final String redirect = queryManager.getRedirection(query.getChannels());
|
||||
if (redirect != null) {
|
||||
res.sendRedirect(redirect + DomainConfig.PATH_QUERY);
|
||||
return;
|
||||
}
|
||||
|
||||
executeDAQQuery(query, res);
|
||||
} else {
|
||||
final String message = String.format("Could not parse '%s' due to '%s'.", jsonBody, errors.toString());
|
||||
@ -308,6 +337,12 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
method = RequestMethod.POST,
|
||||
consumes = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void executeDAQQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
|
||||
final String redirect = queryManager.getRedirection(query.getChannels());
|
||||
if (redirect != null) {
|
||||
res.sendRedirect(redirect + DomainConfig.PATH_QUERY);
|
||||
return;
|
||||
}
|
||||
|
||||
executeDAQQueries(new DAQQueries(query), res);
|
||||
}
|
||||
|
||||
@ -343,6 +378,15 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
|
||||
final List<ObjectError> allErrors = errors.getAllErrors();
|
||||
if (allErrors.isEmpty()) {
|
||||
final Collection<ChannelName> channels = queries.getQueries().stream()
|
||||
.flatMap(daqQuery -> daqQuery.getChannels().stream())
|
||||
.collect(Collectors.toList());
|
||||
final String redirect = queryManager.getRedirection(channels);
|
||||
if (redirect != null) {
|
||||
res.sendRedirect(redirect + DomainConfig.PATH_QUERIES);
|
||||
return;
|
||||
}
|
||||
|
||||
executeDAQQueries(queries, res);
|
||||
} else {
|
||||
final String message = String.format("Could not parse '%s' due to '%s'.", jsonBody, errors.toString());
|
||||
@ -368,6 +412,15 @@ public class QueryRestController implements ApplicationContextAware {
|
||||
method = RequestMethod.POST,
|
||||
consumes = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public void executeDAQQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception {
|
||||
final Collection<ChannelName> channels = queries.getQueries().stream()
|
||||
.flatMap(daqQuery -> daqQuery.getChannels().stream())
|
||||
.collect(Collectors.toList());
|
||||
final String redirect = queryManager.getRedirection(channels);
|
||||
if (redirect != null) {
|
||||
res.sendRedirect(redirect + DomainConfig.PATH_QUERIES);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
LOGGER.debug("Executing queries '{}'", queries);
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package ch.psi.daq.queryrest.query;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
@ -24,21 +25,34 @@ import ch.psi.daq.domain.query.channels.LongHash;
|
||||
public interface QueryManager {
|
||||
|
||||
Stream<Backend> getBackends();
|
||||
|
||||
|
||||
/**
|
||||
* Provides the root path of the redirection (or <tt>null</tt> if no redirection is possible).
|
||||
*
|
||||
* @param channels The channels
|
||||
* @return String The root of the redirection or <tt>null</tt> for none.
|
||||
*/
|
||||
String getRedirection(final Collection<ChannelName> channels);
|
||||
|
||||
LongHash getChannelsHash();
|
||||
|
||||
Stream<ChannelsResponse> getChannels(final ChannelsRequest request) throws Exception;
|
||||
Stream<ChannelsResponse> getChannels(
|
||||
final ChannelsRequest request) throws Exception;
|
||||
|
||||
LongHash getChannelConfigurationsHash();
|
||||
|
||||
Stream<ChannelConfigurationsResponse> getChannelConfigurations(final ChannelConfigurationsRequest request)
|
||||
Stream<ChannelConfigurationsResponse> getChannelConfigurations(
|
||||
final ChannelConfigurationsRequest request)
|
||||
throws Exception;
|
||||
|
||||
ChannelConfiguration getChannelConfiguration(final ChannelName channel) throws Exception;
|
||||
ChannelConfiguration getChannelConfiguration(
|
||||
final ChannelName channel) throws Exception;
|
||||
|
||||
Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(final DAQConfigQuery query)
|
||||
Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
|
||||
final DAQConfigQuery query)
|
||||
throws Exception;
|
||||
|
||||
List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(final DAQQueries queries)
|
||||
List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
|
||||
final DAQQueries queries)
|
||||
throws Exception;
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package ch.psi.daq.queryrest.query;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -10,6 +11,7 @@ import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
@ -41,6 +43,7 @@ import ch.psi.daq.query.config.QueryConfig;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
|
||||
public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
|
||||
public static final String QUERY_SERVER_TYPE = "local";
|
||||
private Set<Backend> activeBackends;
|
||||
private BackendsChannelConfigurationCache channelsCache;
|
||||
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
|
||||
@ -66,13 +69,21 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
|
||||
.filter(backend -> activeBackends.contains(backend));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRedirection(final Collection<ChannelName> channels){
|
||||
// set backends if not defined yet
|
||||
channelsCache.configureBackends(channels);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongHash getChannelsHash() {
|
||||
return channelsCache.getChannelsHash();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<ChannelsResponse> getChannels(ChannelsRequest request) {
|
||||
public Stream<ChannelsResponse> getChannels(
|
||||
final ChannelsRequest request) {
|
||||
return channelsCache.getChannels(request);
|
||||
}
|
||||
|
||||
@ -82,12 +93,14 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(ChannelConfigurationsRequest request) {
|
||||
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(
|
||||
final ChannelConfigurationsRequest request) {
|
||||
return channelsCache.getChannelConfigurations(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfiguration getChannelConfiguration(ChannelName channel) {
|
||||
public ChannelConfiguration getChannelConfiguration(
|
||||
final ChannelName channel) {
|
||||
return channelsCache.getChannelConfiguration(channel);
|
||||
}
|
||||
|
||||
|
214
src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java
Normal file
214
src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java
Normal file
@ -0,0 +1,214 @@
|
||||
package ch.psi.daq.queryrest.query;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
|
||||
import ch.psi.daq.common.tuple.Quadruple;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.backend.Backend;
|
||||
import ch.psi.daq.domain.backend.BackendType;
|
||||
import ch.psi.daq.domain.backend.DomainBackendType;
|
||||
import ch.psi.daq.domain.events.ChannelConfiguration;
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQConfigQuery;
|
||||
import ch.psi.daq.domain.query.DAQConfigQueryElement;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.backend.BackendQuery;
|
||||
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
|
||||
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
|
||||
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
|
||||
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
|
||||
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsRequest;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsResponse;
|
||||
import ch.psi.daq.domain.query.channels.LongHash;
|
||||
import ch.psi.daq.domain.query.processor.QueryProcessor;
|
||||
import ch.psi.daq.domain.rest.RestHelper;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
|
||||
public class QueryManagerRemote implements QueryManager, ApplicationContextAware {
|
||||
public static final String QUERY_SERVER_TYPE = "remote";
|
||||
private Map<Backend, String> backendToServerAddresses;
|
||||
private BackendsChannelConfigurationCache channelsCache;
|
||||
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext context) throws BeansException {
|
||||
final BiFunction<String, String, BackendType> typeCreator =
|
||||
(name, queryServer) -> new DomainBackendType(name, queryServer);
|
||||
final List<String> queryServerAddresses =
|
||||
context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_ADDRESSES, List.class);
|
||||
backendToServerAddresses = RestHelper.getBackends(context, queryServerAddresses, typeCreator);
|
||||
|
||||
// channelsCache = new BackendsChannelConfigurationCache(backendToServerAddresses.keySet());
|
||||
// channelsCache =
|
||||
// context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE,
|
||||
// BackendsChannelConfigurationCache.class);
|
||||
queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {}
|
||||
|
||||
@Override
|
||||
public Stream<Backend> getBackends() {
|
||||
return backendToServerAddresses.keySet().stream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRedirection(final Collection<ChannelName> channels) {
|
||||
// set backends if not defined yet
|
||||
channelsCache.configureBackends(channels);
|
||||
|
||||
final Set<String> backendRoots = channels.stream()
|
||||
.map(channelName -> backendToServerAddresses.get(channelName.getBackend()))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
if (backendRoots.size() == 1) {
|
||||
// request can be sent to one REST backend -> do a redirection
|
||||
return backendRoots.iterator().next();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongHash getChannelsHash() {
|
||||
return channelsCache.getChannelsHash();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<ChannelsResponse> getChannels(
|
||||
final ChannelsRequest request) {
|
||||
return channelsCache.getChannels(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongHash getChannelConfigurationsHash() {
|
||||
return channelsCache.getChannelConfigurationsHash();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(
|
||||
final ChannelConfigurationsRequest request) {
|
||||
return channelsCache.getChannelConfigurations(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfiguration getChannelConfiguration(
|
||||
final ChannelName channel) {
|
||||
return channelsCache.getChannelConfiguration(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
|
||||
final DAQConfigQuery daqQuery) {
|
||||
// set backends if not defined yet
|
||||
channelsCache.configureBackends(daqQuery.getChannels());
|
||||
|
||||
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
|
||||
BackendQueryImpl
|
||||
.getBackendQueries(daqQuery)
|
||||
.stream()
|
||||
.filter(
|
||||
query -> query.getBackend().getBackendAccess().hasStreamEventReader())
|
||||
.flatMap(
|
||||
query -> {
|
||||
/* all the magic happens here */
|
||||
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
|
||||
query.getChannelConfigurations();
|
||||
|
||||
return channelToConfig.entrySet().stream()
|
||||
.map(entry -> {
|
||||
return Triple.of(
|
||||
query,
|
||||
new ChannelName(entry.getKey(), query.getBackend()),
|
||||
entry.getValue());
|
||||
});
|
||||
});
|
||||
|
||||
return Pair.of(daqQuery, resultStreams);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
|
||||
final DAQQueries queries) {
|
||||
// set backends if not defined yet
|
||||
for (DAQQueryElement daqQuery : queries) {
|
||||
channelsCache.configureBackends(daqQuery.getChannels());
|
||||
}
|
||||
|
||||
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results =
|
||||
new ArrayList<>(queries.getQueries().size());
|
||||
|
||||
for (final DAQQueryElement queryElement : queries) {
|
||||
Stream<Quadruple<BackendQuery, ChannelName, ?, ?>> resultStreams =
|
||||
BackendQueryImpl
|
||||
.getBackendQueries(queryElement)
|
||||
.stream()
|
||||
.filter(
|
||||
query -> query.getBackend().getBackendAccess().hasDataReader()
|
||||
&& query.getBackend().getBackendAccess().hasQueryProcessor())
|
||||
.flatMap(
|
||||
query -> {
|
||||
final QueryProcessor processor =
|
||||
query.getBackend().getBackendAccess().getQueryProcessor();
|
||||
final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
|
||||
// ChannelEvent query
|
||||
/* all the magic happens here */
|
||||
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
processor.process(queryAnalizer);
|
||||
/* do post-process */
|
||||
final Stream<Entry<ChannelName, ?>> channelToData =
|
||||
queryAnalizer.postProcess(channelToDataEvents);
|
||||
|
||||
// ChannelConfig query
|
||||
final BackendQuery configQuery =
|
||||
new BackendQueryImpl(query, queryElement.getConfigFields());
|
||||
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
|
||||
configQuery.getChannelConfigurations();
|
||||
|
||||
return channelToData.map(entry -> {
|
||||
return Quadruple.of(
|
||||
query,
|
||||
entry.getKey(),
|
||||
channelToConfig.get(entry.getKey().getName()),
|
||||
entry.getValue());
|
||||
});
|
||||
});
|
||||
|
||||
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
|
||||
// By materializing the outer Stream the elements of all BackendQuery are loaded async
|
||||
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
|
||||
// all elements into memory at once)
|
||||
resultStreams = resultStreams.collect(Collectors.toList()).stream();
|
||||
|
||||
results.add(Pair.of(queryElement, resultStreams));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
}
|
@ -9,9 +9,13 @@ queryrest.response.fields.event.query.aggregations=min,mean,max
|
||||
queryrest.response.fields.config.query=name,backend,pulseId,globalSeconds,type,shape,source,unit
|
||||
queryrest.response.fields.config.historic=name,backend,type,shape,source,description,unit
|
||||
|
||||
# Central Query Configs
|
||||
#######################
|
||||
query.backens=[{"path":"/sf"},{"path":"/gls"},{"path":"/hipa"},{"path":"/saresa"},{"path":"/saresb"}]
|
||||
# Remote Query Configs
|
||||
# defining how backends should be accessed
|
||||
##########################################
|
||||
# defines the query server type "local" for local backend access and "remote" for remote backend access through REST calls
|
||||
query.server.type=local
|
||||
# defines REST backends (for query.server.type=remote
|
||||
query.server.addresses=[{"path":"/sf"},{"path":"/gls"},{"path":"/hipa"},{"path":"/saresa"},{"path":"/saresb"}]
|
||||
|
||||
# defines if the writer is a local writer (can write data to filesystem)
|
||||
filestorage.writer.local=false
|
||||
|
@ -0,0 +1,39 @@
|
||||
package ch.psi.daq.test.queryrest.query;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import ch.psi.daq.common.serialization.SerializationHelper;
|
||||
import ch.psi.daq.domain.backend.Backend;
|
||||
import ch.psi.daq.domain.config.DomainConfig;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
import ch.psi.daq.queryrest.query.QueryManager;
|
||||
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||
|
||||
public class QueryManagerRemoteTest extends AbstractDaqRestTest {
|
||||
|
||||
@Resource
|
||||
private ApplicationContext context;
|
||||
private QueryManager queryManager;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
queryManager = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_MANAGER_REMOTE, QueryManager.class);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {}
|
||||
|
||||
@Test
|
||||
public void testQueryManager_01() throws Exception {
|
||||
assertTrue(false);
|
||||
}
|
||||
}
|
@ -10,6 +10,8 @@ domain.keyspace.base=daq_query_test
|
||||
|
||||
channels.cache.reload.period=-1
|
||||
|
||||
query.server.type=remote
|
||||
|
||||
query.min.time=1970-01-01T00:00:00.000000000+00:00
|
||||
|
||||
# overload test
|
||||
|
Reference in New Issue
Block a user