Central query backend

This commit is contained in:
Fabian Märki
2019-02-14 11:58:25 +01:00
parent 8cdfcb5b82
commit cce7b4f4fb
11 changed files with 854 additions and 287 deletions

View File

@ -82,7 +82,7 @@ POST https://<host>:<port>/channels
#### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE"}' https://data-api.psi.ch/sf/channels | python -m json.tool
curl -L -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE"}' https://data-api.psi.ch/sf/channels | python -m json.tool
```
#### Response
@ -152,7 +152,7 @@ POST https://<host>:<port>/channels/config
#### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE|CAM"}' https://data-api.psi.ch/sf/channels/config | python -m json.tool
curl -L -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE|CAM"}' https://data-api.psi.ch/sf/channels/config | python -m json.tool
```
#### Response
@ -233,13 +233,13 @@ GET https://<host>:<port>/channel/config/{channel}
#### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"name": "SINEG01-RCIR-PUP10:SIG-AMPLT", "backend":"sf-databuffer"}' https://data-api.psi.ch/sf/channel/config | python -m json.tool
curl -L -H "Content-Type: application/json" -X POST -d '{"name": "SINEG01-RCIR-PUP10:SIG-AMPLT", "backend":"sf-databuffer"}' https://data-api.psi.ch/sf/channel/config | python -m json.tool
```
or
```bash
curl -H "Content-Type: application/json" -X GET https://data-api.psi.ch/sf/channel/config/SINEG01-RCIR-PUP10:SIG-AMPLT | python -m json.tool
curl -L -H "Content-Type: application/json" -X GET https://data-api.psi.ch/sf/channel/config/SINEG01-RCIR-PUP10:SIG-AMPLT | python -m json.tool
```
#### Response
@ -302,7 +302,8 @@ A request is performed by sending a valid JSON object in the HTTP request body.
},
"response":{
"format":"json",
"compression":"none"
"compression":"none",
"allowRedirect":true
},
"mapping":{
"incomplete":"provide-as-is"
@ -507,12 +508,14 @@ It is possible to specify the response format the queried data should have.
```json
"response":{
"format":"json",
"compression":"none"
"compression":"none",
"allowRedirect":true
}
```
- **format**: The format of the response (values: **json**|csv). Please note that `csv` does not support `index` and `extrema` aggregations.
- **compression**: Responses can be compressed when transferred from the server (values: **none**|gzip). If compression is enabled, you have to tell `curl` that the data is compressed by defining the attribute `--compressed` so that it decompresses the data automatically.
- **allowRedirect**: Defines whether the central query REST server is allowed to redirect queries to the query REST server of the actual backend, given that the query allows for it (values: **true**|false). Redirect needs to be enabled in `curl` using the `-L` option.
<a name="value_mapping"/>
@ -989,7 +992,7 @@ curl -H "Content-Type: application/json" -X POST -d '{"eventFields":["pulseId",
##### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"response":{"format":"csv"},"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"eventFields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' https://data-api.psi.ch/sf/query
curl -L -H "Content-Type: application/json" -X POST -d '{"response":{"format":"csv"},"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"eventFields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' https://data-api.psi.ch/sf/query
```
##### Response
@ -1465,7 +1468,8 @@ A request is performed by sending a valid JSON object in the HTTP request body.
],
"response":{
"format":"json",
"compression":"none"
"compression":"none",
"allowRedirect":true
}
}
```

View File

@ -429,7 +429,9 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
final String newDefaultProtocol = "https";
final String newDefaultHost = "data-api.psi.ch";
final int newDefaultPort = 443;
LOGGER.warn("\nSet back to 443!\n#####");
final int newDefaultPort = 8080;
// this should be overwritten
final String newDefaultPath = "";
final List<String> queryBackendStrs = new ArrayList<>(queryBackends.size());

View File

@ -243,27 +243,30 @@ public class QueryRestController implements ApplicationContextAware {
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse)
throws Throwable {
// Do a redirection if only one backend is requested
//
final Supplier<Collection<ChannelName>> channelsSupplier = () -> query.getChannels();
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
final String redirectURL = redirect
+ httpRequest.getRequestURI()
+ httpRequest.getQueryString() != null ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY;
final Response response = query.getResponseOrDefault(defaultResponse);
if (response.isAllowRedirect()) {
// Do a redirection if only one backend is requested
//
final Supplier<Collection<ChannelName>> channelsSupplier = () -> query.getChannels();
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
final String redirectURL =
redirect
+ httpRequest.getRequestURI()
+ ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY);
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return;
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return;
}
}
try {
LOGGER.debug("Executing query '{}'", query);
final Response response = query.getResponseOrDefault(defaultResponse);
if (response instanceof AbstractHTTPResponse) {
LOGGER.debug("Executing config query '{}'", query);
final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response);
@ -421,29 +424,32 @@ public class QueryRestController implements ApplicationContextAware {
@RequestBody @Valid final DAQQueries queries,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
// Do a redirection if only one backend is requested
//
final Supplier<Collection<ChannelName>> channelsSupplier = () -> queries.getQueries().stream()
.flatMap(daqQuery -> daqQuery.getChannels().stream())
.collect(Collectors.toList());
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
final String redirectURL = redirect
+ httpRequest.getRequestURI()
+ httpRequest.getQueryString() != null ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY;
final Response response = queries.getResponseOrDefault(defaultResponse);
if (response.isAllowRedirect()) {
// Do a redirection if only one backend is requested
//
final Supplier<Collection<ChannelName>> channelsSupplier = () -> queries.getQueries().stream()
.flatMap(daqQuery -> daqQuery.getChannels().stream())
.collect(Collectors.toList());
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
final String redirectURL =
redirect
+ httpRequest.getRequestURI()
+ ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY);
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return;
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return;
}
}
try {
LOGGER.debug("Executing queries '{}'", queries);
final Response response = queries.getResponseOrDefault(defaultResponse);
if (response instanceof AbstractHTTPResponse) {
LOGGER.debug("Executing query '{}'", queries);
final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response);

View File

@ -0,0 +1,186 @@
package ch.psi.daq.queryrest.query;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.processor.QueryProcessor;
/**
 * Base {@link QueryManager} implementation shared by the concrete query managers.
 * Channel lookups are delegated to a {@link BackendsChannelConfigurationCache}; config and
 * event queries are split per backend via {@link BackendQueryImpl#getBackendQueries} and the
 * per-channel result streams are assembled here. Subclasses must call {@link #init} before
 * any other method is used.
 */
public abstract class AbstractQueryManager implements QueryManager {
// backends this manager may serve queries for
private Set<Backend> activeBackends;
// cache answering channel name and channel configuration lookups across the active backends
private BackendsChannelConfigurationCache channelsCache;
// creates the analyzer used to post-process the events of a backend query
// (NOTE(review): the "Analizer" spelling follows the existing project-wide naming)
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
/**
 * Wires the collaborators. Must be invoked by subclasses during their own initialization,
 * before any query method is called.
 *
 * @param activeBackends backends enabled for querying
 * @param channelsCache cache answering channel/configuration lookups
 * @param queryAnalizerFactory factory creating a {@link BackendQueryAnalyzer} per backend query
 */
protected void init(
final Set<Backend> activeBackends,
final BackendsChannelConfigurationCache channelsCache,
final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory) {
this.activeBackends = activeBackends;
this.channelsCache = channelsCache;
this.queryAnalizerFactory = queryAnalizerFactory;
}
/** @return the backends enabled for querying */
protected Set<Backend> getActiveBackends() {
return activeBackends;
}
/** @return the channel configuration cache */
protected BackendsChannelConfigurationCache getConfigurationCache() {
return channelsCache;
}
/** @return the factory creating a {@link BackendQueryAnalyzer} for a {@link BackendQuery} */
protected Function<BackendQuery, BackendQueryAnalyzer> getQueryAnalizerFactory() {
return queryAnalizerFactory;
}
/** @return all known backends, restricted to the active ones */
@Override
public Stream<Backend> getBackends() {
return Backend.getBackends().stream()
.filter(backend -> activeBackends.contains(backend));
}
/** @return hash identifying the current set of channels (delegated to the cache) */
@Override
public LongHash getChannelsHash() {
return channelsCache.getChannelsHash();
}
/** Looks up channel names matching the request (delegated to the cache). */
@Override
public Stream<ChannelsResponse> getChannels(
final ChannelsRequest request) {
return channelsCache.getChannels(request);
}
/** @return hash identifying the current channel configurations (delegated to the cache) */
@Override
public LongHash getChannelConfigurationsHash() {
return channelsCache.getChannelConfigurationsHash();
}
/** Looks up channel configurations matching the request (delegated to the cache). */
@Override
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(
final ChannelConfigurationsRequest request) {
return channelsCache.getChannelConfigurations(request);
}
/** @return the configuration of a single channel (delegated to the cache) */
@Override
public ChannelConfiguration getChannelConfiguration(
final ChannelName channel) {
return channelsCache.getChannelConfiguration(channel);
}
/**
 * Executes a channel configuration query.
 *
 * @param daqQuery the config query (channels may lack an explicit backend; it is
 *        resolved via the configuration cache below)
 * @return the query element paired with a lazy stream of
 *         (backend query, channel name, configurations) triples
 */
@Override
public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
final DAQConfigQuery daqQuery) {
// set backends if not defined yet
channelsCache.configureBackends(daqQuery.getChannels());
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(daqQuery)
.stream()
// only consider backends that can actually read data
.filter(
query -> query.getBackend().getBackendAccess().hasDataReader())
.flatMap(
query -> {
/* all the magic happens here */
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
query.getChannelConfigurations();
return channelToConfig.entrySet().stream()
.map(entry -> {
return Triple.of(
query,
new ChannelName(entry.getKey(), query.getBackend()),
entry.getValue());
});
});
return Pair.of(daqQuery, resultStreams);
}
/**
 * Executes event queries: for every query element, each participating backend's events are
 * processed by its {@link QueryProcessor}, post-processed by the query analyzer, and joined
 * with the channel configurations requested via the element's config fields.
 *
 * @param queries the event queries (channel backends resolved via the cache if undefined)
 * @return one entry per query element, each holding a stream of
 *         (backend query, channel name, configurations, data) quadruples
 */
@Override
public List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
final DAQQueries queries) {
// set backends if not defined yet
for (DAQQueryElement daqQuery : queries) {
channelsCache.configureBackends(daqQuery.getChannels());
}
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results =
new ArrayList<>(queries.getQueries().size());
for (final DAQQueryElement queryElement : queries) {
Stream<Quadruple<BackendQuery, ChannelName, ?, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(queryElement)
.stream()
// only backends that can both read data and process event queries
.filter(
query -> {
return query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor();})
.flatMap(
query -> {
final QueryProcessor processor =
query.getBackend().getBackendAccess().getQueryProcessor();
final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// ChannelEvent query
/* all the magic happens here */
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
processor.process(queryAnalizer);
/* do post-process */
final Stream<Entry<ChannelName, ?>> channelToData =
queryAnalizer.postProcess(channelToDataEvents);
// ChannelConfig query
final BackendQuery configQuery =
new BackendQueryImpl(query, queryElement.getConfigFields());
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
configQuery.getChannelConfigurations();
// join each channel's data with its configurations (may be null if absent)
return channelToData.map(entry -> {
return Quadruple.of(
query,
entry.getKey(),
channelToConfig.get(entry.getKey().getName()),
entry.getValue());
});
});
// Now we have a stream that loads elements sequentially, BackendQuery by BackendQuery.
// By materializing the outer Stream the elements of all BackendQuery are loaded async
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
// all elements into memory at once)
resultStreams = resultStreams.collect(Collectors.toList()).stream();
results.add(Pair.of(queryElement, resultStreams));
}
return results;
}
}

View File

@ -1,52 +1,27 @@
package ch.psi.daq.queryrest.query;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.config.QueryRestConfig;
public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
public class QueryManagerImpl extends AbstractQueryManager implements ApplicationContextAware {
public static final String QUERY_SERVER_TYPE = "local";
private Set<Backend> activeBackends;
private BackendsChannelConfigurationCache channelsCache;
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@SuppressWarnings("unchecked")
@Override
@ -54,142 +29,20 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class);
context = backend.getApplicationContext();
activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class);
channelsCache =
final Set<Backend> activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class);
final BackendsChannelConfigurationCache channelsCache =
context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, BackendsChannelConfigurationCache.class);
queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory =
context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
init(activeBackends, channelsCache, queryAnalizerFactory);
}
@PreDestroy
public void destroy() {}
@Override
public Stream<Backend> getBackends() {
return Backend.getBackends().stream()
.filter(backend -> activeBackends.contains(backend));
}
@Override
public String getRedirection(final Supplier<Collection<ChannelName>> channels) {
return null;
}
@Override
public LongHash getChannelsHash() {
return channelsCache.getChannelsHash();
}
@Override
public Stream<ChannelsResponse> getChannels(
final ChannelsRequest request) {
return channelsCache.getChannels(request);
}
@Override
public LongHash getChannelConfigurationsHash() {
return channelsCache.getChannelConfigurationsHash();
}
@Override
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(
final ChannelConfigurationsRequest request) {
return channelsCache.getChannelConfigurations(request);
}
@Override
public ChannelConfiguration getChannelConfiguration(
final ChannelName channel) {
return channelsCache.getChannelConfiguration(channel);
}
@Override
public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
final DAQConfigQuery daqQuery) {
// set backends if not defined yet
channelsCache.configureBackends(daqQuery.getChannels());
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(daqQuery)
.stream()
.filter(
query -> query.getBackend().getBackendAccess().hasStreamEventReader())
.flatMap(
query -> {
/* all the magic happens here */
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
query.getChannelConfigurations();
return channelToConfig.entrySet().stream()
.map(entry -> {
return Triple.of(
query,
new ChannelName(entry.getKey(), query.getBackend()),
entry.getValue());
});
});
return Pair.of(daqQuery, resultStreams);
}
@Override
public List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
final DAQQueries queries) {
// set backends if not defined yet
for (DAQQueryElement daqQuery : queries) {
channelsCache.configureBackends(daqQuery.getChannels());
}
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results =
new ArrayList<>(queries.getQueries().size());
for (final DAQQueryElement queryElement : queries) {
Stream<Quadruple<BackendQuery, ChannelName, ?, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(queryElement)
.stream()
.filter(
query -> query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor())
.flatMap(
query -> {
final QueryProcessor processor =
query.getBackend().getBackendAccess().getQueryProcessor();
final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// ChannelEvent query
/* all the magic happens here */
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
processor.process(queryAnalizer);
/* do post-process */
final Stream<Entry<ChannelName, ?>> channelToData =
queryAnalizer.postProcess(channelToDataEvents);
// ChannelConfig query
final BackendQuery configQuery =
new BackendQueryImpl(query, queryElement.getConfigFields());
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
configQuery.getChannelConfigurations();
return channelToData.map(entry -> {
return Quadruple.of(
query,
entry.getKey(),
channelToConfig.get(entry.getKey().getName()),
entry.getValue());
});
});
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
// By materializing the outer Stream the elements of all BackendQuery are loaded async
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
// all elements into memory at once)
resultStreams = resultStreams.collect(Collectors.toList()).stream();
results.add(Pair.of(queryElement, resultStreams));
}
return results;
}
}

View File

@ -14,39 +14,42 @@ import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.StreamEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
import ch.psi.daq.domain.query.channels.ChannelConfigurationLoader;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.domain.query.mapping.IncompleteStrategy;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.domain.query.response.ResponseImpl;
import ch.psi.daq.domain.rest.RestHelper;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.config.QueryRestConfig;
public class QueryManagerRemote implements QueryManager, ApplicationContextAware {
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class QueryManagerRemote extends AbstractQueryManager implements ApplicationContextAware {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryManagerRemote.class);
public static final String QUERY_SERVER_TYPE = "remote";
private Map<Backend, String> backendToServerAddresses;
private BackendsChannelConfigurationCache channelsCache;
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@SuppressWarnings("unchecked")
@Override
@ -61,27 +64,31 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware
return null;
}
};
channelsCache = new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis);
final BackendsChannelConfigurationCache channelsCache =
new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis);
channelsCache.init(
context,
backendToServerAddresses.keySet().iterator().next(),
backendToServerAddresses.keySet());
queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
final Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory =
context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
init(backendToServerAddresses.keySet(), channelsCache, queryAnalizerFactory);
}
@PreDestroy
public void destroy() {}
@Override
public Stream<Backend> getBackends() {
return backendToServerAddresses.keySet().stream();
}
// @Override
// public Stream<Backend> getBackends() {
// return backendToServerAddresses.keySet().stream();
// }
@Override
public String getRedirection(final Supplier<Collection<ChannelName>> channelsSupplier) {
// set backends if not defined yet
final Collection<ChannelName> channels = channelsSupplier.get();
channelsCache.configureBackends(channels);
getConfigurationCache().configureBackends(channels);
final Set<String> backendRoots = channels.stream()
.map(channelName -> backendToServerAddresses.get(channelName.getBackend()))
@ -95,93 +102,85 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware
}
}
@Override
public LongHash getChannelsHash() {
return channelsCache.getChannelsHash();
}
@Override
public Stream<ChannelsResponse> getChannels(
final ChannelsRequest request) {
return channelsCache.getChannels(request);
}
@Override
public LongHash getChannelConfigurationsHash() {
return channelsCache.getChannelConfigurationsHash();
}
@Override
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(
final ChannelConfigurationsRequest request) {
return channelsCache.getChannelConfigurations(request);
}
@Override
public ChannelConfiguration getChannelConfiguration(
final ChannelName channel) {
return channelsCache.getChannelConfiguration(channel);
}
@Override
public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
final DAQConfigQuery daqQuery) {
// set backends if not defined yet
channelsCache.configureBackends(daqQuery.getChannels());
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(daqQuery)
.stream()
.filter(
query -> query.getBackend().getBackendAccess().hasStreamEventReader())
.flatMap(
query -> {
/* all the magic happens here */
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
query.getChannelConfigurations();
return channelToConfig.entrySet().stream()
.map(entry -> {
return Triple.of(
query,
new ChannelName(entry.getKey(), query.getBackend()),
entry.getValue());
});
});
return Pair.of(daqQuery, resultStreams);
}
@Override
public List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
final DAQQueries queries) {
// INFO: It is always an option to call super.queryEvents(queries);
// The super call will use QueryRestStreamEventReader. However this will be slower because
// more data is transferred (aggregation info e.g. for index/bin is not available on
// QueryRestStreamEventReader level).
// set backends if not defined yet
for (DAQQueryElement daqQuery : queries) {
channelsCache.configureBackends(daqQuery.getChannels());
getConfigurationCache().configureBackends(daqQuery.getChannels());
}
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results =
new ArrayList<>(queries.getQueries().size());
// TODO: consider rawevent -> ???
final Response response = new ResponseImpl(ResponseFormat.JSON);
for (final DAQQueryElement queryElement : queries) {
Stream<Quadruple<BackendQuery, ChannelName, ?, ?>> resultStreams =
BackendQueryImpl
.getBackendQueries(queryElement)
.stream()
.filter(
query -> query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor())
.flatMap(
query -> {
final QueryProcessor processor =
query.getBackend().getBackendAccess().getQueryProcessor();
final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
final String queryServer = backendToServerAddresses.get(query.getBackend());
// ChannelEvent query
/* all the magic happens here */
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
processor.process(queryAnalizer);
query.getChannels()
.stream()
.map(channel -> {
if (queryServer == null) {
LOGGER.warn(
"There is no query server defined for '{}' of '{}'. Provide empty stream.",
channel, query.getBackend());
return Pair.of(new ChannelName(channel, query.getBackend()),
Stream.empty());
} else {
final ChannelName channelName =
new ChannelName(channel, query.getBackend());
final DAQQuery daqQuery =
new DAQQuery(queryElement, response, channelName);
final Stream<StreamEvent> events;
if (daqQuery.getMapping() != null) {
daqQuery.getMapping().setIncomplete(IncompleteStrategy.FILL_NULL);
events = RestHelper.queryEventsTableAsync(
query.getBackend().getApplicationContext(), queryServer,
daqQuery)
.onErrorResume(thrw -> {
LOGGER.warn(
"Could not query '{}' for '{}' of '{}' as table. Provide empty stream.",
queryServer, channel, query.getBackend(), thrw);
return Mono.empty();
})
.flatMapMany(eventTable -> Flux
.fromStream(eventTable.getEventsOfColumn(0)))
.toStream();
} else {
events = RestHelper
.queryEventsAsync(query.getBackend().getApplicationContext(),
queryServer, daqQuery)
.onErrorResume(thrw -> {
LOGGER.warn(
"Could not query '{}' for '{}' of '{}'. Provide empty stream.",
queryServer, channel, query.getBackend(), thrw);
return Flux.empty();
})
.flatMap(channelEvents -> Flux
.fromStream(channelEvents.getEvents()))
.toStream();
}
return Pair.of(channelName, events);
}
});
final BackendQueryAnalyzer queryAnalizer = getQueryAnalizerFactory().apply(query);
/* do post-process */
final Stream<Entry<ChannelName, ?>> channelToData =
queryAnalizer.postProcess(channelToDataEvents);
@ -212,5 +211,4 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware
return results;
}
}

View File

@ -0,0 +1,3 @@
query.server.type=remote
query.processor.type=local
query.server.addresses=[{"protocol":"http","host":"localhost","port":8081,"path":""}]

View File

@ -14,7 +14,7 @@ queryrest.response.fields.config.historic=name,backend,type,shape,source,descrip
##########################################
# defines the query server type "local" for local backend access and "remote" for remote backend access through REST calls
query.server.type=local
# defines REST backends (for query.server.type=remote
# defines REST backends (for query.server.type=remote)
query.server.addresses=[{"path":"/sf"},{"path":"/gls"},{"path":"/hipa"},{"path":"/saresa"},{"path":"/saresb"}]
# defines if the writer is a local writer (can write data to filesystem)

View File

@ -25,10 +25,10 @@ import ch.psi.daq.domain.backend.BackendType;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelEvent;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.domain.query.processor.QueryProcessorLocal;
import ch.psi.daq.domain.reader.StreamEventReader;
import ch.psi.daq.domain.test.reader.TestReader;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.processor.QueryProcessorLocal;
import ch.psi.daq.queryrest.config.QueryRestConfig;
@Configuration

View File

@ -83,7 +83,7 @@ public abstract class AbstractQueryRestControllerTableTest extends AbstractDaqRe
ChannelEventTableImpl table = getResponseMapper().readValue(responseBytes, ChannelEventTableImpl.class);
assertEquals(2, table.size());
List<DataEvent> events = table.getEvents(0).collect(Collectors.toList());
List<DataEvent> events = table.getEventsOfRow(0).collect(Collectors.toList());
assertEquals(2, events.size());
DataEvent event = events.get(0);
assertEquals(TEST_CHANNEL_01, event.getChannel());
@ -100,7 +100,7 @@ public abstract class AbstractQueryRestControllerTableTest extends AbstractDaqRe
assertEquals(new Time(1, 0), event.getIocTime());
assertEquals(100, event.getValue(Number.class).longValue());
events = table.getEvents(1).collect(Collectors.toList());
events = table.getEventsOfRow(1).collect(Collectors.toList());
assertEquals(2, events.size());
event = events.get(0);
assertEquals(TEST_CHANNEL_01, event.getChannel());

View File

@ -1,9 +1,16 @@
package ch.psi.daq.test.queryrest.query;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Resource;
@ -13,26 +20,534 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.bsread.message.Type;
import ch.psi.daq.common.concurrent.LoggingMapMergeFunction;
import ch.psi.daq.common.statistic.CombinableStatistics;
import ch.psi.daq.common.statistic.Statistics;
import ch.psi.daq.common.statistic.StorelessStatistics;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.StreamEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.events.impl.ChannelConfigurationImpl;
import ch.psi.daq.domain.json.ChannelEventTable;
import ch.psi.daq.domain.json.ChannelEvents;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.Event;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.request.range.RequestRangeTime;
import ch.psi.daq.domain.rest.RestHelper;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
import ch.psi.data.converters.ByteConverter;
public class QueryManagerRemoteTest extends AbstractDaqRestTest {
private final static String DATA_BUFFER = "daqlocal";// "sf-databuffer";
@Resource
private ApplicationContext context;
private QueryManager queryManager;
private Map<Backend, String> backendToServerAddresses;
private Backend queryBackend;
private String queryServer;
@SuppressWarnings("unchecked")
@Before
public void setUp() throws Exception {
    // Resolve the remote QueryManager bean and the backend -> server-address map
    // from the Spring context before each test.
    queryManager = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_MANAGER_REMOTE, QueryManager.class);
    backendToServerAddresses =
            context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_BACKENDS, Map.class);
    // Select the backend whose name matches DATA_BUFFER; null if it is not
    // configured (tests guard against this with assertNotNull).
    queryBackend = backendToServerAddresses.keySet().stream()
            .filter(backend -> DATA_BUFFER.equals(backend.getName()))
            .findFirst()
            .orElse(null);
    // Address of the REST server serving the selected backend (null key lookup
    // yields null if no matching backend was found).
    queryServer = backendToServerAddresses.get(queryBackend);
}
// No per-test cleanup required; kept for symmetry with setUp().
@After
public void tearDown() {}
@Test
public void testStatisticsJSON_01() throws Exception {
    // Verifies that Statistics objects survive a JSON round-trip: serialized via
    // the domain ObjectMapper, they must deserialize back to equal objects both
    // as List<Statistics> and as List<CombinableStatistics>.
    final ObjectMapper objectMapper = context.getBean(DomainConfig.BEAN_NAME_OBJECT_MAPPER, ObjectMapper.class);
    final StorelessStatistics stats1 = new StorelessStatistics(3);
    stats1.add(5);
    final StorelessStatistics stats2 = new StorelessStatistics(3);
    stats2.add(5);
    stats2.add(7);
    List<Statistics> statisticsList = Arrays.asList(
            stats1,
            stats2);
    // Serialize the typed view (getTyped()) of each statistic.
    String statsStr = objectMapper.writeValueAsString(
            statisticsList
                    .stream()
                    .map(stat -> stat.getTyped()));
    // Round-trip as the Statistics interface type.
    List<Statistics> statisticsListDes = objectMapper.readValue(statsStr, new TypeReference<List<Statistics>>() {});
    assertEquals(statisticsList, statisticsListDes);
    // Round-trip again as the CombinableStatistics subtype.
    List<CombinableStatistics> combStatisticsListDes =
            objectMapper.readValue(statsStr, new TypeReference<List<CombinableStatistics>>() {});
    assertEquals(statisticsList, combStatisticsListDes);
}
@Test
public void testConfigQuery_01() throws Exception {
    // Queries channel configurations through the remote REST server and checks
    // that results carry the expected implementation class, channel name and
    // backend attribution.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String channel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    DAQConfigQuery query = new DAQConfigQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2018-12-03T08:00")),
            new ChannelName(channel, queryBackend));
    // Flatten the per-channel config containers into a single list.
    List<ChannelConfiguration> configs = RestHelper.queryConfigs(context, queryServer, query)
            .stream()
            .flatMap(channelConfigs -> channelConfigs.getConfigs())
            .collect(Collectors.toList());
    assertTrue("Size was " + configs.size(), configs.size() > 0);
    assertEquals(ChannelConfigurationImpl.class, configs.get(0).getClass());
    assertEquals(channel, configs.get(0).getChannel());
    assertEquals(queryBackend, configs.get(0).getBackend());
}
@Test
public void testEventQueryRaw_01() throws Exception {
    // Raw (unaggregated) event query for one scalar and one waveform channel via
    // the remote REST server. Expected values (11 events, pulse id, dates,
    // shapes, value strings) come from the test-data fixture of the backing
    // backend.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
    DAQQuery query = new DAQQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2016-10-12T16:00")),
            new ChannelName(scalarChannel, queryBackend),
            new ChannelName(waveformChannel, queryBackend));
    // One ChannelEvents entry per queried channel.
    final List<? extends ChannelEvents> eventsList = RestHelper.queryEvents(context, queryServer, query);
    assertEquals("Size was " + eventsList.size(), 2, eventsList.size());
    // --- scalar channel: shape [1], one Float64 value per event ---
    List<StreamEvent> events = eventsList.get(0)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 11, events.size());
    StreamEvent event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(scalarChannel, event.getChannel());
    assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(1, event.getStatistics().getCount());
    assertEquals(Type.Float64.getKey(), event.getType());
    assertEquals("9.430576E-5", event.getValueAsString());
    // --- waveform channel: shape [2048], Int32 samples ---
    events = eventsList.get(1)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 11, events.size());
    event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(waveformChannel, event.getChannel());
    assertEquals(2048, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {2048}, event.getShape());
    assertEquals(2048, event.getStatistics().getCount());
    assertEquals(Type.Int32.getKey(), event.getType());
    // getValueAsString(30) truncates the waveform to 30 elements plus shape info.
    assertEquals(
            "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 3, 4, 6, 6, 8, 6, 5, 3, 3, 3, 8, 12, 14... shape [2048]]",
            event.getValueAsString(30));
}
@Test
public void testEventQueryAggregated_01() throws Exception {
    // Value-aggregated query (no binning): each raw event becomes an aggregated
    // event whose value is a StorelessStatistics; getType() becomes null.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
    DAQQuery query = new DAQQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2016-10-12T16:00")),
            new ChannelName(scalarChannel, queryBackend),
            new ChannelName(waveformChannel, queryBackend));
    query.setAggregation(new AggregationDescriptor(AggregationType.value)
            .setAggregations(Arrays.asList(Aggregation.typed)));
    final List<? extends ChannelEvents> eventsList = RestHelper.queryEvents(context, queryServer, query);
    assertEquals("Size was " + eventsList.size(), 2, eventsList.size());
    // --- scalar channel: statistics over 1 sample per event ---
    List<StreamEvent> events = eventsList.get(0)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 11, events.size());
    StreamEvent event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(scalarChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(1, event.getStatistics().getCount());
    // Aggregated events carry no raw value type.
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
    // --- waveform channel: value aggregation collapses 2048 samples into one
    // statistics object (shape [1], count 2048) ---
    events = eventsList.get(1)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 11, events.size());
    event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(waveformChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(2048, event.getStatistics().getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
}
@Test
public void testEventQueryAggregatedBin_01() throws Exception {
    // Value-aggregated query with nrOfBins=2: the 11 raw events are folded into
    // 2 bins; the first bin aggregates 6 events.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
    DAQQuery query = new DAQQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2016-10-12T16:00")),
            new ChannelName(scalarChannel, queryBackend),
            new ChannelName(waveformChannel, queryBackend));
    query.setAggregation(new AggregationDescriptor(AggregationType.value)
            .setNrOfBins(2)
            .setAggregations(Arrays.asList(Aggregation.typed)));
    // Events aggregated into the first of the two bins (was 5 before the
    // fixture/binning change noted in the original comment).
    int firstBinEventCount = 6; // 5;
    final List<? extends ChannelEvents> eventsList = RestHelper.queryEvents(context, queryServer, query);
    assertEquals("Size was " + eventsList.size(), 2, eventsList.size());
    // --- scalar channel: 2 binned events ---
    List<StreamEvent> events = eventsList.get(0)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 2, events.size());
    StreamEvent event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(scalarChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(firstBinEventCount, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(firstBinEventCount, event.getStatistics().getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
    // --- waveform channel ---
    // NOTE(review): 11 events here vs. 2 for the scalar channel above despite the
    // same nrOfBins=2 — confirm whether this asymmetry is intended behavior of
    // the remote binning or a stale expected value.
    events = eventsList.get(1)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 11, events.size());
    event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(waveformChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(firstBinEventCount, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    // Statistics count covers all 2048 waveform samples of each binned event.
    assertEquals(firstBinEventCount * 2048, event.getStatistics().getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
}
@Test
public void testEventQueryIndexAggregateBin_01() throws Exception {
    // Index-aggregated query with nrOfBins=2: statistics are computed per array
    // index, so a waveform event keeps shape [2048] with a Statistics object at
    // every index.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
    DAQQuery query = new DAQQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2016-10-12T16:00")),
            new ChannelName(scalarChannel, queryBackend),
            new ChannelName(waveformChannel, queryBackend));
    query.setAggregation(new AggregationDescriptor(AggregationType.index)
            .setNrOfBins(2)
            .setAggregations(Arrays.asList(Aggregation.typed)));
    final List<? extends ChannelEvents> eventsList = RestHelper.queryEvents(context, queryServer, query);
    assertEquals("Size was " + eventsList.size(), 2, eventsList.size());
    // Events aggregated into the first bin (was 5 before the fixture/binning
    // change noted in the original comment).
    int firstBinEventCount = 6; // 5;
    // --- scalar channel ---
    // NOTE(review): 11 events asserted despite nrOfBins=2 (the value-aggregated
    // variant above yields 2) — confirm whether index aggregation bins per
    // event rather than per range.
    List<StreamEvent> events = eventsList.get(0)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 11, events.size());
    StreamEvent event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(scalarChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(firstBinEventCount, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(firstBinEventCount, event.getStatistics().getCount());
    assertEquals(firstBinEventCount, event.getValue(Statistics.class).getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
    // --- waveform channel: shape preserved, one Statistics per index ---
    events = eventsList.get(1)
            .getEvents()
            .collect(Collectors.toList());
    assertEquals("Size was " + events.size(), 11, events.size());
    event = events.get(0);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(waveformChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(firstBinEventCount, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {2048}, event.getShape());
    // Value is a List of 2048 per-index Statistics, each over the binned events.
    assertEquals(2048, event.getValue(List.class).size());
    assertEquals(firstBinEventCount, ((Statistics) event.getValue(List.class).get(0)).getCount());
    assertNull(event.getType());
}
@Test
public void testEventQueryTableRaw_01() throws Exception {
    // Raw event query with table mapping: events of both channels are aligned
    // into rows; each row is materialized as a channel-name -> event map.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
    DAQQuery query = new DAQQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2016-10-12T16:00")),
            new ChannelName(scalarChannel, queryBackend),
            new ChannelName(waveformChannel, queryBackend));
    // Request table-shaped (row-mapped) results.
    query.setMapping(new Mapping());
    final ChannelEventTable table = RestHelper.queryEventsTable(context, queryServer, query);
    assertEquals("Size was " + table.size(), 11, table.size());
    // Collect each row into an insertion-ordered channel -> event map; the merge
    // function only logs (duplicate channels per row are unexpected).
    final List<Map<String, StreamEvent>> eventsTable = table.getEvents(Collectors.toMap(
            event -> event.getChannel(),
            Function.identity(),
            LoggingMapMergeFunction.getInstance(),
            LinkedHashMap::new))
            .collect(Collectors.toList());
    assertEquals("Size was " + eventsTable.size(), 11, eventsTable.size());
    final Map<String, StreamEvent> eventsRow = eventsTable.get(0);
    assertEquals("Size was " + eventsRow.size(), 2, eventsRow.size());
    assertTrue(eventsRow.containsKey(scalarChannel));
    assertTrue(eventsRow.containsKey(waveformChannel));
    // --- scalar channel cell of the first row ---
    StreamEvent event = eventsRow.get(scalarChannel);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(scalarChannel, event.getChannel());
    assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(1, event.getStatistics().getCount());
    assertEquals(Type.Float64.getKey(), event.getType());
    assertEquals("9.430576E-5", event.getValueAsString());
    // --- waveform channel cell of the first row ---
    event = eventsRow.get(waveformChannel);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(waveformChannel, event.getChannel());
    assertEquals(2048, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {2048}, event.getShape());
    assertEquals(2048, event.getStatistics().getCount());
    assertEquals(Type.Int32.getKey(), event.getType());
    assertEquals(
            "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 3, 4, 6, 6, 8, 6, 5, 3, 3, 3, 8, 12, 14... shape [2048]]",
            event.getValueAsString(30));
}
@Test
public void testEventQueryTableAggregated_01() throws Exception {
    // Value-aggregated query (no binning) combined with table mapping: rows hold
    // StorelessStatistics-valued events, one per channel.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
    DAQQuery query = new DAQQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2016-10-12T16:00")),
            new ChannelName(scalarChannel, queryBackend),
            new ChannelName(waveformChannel, queryBackend));
    query.setAggregation(new AggregationDescriptor(AggregationType.value)
            .setAggregations(Arrays.asList(Aggregation.typed)));
    query.setMapping(new Mapping());
    final ChannelEventTable table = RestHelper.queryEventsTable(context, queryServer, query);
    assertEquals("Size was " + table.size(), 11, table.size());
    // Collect each row into an insertion-ordered channel -> event map.
    final List<Map<String, StreamEvent>> eventsTable = table.getEvents(Collectors.toMap(
            event -> event.getChannel(),
            Function.identity(),
            LoggingMapMergeFunction.getInstance(),
            LinkedHashMap::new))
            .collect(Collectors.toList());
    assertEquals("Size was " + eventsTable.size(), 11, eventsTable.size());
    final Map<String, StreamEvent> eventsRow = eventsTable.get(0);
    assertEquals("Size was " + eventsRow.size(), 2, eventsRow.size());
    assertTrue(eventsRow.containsKey(scalarChannel));
    assertTrue(eventsRow.containsKey(waveformChannel));
    // --- scalar channel cell ---
    StreamEvent event = eventsRow.get(scalarChannel);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(scalarChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(1, event.getStatistics().getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
    // --- waveform channel cell: 2048 samples collapsed into one statistics ---
    event = eventsRow.get(waveformChannel);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(waveformChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(1, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(2048, event.getStatistics().getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
}
@Test
public void testEventQueryTableAggregatedBin_01() throws Exception {
    // Value-aggregated query with nrOfBins=2 combined with table mapping: the
    // table shrinks to 2 rows, each cell aggregating the events of its bin.
    assertNotNull(queryBackend);
    assertNotNull(queryServer);
    String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
    String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
    DAQQuery query = new DAQQuery(
            new RequestRangeTime(
                    TimeUtils.parse("2016-10-12T14:00"),
                    TimeUtils.parse("2016-10-12T16:00")),
            new ChannelName(scalarChannel, queryBackend),
            new ChannelName(waveformChannel, queryBackend));
    query.setAggregation(new AggregationDescriptor(AggregationType.value)
            .setNrOfBins(2)
            .setAggregations(Arrays.asList(Aggregation.typed)));
    query.setMapping(new Mapping());
    // Events aggregated into the first of the two bins (was 5 before the
    // fixture/binning change noted in the original comment).
    int firstBinEventCount = 6; // 5;
    final ChannelEventTable table = RestHelper.queryEventsTable(context, queryServer, query);
    assertEquals("Size was " + table.size(), 2, table.size());
    // Collect each row into an insertion-ordered channel -> event map.
    final List<Map<String, StreamEvent>> eventsTable = table.getEvents(Collectors.toMap(
            event -> event.getChannel(),
            Function.identity(),
            LoggingMapMergeFunction.getInstance(),
            LinkedHashMap::new))
            .collect(Collectors.toList());
    assertEquals("Size was " + eventsTable.size(), 2, eventsTable.size());
    final Map<String, StreamEvent> eventsRow = eventsTable.get(0);
    assertEquals("Size was " + eventsRow.size(), 2, eventsRow.size());
    assertTrue(eventsRow.containsKey(scalarChannel));
    assertTrue(eventsRow.containsKey(waveformChannel));
    // --- scalar channel cell of the first bin ---
    StreamEvent event = eventsRow.get(scalarChannel);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(scalarChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(firstBinEventCount, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(firstBinEventCount, event.getStatistics().getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
    // --- waveform channel cell: statistics cover all samples of the bin ---
    event = eventsRow.get(waveformChannel);
    assertEquals(Event.class, event.getClass());
    assertEquals(queryBackend, event.getBackend());
    assertEquals(waveformChannel, event.getChannel());
    // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count());
    assertEquals(firstBinEventCount, event.getEventCount());
    assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate());
    assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate());
    assertEquals(638760000, event.getPulseId());
    assertArrayEquals(new int[] {1}, event.getShape());
    assertEquals(firstBinEventCount * 2048, event.getStatistics().getCount());
    assertNull(event.getType());
    assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics"));
}
@Test
public void testQueryManager_01() throws Exception {
final List<Backend> backends = queryManager.getBackends().collect(Collectors.toList());