diff --git a/Readme.md b/Readme.md index c698a84..0abb712 100644 --- a/Readme.md +++ b/Readme.md @@ -82,7 +82,7 @@ POST https://:/channels #### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE"}' https://data-api.psi.ch/sf/channels | python -m json.tool +curl -L -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE"}' https://data-api.psi.ch/sf/channels | python -m json.tool ``` #### Response @@ -152,7 +152,7 @@ POST https://:/channels/config #### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE|CAM"}' https://data-api.psi.ch/sf/channels/config | python -m json.tool +curl -L -H "Content-Type: application/json" -X POST -d '{"regex": "AMPLT|PHASE|CAM"}' https://data-api.psi.ch/sf/channels/config | python -m json.tool ``` #### Response @@ -233,13 +233,13 @@ GET https://:/channel/config/{channel} #### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"name": "SINEG01-RCIR-PUP10:SIG-AMPLT", "backend":"sf-databuffer"}' https://data-api.psi.ch/sf/channel/config | python -m json.tool +curl -L -H "Content-Type: application/json" -X POST -d '{"name": "SINEG01-RCIR-PUP10:SIG-AMPLT", "backend":"sf-databuffer"}' https://data-api.psi.ch/sf/channel/config | python -m json.tool ``` or ```bash -curl -H "Content-Type: application/json" -X GET https://data-api.psi.ch/sf/channel/config/SINEG01-RCIR-PUP10:SIG-AMPLT | python -m json.tool +curl -L -H "Content-Type: application/json" -X GET https://data-api.psi.ch/sf/channel/config/SINEG01-RCIR-PUP10:SIG-AMPLT | python -m json.tool ``` #### Response @@ -302,7 +302,8 @@ A request is performed by sending a valid JSON object in the HTTP request body. }, "response":{ "format":"json", - "compression":"none" + "compression":"none", + "allowRedirect":true }, "mapping":{ "incomplete":"provide-as-is" @@ -507,12 +508,14 @@ It is possible to specify the response format the queried data should have. 
```json "response":{ "format":"json", - "compression":"none" + "compression":"none", + "allowRedirect":true } ``` - **format**: The format of the response (values: **json**|csv). Please note that `csv` does not support `index` and `extrema` aggregations. - **compression**: Responses can be compressed when transferred from the server (values: **none**|gzip). If compression is enabled, you have to tell `curl` that the data is compressed by defining the attribute `--compressed` so that it decompresses the data automatically. +- **allowRedirect**: Defines if the central query rest server is allowed to redirect queries to the query rest server of the actual backend given that the query allows for it (values: **true**|false). Redirect needs to be enabled in `curl` using the `-L` option. @@ -989,7 +992,7 @@ curl -H "Content-Type: application/json" -X POST -d '{"eventFields":["pulseId", ##### Command ```bash -curl -H "Content-Type: application/json" -X POST -d '{"response":{"format":"csv"},"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"eventFields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' https://data-api.psi.ch/sf/query +curl -L -H "Content-Type: application/json" -X POST -d '{"response":{"format":"csv"},"range":{"startPulseId":0,"endPulseId":4},"channels":["channel1","channel2"],"eventFields":["channel","pulseId","iocSeconds","globalSeconds","shape","eventCount","value"]}' https://data-api.psi.ch/sf/query ``` ##### Response @@ -1465,7 +1468,8 @@ A request is performed by sending a valid JSON object in the HTTP request body. 
], "response":{ "format":"json", - "compression":"none" + "compression":"none", + "allowRedirect":true } } ``` diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 9bab83a..3fca01e 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -429,7 +429,9 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { final String newDefaultProtocol = "https"; final String newDefaultHost = "data-api.psi.ch"; - final int newDefaultPort = 443; + + LOGGER.warn("\nSet back to 443!\n#####"); + final int newDefaultPort = 8080; // this should be overwritten final String newDefaultPath = ""; final List queryBackendStrs = new ArrayList<>(queryBackends.size()); diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 40c3121..16a11b9 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -243,27 +243,30 @@ public class QueryRestController implements ApplicationContextAware { final HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws Throwable { - // Do a redirection if only one backend is requested - // - final Supplier> channelsSupplier = () -> query.getChannels(); - final String redirect = queryManager.getRedirection(channelsSupplier); - if (redirect != null) { - final String redirectURL = redirect - + httpRequest.getRequestURI() - + httpRequest.getQueryString() != null ? 
StringUtils.QUESTION_MARK + httpRequest.getQueryString() - : StringUtils.EMPTY; + final Response response = query.getResponseOrDefault(defaultResponse); + if (response.isAllowRedirect()) { + // Do a redirection if only one backend is requested + // + final Supplier> channelsSupplier = () -> query.getChannels(); + final String redirect = queryManager.getRedirection(channelsSupplier); + if (redirect != null) { + final String redirectURL = + redirect + + httpRequest.getRequestURI() + + ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() + : StringUtils.EMPTY); - LOGGER.info("Send redirect to '{}'.", redirectURL); - httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); - // use 307 - works for POST too - httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); - return; + LOGGER.info("Send redirect to '{}'.", redirectURL); + httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); + // use 307 - works for POST too + httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + return; + } } try { LOGGER.debug("Executing query '{}'", query); - final Response response = query.getResponseOrDefault(defaultResponse); if (response instanceof AbstractHTTPResponse) { LOGGER.debug("Executing config query '{}'", query); final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response); @@ -421,29 +424,32 @@ public class QueryRestController implements ApplicationContextAware { @RequestBody @Valid final DAQQueries queries, final HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws Exception { - // Do a redirection if only one backend is requested - // - final Supplier> channelsSupplier = () -> queries.getQueries().stream() - .flatMap(daqQuery -> daqQuery.getChannels().stream()) - .collect(Collectors.toList()); - final String redirect = queryManager.getRedirection(channelsSupplier); - if (redirect != null) { - final 
String redirectURL = redirect - + httpRequest.getRequestURI() - + httpRequest.getQueryString() != null ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() - : StringUtils.EMPTY; + final Response response = queries.getResponseOrDefault(defaultResponse); + if (response.isAllowRedirect()) { + // Do a redirection if only one backend is requested + // + final Supplier> channelsSupplier = () -> queries.getQueries().stream() + .flatMap(daqQuery -> daqQuery.getChannels().stream()) + .collect(Collectors.toList()); + final String redirect = queryManager.getRedirection(channelsSupplier); + if (redirect != null) { + final String redirectURL = + redirect + + httpRequest.getRequestURI() + + ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() + : StringUtils.EMPTY); - LOGGER.info("Send redirect to '{}'.", redirectURL); - httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); - // use 307 - works for POST too - httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); - return; + LOGGER.info("Send redirect to '{}'.", redirectURL); + httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); + // use 307 - works for POST too + httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + return; + } } try { LOGGER.debug("Executing queries '{}'", queries); - final Response response = queries.getResponseOrDefault(defaultResponse); if (response instanceof AbstractHTTPResponse) { LOGGER.debug("Executing query '{}'", queries); final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response); diff --git a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java new file mode 100644 index 0000000..4fcc1f1 --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java @@ -0,0 +1,186 @@ +package ch.psi.daq.queryrest.query; + +import 
java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; + +import ch.psi.daq.common.tuple.Quadruple; +import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.backend.Backend; +import ch.psi.daq.domain.events.ChannelConfiguration; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.query.DAQConfigQuery; +import ch.psi.daq.domain.query.DAQConfigQueryElement; +import ch.psi.daq.domain.query.DAQQueries; +import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.backend.BackendQuery; +import ch.psi.daq.domain.query.backend.BackendQueryImpl; +import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; +import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache; +import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; +import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; +import ch.psi.daq.domain.query.channels.ChannelsRequest; +import ch.psi.daq.domain.query.channels.ChannelsResponse; +import ch.psi.daq.domain.query.channels.LongHash; +import ch.psi.daq.domain.query.processor.QueryProcessor; + +public abstract class AbstractQueryManager implements QueryManager { + private Set activeBackends; + private BackendsChannelConfigurationCache channelsCache; + private Function queryAnalizerFactory; + + protected void init( + final Set activeBackends, + final BackendsChannelConfigurationCache channelsCache, + final Function queryAnalizerFactory) { + this.activeBackends = activeBackends; + this.channelsCache = channelsCache; + this.queryAnalizerFactory = queryAnalizerFactory; + } + + protected Set getActiveBackends() { + return activeBackends; + } + + protected BackendsChannelConfigurationCache getConfigurationCache() { 
+ return channelsCache; + } + + protected Function getQueryAnalizerFactory() { + return queryAnalizerFactory; + } + + @Override + public Stream getBackends() { + return Backend.getBackends().stream() + .filter(backend -> activeBackends.contains(backend)); + } + + @Override + public LongHash getChannelsHash() { + return channelsCache.getChannelsHash(); + } + + @Override + public Stream getChannels( + final ChannelsRequest request) { + return channelsCache.getChannels(request); + } + + @Override + public LongHash getChannelConfigurationsHash() { + return channelsCache.getChannelConfigurationsHash(); + } + + @Override + public Stream getChannelConfigurations( + final ChannelConfigurationsRequest request) { + return channelsCache.getChannelConfigurations(request); + } + + @Override + public ChannelConfiguration getChannelConfiguration( + final ChannelName channel) { + return channelsCache.getChannelConfiguration(channel); + } + + @Override + public Entry>> queryConfigs( + final DAQConfigQuery daqQuery) { + // set backends if not defined yet + channelsCache.configureBackends(daqQuery.getChannels()); + + Stream> resultStreams = + BackendQueryImpl + .getBackendQueries(daqQuery) + .stream() + .filter( + query -> query.getBackend().getBackendAccess().hasDataReader()) + .flatMap( + query -> { + /* all the magic happens here */ + final Map> channelToConfig = + query.getChannelConfigurations(); + + return channelToConfig.entrySet().stream() + .map(entry -> { + return Triple.of( + query, + new ChannelName(entry.getKey(), query.getBackend()), + entry.getValue()); + }); + }); + + return Pair.of(daqQuery, resultStreams); + } + + @Override + public List>>> queryEvents( + final DAQQueries queries) { + // set backends if not defined yet + for (DAQQueryElement daqQuery : queries) { + channelsCache.configureBackends(daqQuery.getChannels()); + } + + final List>>> results = + new ArrayList<>(queries.getQueries().size()); + + for (final DAQQueryElement queryElement : queries) { + Stream> 
resultStreams = + BackendQueryImpl + .getBackendQueries(queryElement) + .stream() + .filter( + query -> { + return query.getBackend().getBackendAccess().hasDataReader() + && query.getBackend().getBackendAccess().hasQueryProcessor();}) + .flatMap( + query -> { + final QueryProcessor processor = + query.getBackend().getBackendAccess().getQueryProcessor(); + final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); + + // ChannelEvent query + /* all the magic happens here */ + final Stream>> channelToDataEvents = + processor.process(queryAnalizer); + /* do post-process */ + final Stream> channelToData = + queryAnalizer.postProcess(channelToDataEvents); + + // ChannelConfig query + final BackendQuery configQuery = + new BackendQueryImpl(query, queryElement.getConfigFields()); + final Map> channelToConfig = + configQuery.getChannelConfigurations(); + + return channelToData.map(entry -> { + return Quadruple.of( + query, + entry.getKey(), + channelToConfig.get(entry.getKey().getName()), + entry.getValue()); + }); + }); + + // Now we have a stream that loads elements sequential BackendQuery by BackendQuery. + // By materializing the outer Stream the elements of all BackendQuery are loaded async + // (speeds things up but requires also more memory - i.e. 
it relies on Backends not loading + // all elements into memory at once) + resultStreams = resultStreams.collect(Collectors.toList()).stream(); + + results.add(Pair.of(queryElement, resultStreams)); + } + + return results; + } + +} diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java index 2d9e852..6553802 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java @@ -1,52 +1,27 @@ package ch.psi.daq.queryrest.query; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; import javax.annotation.PreDestroy; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import ch.psi.daq.common.tuple.Quadruple; -import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.config.DomainConfig; -import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; -import ch.psi.daq.domain.query.DAQConfigQuery; -import ch.psi.daq.domain.query.DAQConfigQueryElement; -import ch.psi.daq.domain.query.DAQQueries; -import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.backend.BackendQuery; -import ch.psi.daq.domain.query.backend.BackendQueryImpl; import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache; -import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; -import 
ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; -import ch.psi.daq.domain.query.channels.ChannelsRequest; -import ch.psi.daq.domain.query.channels.ChannelsResponse; -import ch.psi.daq.domain.query.channels.LongHash; -import ch.psi.daq.domain.query.processor.QueryProcessor; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.queryrest.config.QueryRestConfig; -public class QueryManagerImpl implements QueryManager, ApplicationContextAware { +public class QueryManagerImpl extends AbstractQueryManager implements ApplicationContextAware { public static final String QUERY_SERVER_TYPE = "local"; - private Set activeBackends; - private BackendsChannelConfigurationCache channelsCache; - private Function queryAnalizerFactory; @SuppressWarnings("unchecked") @Override @@ -54,142 +29,20 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware { final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class); context = backend.getApplicationContext(); - activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class); - channelsCache = + final Set activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class); + final BackendsChannelConfigurationCache channelsCache = context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, BackendsChannelConfigurationCache.class); - queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); + final Function queryAnalizerFactory = + context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); + + init(activeBackends, channelsCache, queryAnalizerFactory); } @PreDestroy public void destroy() {} - @Override - public Stream getBackends() { - return Backend.getBackends().stream() - .filter(backend -> activeBackends.contains(backend)); - } - @Override public String getRedirection(final Supplier> channels) { return null; } - - @Override - public LongHash 
getChannelsHash() { - return channelsCache.getChannelsHash(); - } - - @Override - public Stream getChannels( - final ChannelsRequest request) { - return channelsCache.getChannels(request); - } - - @Override - public LongHash getChannelConfigurationsHash() { - return channelsCache.getChannelConfigurationsHash(); - } - - @Override - public Stream getChannelConfigurations( - final ChannelConfigurationsRequest request) { - return channelsCache.getChannelConfigurations(request); - } - - @Override - public ChannelConfiguration getChannelConfiguration( - final ChannelName channel) { - return channelsCache.getChannelConfiguration(channel); - } - - @Override - public Entry>> queryConfigs( - final DAQConfigQuery daqQuery) { - // set backends if not defined yet - channelsCache.configureBackends(daqQuery.getChannels()); - - Stream> resultStreams = - BackendQueryImpl - .getBackendQueries(daqQuery) - .stream() - .filter( - query -> query.getBackend().getBackendAccess().hasStreamEventReader()) - .flatMap( - query -> { - /* all the magic happens here */ - final Map> channelToConfig = - query.getChannelConfigurations(); - - return channelToConfig.entrySet().stream() - .map(entry -> { - return Triple.of( - query, - new ChannelName(entry.getKey(), query.getBackend()), - entry.getValue()); - }); - }); - - return Pair.of(daqQuery, resultStreams); - } - - @Override - public List>>> queryEvents( - final DAQQueries queries) { - // set backends if not defined yet - for (DAQQueryElement daqQuery : queries) { - channelsCache.configureBackends(daqQuery.getChannels()); - } - - final List>>> results = - new ArrayList<>(queries.getQueries().size()); - - for (final DAQQueryElement queryElement : queries) { - Stream> resultStreams = - BackendQueryImpl - .getBackendQueries(queryElement) - .stream() - .filter( - query -> query.getBackend().getBackendAccess().hasDataReader() - && query.getBackend().getBackendAccess().hasQueryProcessor()) - .flatMap( - query -> { - final QueryProcessor processor = - 
query.getBackend().getBackendAccess().getQueryProcessor(); - final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); - - // ChannelEvent query - /* all the magic happens here */ - final Stream>> channelToDataEvents = - processor.process(queryAnalizer); - /* do post-process */ - final Stream> channelToData = - queryAnalizer.postProcess(channelToDataEvents); - - // ChannelConfig query - final BackendQuery configQuery = - new BackendQueryImpl(query, queryElement.getConfigFields()); - final Map> channelToConfig = - configQuery.getChannelConfigurations(); - - return channelToData.map(entry -> { - return Quadruple.of( - query, - entry.getKey(), - channelToConfig.get(entry.getKey().getName()), - entry.getValue()); - }); - }); - - // Now we have a stream that loads elements sequential BackendQuery by BackendQuery. - // By materializing the outer Stream the elements of all BackendQuery are loaded async - // (speeds things up but requires also more memory - i.e. it relies on Backends not loading - // all elements into memory at once) - resultStreams = resultStreams.collect(Collectors.toList()).stream(); - - results.add(Pair.of(queryElement, resultStreams)); - } - - return results; - } - } diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java index f807213..031aba4 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java @@ -14,39 +14,42 @@ import java.util.stream.Stream; import javax.annotation.PreDestroy; import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import ch.psi.daq.common.tuple.Quadruple; import 
ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.StreamEvent; import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; -import ch.psi.daq.domain.query.DAQConfigQuery; -import ch.psi.daq.domain.query.DAQConfigQueryElement; import ch.psi.daq.domain.query.DAQQueries; +import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.backend.BackendQueryImpl; import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache; import ch.psi.daq.domain.query.channels.ChannelConfigurationLoader; -import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; -import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; -import ch.psi.daq.domain.query.channels.ChannelsRequest; -import ch.psi.daq.domain.query.channels.ChannelsResponse; -import ch.psi.daq.domain.query.channels.LongHash; -import ch.psi.daq.domain.query.processor.QueryProcessor; +import ch.psi.daq.domain.query.mapping.IncompleteStrategy; +import ch.psi.daq.domain.query.response.Response; +import ch.psi.daq.domain.query.response.ResponseFormat; +import ch.psi.daq.domain.query.response.ResponseImpl; +import ch.psi.daq.domain.rest.RestHelper; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.queryrest.config.QueryRestConfig; -public class QueryManagerRemote implements QueryManager, ApplicationContextAware { +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class QueryManagerRemote extends AbstractQueryManager implements ApplicationContextAware { + private static final Logger LOGGER = LoggerFactory.getLogger(QueryManagerRemote.class); + public static final String QUERY_SERVER_TYPE = "remote"; private Map backendToServerAddresses; - private BackendsChannelConfigurationCache 
channelsCache; - private Function queryAnalizerFactory; @SuppressWarnings("unchecked") @Override @@ -61,27 +64,31 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware return null; } }; - channelsCache = new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis); + final BackendsChannelConfigurationCache channelsCache = + new BackendsChannelConfigurationCache(loaderProvider, reloadPeriodMillis); channelsCache.init( context, backendToServerAddresses.keySet().iterator().next(), backendToServerAddresses.keySet()); - queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); + final Function queryAnalizerFactory = + context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); + + init(backendToServerAddresses.keySet(), channelsCache, queryAnalizerFactory); } @PreDestroy public void destroy() {} - @Override - public Stream getBackends() { - return backendToServerAddresses.keySet().stream(); - } + // @Override + // public Stream getBackends() { + // return backendToServerAddresses.keySet().stream(); + // } @Override public String getRedirection(final Supplier> channelsSupplier) { // set backends if not defined yet final Collection channels = channelsSupplier.get(); - channelsCache.configureBackends(channels); + getConfigurationCache().configureBackends(channels); final Set backendRoots = channels.stream() .map(channelName -> backendToServerAddresses.get(channelName.getBackend())) @@ -95,93 +102,85 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware } } - @Override - public LongHash getChannelsHash() { - return channelsCache.getChannelsHash(); - } - - @Override - public Stream getChannels( - final ChannelsRequest request) { - return channelsCache.getChannels(request); - } - - @Override - public LongHash getChannelConfigurationsHash() { - return channelsCache.getChannelConfigurationsHash(); - } - - @Override - public Stream 
getChannelConfigurations( - final ChannelConfigurationsRequest request) { - return channelsCache.getChannelConfigurations(request); - } - - @Override - public ChannelConfiguration getChannelConfiguration( - final ChannelName channel) { - return channelsCache.getChannelConfiguration(channel); - } - - @Override - public Entry>> queryConfigs( - final DAQConfigQuery daqQuery) { - // set backends if not defined yet - channelsCache.configureBackends(daqQuery.getChannels()); - - Stream> resultStreams = - BackendQueryImpl - .getBackendQueries(daqQuery) - .stream() - .filter( - query -> query.getBackend().getBackendAccess().hasStreamEventReader()) - .flatMap( - query -> { - /* all the magic happens here */ - final Map> channelToConfig = - query.getChannelConfigurations(); - - return channelToConfig.entrySet().stream() - .map(entry -> { - return Triple.of( - query, - new ChannelName(entry.getKey(), query.getBackend()), - entry.getValue()); - }); - }); - - return Pair.of(daqQuery, resultStreams); - } - @Override public List>>> queryEvents( final DAQQueries queries) { + // INFO: It is always an option to call super.queryEvents(queries); + // The super call will use QueryRestStreamEventReader. However this will be slower because + // more data is transfered (aggregation info for e.g. index/bin is not available on + // QueryRestStreamEventReader level). + // set backends if not defined yet for (DAQQueryElement daqQuery : queries) { - channelsCache.configureBackends(daqQuery.getChannels()); + getConfigurationCache().configureBackends(daqQuery.getChannels()); } final List>>> results = new ArrayList<>(queries.getQueries().size()); + // TODO: consider rawevent -> ??? 
+ final Response response = new ResponseImpl(ResponseFormat.JSON); for (final DAQQueryElement queryElement : queries) { Stream> resultStreams = BackendQueryImpl .getBackendQueries(queryElement) .stream() - .filter( - query -> query.getBackend().getBackendAccess().hasDataReader() - && query.getBackend().getBackendAccess().hasQueryProcessor()) .flatMap( query -> { - final QueryProcessor processor = - query.getBackend().getBackendAccess().getQueryProcessor(); - final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); + final String queryServer = backendToServerAddresses.get(query.getBackend()); - // ChannelEvent query - /* all the magic happens here */ final Stream>> channelToDataEvents = - processor.process(queryAnalizer); + query.getChannels() + .stream() + .map(channel -> { + if (queryServer == null) { + LOGGER.warn( + "There is no query server defined for '{}' of '{}'. Provide empty stream.", + channel, query.getBackend()); + return Pair.of(new ChannelName(channel, query.getBackend()), + Stream.empty()); + } else { + final ChannelName channelName = + new ChannelName(channel, query.getBackend()); + final DAQQuery daqQuery = + new DAQQuery(queryElement, response, channelName); + final Stream events; + if (daqQuery.getMapping() != null) { + daqQuery.getMapping().setIncomplete(IncompleteStrategy.FILL_NULL); + events = RestHelper.queryEventsTableAsync( + query.getBackend().getApplicationContext(), queryServer, + daqQuery) + .onErrorResume(thrw -> { + LOGGER.warn( + "Could not query '{}' for '{}' of '{}' as table. Provide empty stream.", + queryServer, channel, query.getBackend(), thrw); + return Mono.empty(); + }) + .flatMapMany(eventTable -> Flux + .fromStream(eventTable.getEventsOfColumn(0))) + .toStream(); + } else { + events = RestHelper + .queryEventsAsync(query.getBackend().getApplicationContext(), + queryServer, daqQuery) + .onErrorResume(thrw -> { + LOGGER.warn( + "Could not query '{}' for '{}' of '{}'. 
Provide empty stream.", + queryServer, channel, query.getBackend(), thrw); + return Flux.empty(); + }) + .flatMap(channelEvents -> Flux + .fromStream(channelEvents.getEvents())) + .toStream(); + } + + return Pair.of(channelName, events); + } + }); + + + final BackendQueryAnalyzer queryAnalizer = getQueryAnalizerFactory().apply(query); + /* do post-process */ final Stream> channelToData = queryAnalizer.postProcess(channelToDataEvents); @@ -212,5 +211,4 @@ public class QueryManagerRemote implements QueryManager, ApplicationContextAware return results; } - } diff --git a/src/main/resources/queryrest-all.properties b/src/main/resources/queryrest-all.properties new file mode 100644 index 0000000..a4908d2 --- /dev/null +++ b/src/main/resources/queryrest-all.properties @@ -0,0 +1,3 @@ +query.server.type=remote +query.processor.type=local +query.server.addresses=[{"protocol":"http","host":"localhost","port":8081,"path":""}] \ No newline at end of file diff --git a/src/main/resources/queryrest.properties b/src/main/resources/queryrest.properties index 38594b6..44151d9 100644 --- a/src/main/resources/queryrest.properties +++ b/src/main/resources/queryrest.properties @@ -14,7 +14,7 @@ queryrest.response.fields.config.historic=name,backend,type,shape,source,descrip ########################################## # defines the query server type "local" for local backend access and "remote" for remote backend access through REST calls query.server.type=local -# defines REST backends (for query.server.type=remote +# defines REST backends (for query.server.type=remote) query.server.addresses=[{"path":"/sf"},{"path":"/gls"},{"path":"/hipa"},{"path":"/saresa"},{"path":"/saresb"}] # defines if the writer is a local writer (can write data to filesystem) diff --git a/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java index 799ad1f..d8ddb78 100644 --- 
a/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java @@ -25,10 +25,10 @@ import ch.psi.daq.domain.backend.BackendType; import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.events.ChannelEvent; import ch.psi.daq.domain.query.processor.QueryProcessor; +import ch.psi.daq.domain.query.processor.QueryProcessorLocal; import ch.psi.daq.domain.reader.StreamEventReader; import ch.psi.daq.domain.test.reader.TestReader; import ch.psi.daq.query.config.QueryConfig; -import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.queryrest.config.QueryRestConfig; @Configuration diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/AbstractQueryRestControllerTableTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/AbstractQueryRestControllerTableTest.java index 69e65d4..3f37cdd 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/AbstractQueryRestControllerTableTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/AbstractQueryRestControllerTableTest.java @@ -83,7 +83,7 @@ public abstract class AbstractQueryRestControllerTableTest extends AbstractDaqRe ChannelEventTableImpl table = getResponseMapper().readValue(responseBytes, ChannelEventTableImpl.class); assertEquals(2, table.size()); - List events = table.getEvents(0).collect(Collectors.toList()); + List events = table.getEventsOfRow(0).collect(Collectors.toList()); assertEquals(2, events.size()); DataEvent event = events.get(0); assertEquals(TEST_CHANNEL_01, event.getChannel()); @@ -100,7 +100,7 @@ public abstract class AbstractQueryRestControllerTableTest extends AbstractDaqRe assertEquals(new Time(1, 0), event.getIocTime()); assertEquals(100, event.getValue(Number.class).longValue()); - events = table.getEvents(1).collect(Collectors.toList()); + events = table.getEventsOfRow(1).collect(Collectors.toList()); assertEquals(2, events.size()); event = events.get(0); 
assertEquals(TEST_CHANNEL_01, event.getChannel()); diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java index 9d26da9..e90382e 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java @@ -1,9 +1,16 @@ package ch.psi.daq.test.queryrest.query; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Resource; @@ -13,26 +20,534 @@ import org.junit.Before; import org.junit.Test; import org.springframework.context.ApplicationContext; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import ch.psi.bsread.message.Type; +import ch.psi.daq.common.concurrent.LoggingMapMergeFunction; +import ch.psi.daq.common.statistic.CombinableStatistics; +import ch.psi.daq.common.statistic.Statistics; +import ch.psi.daq.common.statistic.StorelessStatistics; +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.StreamEvent; import ch.psi.daq.domain.backend.Backend; +import ch.psi.daq.domain.config.DomainConfig; +import ch.psi.daq.domain.events.ChannelConfiguration; +import ch.psi.daq.domain.events.impl.ChannelConfigurationImpl; +import ch.psi.daq.domain.json.ChannelEventTable; +import ch.psi.daq.domain.json.ChannelEvents; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.json.Event; +import ch.psi.daq.domain.query.DAQConfigQuery; +import ch.psi.daq.domain.query.DAQQuery; import 
ch.psi.daq.domain.query.channels.ChannelsRequest; +import ch.psi.daq.domain.query.mapping.Mapping; +import ch.psi.daq.domain.query.operation.Aggregation; +import ch.psi.daq.domain.query.operation.AggregationDescriptor; +import ch.psi.daq.domain.query.operation.AggregationType; +import ch.psi.daq.domain.request.range.RequestRangeTime; +import ch.psi.daq.domain.rest.RestHelper; import ch.psi.daq.queryrest.config.QueryRestConfig; import ch.psi.daq.queryrest.query.QueryManager; import ch.psi.daq.test.queryrest.AbstractDaqRestTest; +import ch.psi.data.converters.ByteConverter; public class QueryManagerRemoteTest extends AbstractDaqRestTest { + private final static String DATA_BUFFER = "daqlocal";// "sf-databuffer"; @Resource private ApplicationContext context; private QueryManager queryManager; + private Map backendToServerAddresses; + private Backend queryBackend; + private String queryServer; + @SuppressWarnings("unchecked") @Before public void setUp() throws Exception { queryManager = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_MANAGER_REMOTE, QueryManager.class); + backendToServerAddresses = + context.getBean(QueryRestConfig.BEAN_NAME_QUERY_SERVER_BACKENDS, Map.class); + queryBackend = backendToServerAddresses.keySet().stream() + .filter(backend -> DATA_BUFFER.equals(backend.getName())) + .findFirst() + .orElse(null); + queryServer = backendToServerAddresses.get(queryBackend); } @After public void tearDown() {} + @Test + public void testStatisticsJSON_01() throws Exception { + final ObjectMapper objectMapper = context.getBean(DomainConfig.BEAN_NAME_OBJECT_MAPPER, ObjectMapper.class); + + final StorelessStatistics stats1 = new StorelessStatistics(3); + stats1.add(5); + final StorelessStatistics stats2 = new StorelessStatistics(3); + stats2.add(5); + stats2.add(7); + + List statisticsList = Arrays.asList( + stats1, + stats2); + + String statsStr = objectMapper.writeValueAsString( + statisticsList + .stream() + .map(stat -> stat.getTyped())); + + List 
statisticsListDes = objectMapper.readValue(statsStr, new TypeReference>() {}); + assertEquals(statisticsList, statisticsListDes); + + List combStatisticsListDes = + objectMapper.readValue(statsStr, new TypeReference>() {}); + assertEquals(statisticsList, combStatisticsListDes); + } + + @Test + public void testConfigQuery_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String channel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + DAQConfigQuery query = new DAQConfigQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2018-12-03T08:00")), + new ChannelName(channel, queryBackend)); + + + List configs = RestHelper.queryConfigs(context, queryServer, query) + .stream() + .flatMap(channelConfigs -> channelConfigs.getConfigs()) + .collect(Collectors.toList()); + assertTrue("Size was " + configs.size(), configs.size() > 0); + assertEquals(ChannelConfigurationImpl.class, configs.get(0).getClass()); + assertEquals(channel, configs.get(0).getChannel()); + assertEquals(queryBackend, configs.get(0).getBackend()); + } + + @Test + public void testEventQueryRaw_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + DAQQuery query = new DAQQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + + final List eventsList = RestHelper.queryEvents(context, queryServer, query); + assertEquals("Size was " + eventsList.size(), 2, eventsList.size()); + + List events = eventsList.get(0) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 11, events.size()); + StreamEvent event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, 
event.getBackend()); + assertEquals(scalarChannel, event.getChannel()); + assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(1, event.getStatistics().getCount()); + assertEquals(Type.Float64.getKey(), event.getType()); + assertEquals("9.430576E-5", event.getValueAsString()); + + events = eventsList.get(1) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 11, events.size()); + event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(waveformChannel, event.getChannel()); + assertEquals(2048, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {2048}, event.getShape()); + assertEquals(2048, event.getStatistics().getCount()); + assertEquals(Type.Int32.getKey(), event.getType()); + assertEquals( + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 3, 4, 6, 6, 8, 6, 5, 3, 3, 3, 8, 12, 14... 
shape [2048]]", + event.getValueAsString(30)); + } + + @Test + public void testEventQueryAggregated_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + DAQQuery query = new DAQQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + query.setAggregation(new AggregationDescriptor(AggregationType.value) + .setAggregations(Arrays.asList(Aggregation.typed))); + + final List eventsList = RestHelper.queryEvents(context, queryServer, query); + assertEquals("Size was " + eventsList.size(), 2, eventsList.size()); + + List events = eventsList.get(0) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 11, events.size()); + StreamEvent event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(scalarChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(1, event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + + events = eventsList.get(1) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 11, events.size()); + event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(waveformChannel, 
event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(2048, event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + } + + @Test + public void testEventQueryAggregatedBin_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + DAQQuery query = new DAQQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + query.setAggregation(new AggregationDescriptor(AggregationType.value) + .setNrOfBins(2) + .setAggregations(Arrays.asList(Aggregation.typed))); + + int firstBinEventCount = 6; // 5; + final List eventsList = RestHelper.queryEvents(context, queryServer, query); + assertEquals("Size was " + eventsList.size(), 2, eventsList.size()); + + List events = eventsList.get(0) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 2, events.size()); + StreamEvent event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(scalarChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(firstBinEventCount, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + 
assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(firstBinEventCount, event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + + events = eventsList.get(1) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 11, events.size()); + event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(waveformChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(firstBinEventCount, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(firstBinEventCount * 2048, event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + } + + @Test + public void testEventQueryIndexAggregateBin_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + DAQQuery query = new DAQQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + query.setAggregation(new AggregationDescriptor(AggregationType.index) + .setNrOfBins(2) + .setAggregations(Arrays.asList(Aggregation.typed))); + + final List eventsList = RestHelper.queryEvents(context, queryServer, query); + 
assertEquals("Size was " + eventsList.size(), 2, eventsList.size()); + + int firstBinEventCount = 6; // 5; + List events = eventsList.get(0) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 11, events.size()); + StreamEvent event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(scalarChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(firstBinEventCount, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(firstBinEventCount, event.getStatistics().getCount()); + assertEquals(firstBinEventCount, event.getValue(Statistics.class).getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + + events = eventsList.get(1) + .getEvents() + .collect(Collectors.toList()); + assertEquals("Size was " + events.size(), 11, events.size()); + event = events.get(0); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(waveformChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(firstBinEventCount, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {2048}, event.getShape()); + assertEquals(2048, event.getValue(List.class).size()); + assertEquals(firstBinEventCount, ((Statistics) event.getValue(List.class).get(0)).getCount()); + assertNull(event.getType()); 
+ // + // System.out.println(event.getEventCount()); + // System.out.println(event.getGlobalDate()); + // System.out.println(event.getIocDate()); + // System.out.println(event.getPulseId()); + // System.out.println(Arrays.toString(event.getShape())); + // System.out.println(event.getValue(Object.class)); + // System.out.println(event.getType()); + // System.out.println(event.getValueAsString(30)); + } + + @Test + public void testEventQueryTableRaw_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + DAQQuery query = new DAQQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + query.setMapping(new Mapping()); + + final ChannelEventTable table = RestHelper.queryEventsTable(context, queryServer, query); + assertEquals("Size was " + table.size(), 11, table.size()); + final List> eventsTable = table.getEvents(Collectors.toMap( + event -> event.getChannel(), + Function.identity(), + LoggingMapMergeFunction.getInstance(), + LinkedHashMap::new)) + .collect(Collectors.toList()); + assertEquals("Size was " + eventsTable.size(), 11, eventsTable.size()); + final Map eventsRow = eventsTable.get(0); + assertEquals("Size was " + eventsRow.size(), 2, eventsRow.size()); + assertTrue(eventsRow.containsKey(scalarChannel)); + assertTrue(eventsRow.containsKey(waveformChannel)); + + StreamEvent event = eventsRow.get(scalarChannel); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(scalarChannel, event.getChannel()); + assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + 
assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(1, event.getStatistics().getCount()); + assertEquals(Type.Float64.getKey(), event.getType()); + assertEquals("9.430576E-5", event.getValueAsString()); + + event = eventsRow.get(waveformChannel); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(waveformChannel, event.getChannel()); + assertEquals(2048, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {2048}, event.getShape()); + assertEquals(2048, event.getStatistics().getCount()); + assertEquals(Type.Int32.getKey(), event.getType()); + assertEquals( + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 3, 3, 4, 6, 6, 8, 6, 5, 3, 3, 3, 8, 12, 14... 
shape [2048]]", + event.getValueAsString(30)); + } + + @Test + public void testEventQueryTableAggregated_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + DAQQuery query = new DAQQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + query.setAggregation(new AggregationDescriptor(AggregationType.value) + .setAggregations(Arrays.asList(Aggregation.typed))); + query.setMapping(new Mapping()); + + final ChannelEventTable table = RestHelper.queryEventsTable(context, queryServer, query); + assertEquals("Size was " + table.size(), 11, table.size()); + final List> eventsTable = table.getEvents(Collectors.toMap( + event -> event.getChannel(), + Function.identity(), + LoggingMapMergeFunction.getInstance(), + LinkedHashMap::new)) + .collect(Collectors.toList()); + assertEquals("Size was " + eventsTable.size(), 11, eventsTable.size()); + final Map eventsRow = eventsTable.get(0); + assertEquals("Size was " + eventsRow.size(), 2, eventsRow.size()); + assertTrue(eventsRow.containsKey(scalarChannel)); + assertTrue(eventsRow.containsKey(waveformChannel)); + + StreamEvent event = eventsRow.get(scalarChannel); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(scalarChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(1, 
event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + + event = eventsRow.get(waveformChannel); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(waveformChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(1, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(2048, event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + } + + @Test + public void testEventQueryTableAggregatedBin_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + DAQQuery query = new DAQQuery( + new RequestRangeTime( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + query.setAggregation(new AggregationDescriptor(AggregationType.value) + .setNrOfBins(2) + .setAggregations(Arrays.asList(Aggregation.typed))); + query.setMapping(new Mapping()); + + int firstBinEventCount = 6; // 5; + final ChannelEventTable table = RestHelper.queryEventsTable(context, queryServer, query); + assertEquals("Size was " + table.size(), 2, table.size()); + final List> eventsTable = table.getEvents(Collectors.toMap( + event -> event.getChannel(), + Function.identity(), + LoggingMapMergeFunction.getInstance(), + LinkedHashMap::new)) + .collect(Collectors.toList()); + assertEquals("Size was " + 
eventsTable.size(), 2, eventsTable.size()); + final Map eventsRow = eventsTable.get(0); + assertEquals("Size was " + eventsRow.size(), 2, eventsRow.size()); + assertTrue(eventsRow.containsKey(scalarChannel)); + assertTrue(eventsRow.containsKey(waveformChannel)); + + StreamEvent event = eventsRow.get(scalarChannel); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(scalarChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(firstBinEventCount, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(firstBinEventCount, event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + + event = eventsRow.get(waveformChannel); + assertEquals(Event.class, event.getClass()); + assertEquals(queryBackend, event.getBackend()); + assertEquals(waveformChannel, event.getChannel()); + // assertEquals(1, event.getDoubleStream(ByteConverter.PARALLELISM_FALSE).count()); + assertEquals(firstBinEventCount, event.getEventCount()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", event.getGlobalDate()); + assertEquals("1970-01-01T01:00:00.000000000+01:00", event.getIocDate()); + assertEquals(638760000, event.getPulseId()); + assertArrayEquals(new int[] {1}, event.getShape()); + assertEquals(firstBinEventCount * 2048, event.getStatistics().getCount()); + assertNull(event.getType()); + assertTrue(event.getValueAsString(30).startsWith("StorelessStatistics")); + } + @Test public void testQueryManager_01() throws Exception { final List backends = queryManager.getBackends().collect(Collectors.toList());