diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 16a11b9..f4e751e 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -54,10 +54,12 @@ import ch.psi.daq.domain.query.DAQConfigQueryElement; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.RangeQuery; import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.LongHash; +import ch.psi.daq.domain.query.channels.RangeQueryResponse; import ch.psi.daq.domain.query.operation.Aggregation; import ch.psi.daq.domain.query.operation.AggregationType; import ch.psi.daq.domain.query.operation.Compression; @@ -250,11 +252,13 @@ public class QueryRestController implements ApplicationContextAware { final Supplier> channelsSupplier = () -> query.getChannels(); final String redirect = queryManager.getRedirection(channelsSupplier); if (redirect != null) { - final String redirectURL = + final String redirectURL = redirect - + httpRequest.getRequestURI() - + ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() - : StringUtils.EMPTY); + + httpRequest.getRequestURI() + + ((httpRequest.getQueryString() != null + && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) + ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() + : StringUtils.EMPTY); LOGGER.info("Send redirect to '{}'.", redirectURL); httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); @@ -433,11 +437,13 @@ public class QueryRestController implements ApplicationContextAware { .collect(Collectors.toList()); final String redirect = queryManager.getRedirection(channelsSupplier); if (redirect != null) { - final String redirectURL = + final String redirectURL = redirect - + httpRequest.getRequestURI() - + ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() - : StringUtils.EMPTY); + + httpRequest.getRequestURI() + + ((httpRequest.getQueryString() != null + && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) + ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() + : StringUtils.EMPTY); LOGGER.info("Send redirect to '{}'.", redirectURL); httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); @@ -479,6 +485,48 @@ public class QueryRestController implements ApplicationContextAware { } } + @RequestMapping( + value = DomainConfig.PATH_QUERY_RANGE, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}) + public Stream queryRange( + @RequestBody @Valid final RangeQuery rangeQuery, + final HttpServletRequest httpRequest, + final HttpServletResponse httpResponse) throws Exception { + + final Response response = rangeQuery.getResponseOrDefault(defaultResponse); + if (response.isAllowRedirect()) { + // Do a redirection if only one backend is requested + // + final Supplier> channelsSupplier = () -> rangeQuery.getChannels(); + final String redirect = queryManager.getRedirection(channelsSupplier); + if (redirect != null) { + final String redirectURL = + redirect + + httpRequest.getRequestURI() + + ((httpRequest.getQueryString() != null + && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) + ? StringUtils.QUESTION_MARK + httpRequest.getQueryString() + : StringUtils.EMPTY); + + LOGGER.info("Send redirect to '{}'.", redirectURL); + httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); + // use 307 - works for POST too + httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + return Stream.empty(); + } + } + + try { + LOGGER.debug("Executing range query '{}'", rangeQuery); + + return queryManager.queryRange(rangeQuery); + } catch (Exception e) { + LOGGER.error("Failed to execute range query '{}'.", rangeQuery, e); + throw e; + } + } + /** * Returns the current list of {@link Ordering}s available. * diff --git a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java index 96ac39c..3c7159d 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java +++ b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java @@ -1,25 +1,37 @@ package ch.psi.daq.queryrest.query; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.collect.Streams; + +import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.tuple.Quadruple; import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.PulseIdTime; import ch.psi.daq.domain.backend.Backend; +import ch.psi.daq.domain.backend.BackendAccess; +import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.query.DAQConfigQuery; import ch.psi.daq.domain.query.DAQConfigQueryElement; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.RangeQuery; import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.backend.BackendQueryImpl; import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; @@ -29,9 +41,14 @@ import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsResponse; import ch.psi.daq.domain.query.channels.LongHash; +import ch.psi.daq.domain.query.channels.RangeQueryResponse; +import ch.psi.daq.domain.query.operation.EventField; import ch.psi.daq.domain.query.processor.QueryProcessor; +import ch.psi.daq.domain.reader.RequestRangeQueryResult; public abstract class AbstractQueryManager implements QueryManager { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractQueryManager.class); + private BackendsChannelConfigurationCache channelsCache; private Function queryAnalizerFactory; @@ -176,4 +193,107 @@ public abstract class AbstractQueryManager implements QueryManager { return results; } + @Override + public Stream queryRange(final RangeQuery rangeQuery) throws Exception { + if (rangeQuery.getChannels().isEmpty()) { + // query for the general (not channel specific) pulseId/globalTime + final BackendAccess backendAccess = getConfigurationCache().getDefaultBackend().getBackendAccess(); + if (backendAccess.hasStreamEventReader()) { + final CompletableFuture future = + backendAccess.getStreamEventReader().getPulseIdTimeMappingAsync(rangeQuery.getRange()); + final RequestRangeQueryResult result = future.get( + backendAccess.getBackend().getApplicationContext() + .getBean(DomainConfig.BEAN_NAME_DISTRIBUTED_READ_TIMEOUT, Integer.class), + TimeUnit.SECONDS); + return Stream.of( + new RangeQueryResponse( + new ChannelName(ChannelConfiguration.COMBINED_CHANNEL_NAME, backendAccess.getBackend()), + new PulseIdTime(result.getStart(), false), + new PulseIdTime(result.getEnd(), false))); + } else { + LOGGER.warn("Backend '{}' has no StreamEventReader. Cannot execute '{}'.", backendAccess.getBackend(), + rangeQuery); + return Stream.of( + new RangeQueryResponse( + new ChannelName(ChannelConfiguration.COMBINED_CHANNEL_NAME, backendAccess.getBackend()), + null, + null)); + } + } else { + // set backends if not defined yet + getConfigurationCache().configureBackends(rangeQuery.getChannels()); + + final LinkedHashSet eventFields = new LinkedHashSet<>(2); + eventFields.add(EventField.pulseId); + eventFields.add(EventField.globalTime); + eventFields.add(EventField.globalDate); + eventFields.add(EventField.globalSeconds); + rangeQuery.setAggregation(null); + rangeQuery.setEventFields(eventFields); + rangeQuery.setMapping(null); + rangeQuery.setValueTransformations(null); + rangeQuery.setLimit(1); + rangeQuery.setOrdering(Ordering.asc); + + Stream, Stream>> resultStreams = + BackendQueryImpl + .getBackendQueries(rangeQuery) + .stream() + .filter( + query -> { + return query.getBackend().getBackendAccess().hasDataReader() + && query.getBackend().getBackendAccess().hasQueryProcessor(); + }) + .flatMap( + query -> { + final BackendQueryImpl queryForward = new BackendQueryImpl(query); + // queryForward.setAggregation(null); + // queryForward.setEventFields(eventFields); + // queryForward.setMapping(null); + // queryForward.setValueTransformations(null); + // queryForward.setLimit(1); + // queryForward.setOrdering(Ordering.asc); + + final QueryProcessor processor = + queryForward.getBackend().getBackendAccess().getQueryProcessor(); + + final BackendQueryAnalyzer queryAnalizerForward = + queryAnalizerFactory.apply(queryForward); + final Stream>> channelToDataEventForward = + processor.process(queryAnalizerForward); + + final BackendQueryImpl queryBackwards = new BackendQueryImpl(queryForward); + queryBackwards.setOrdering(Ordering.desc); + final BackendQueryAnalyzer queryAnalizerBackwards = + queryAnalizerFactory.apply(queryForward); + final Stream>> channelToDataEventBackwards = + processor.process(queryAnalizerBackwards); + + return Streams.zip( + channelToDataEventForward, + channelToDataEventBackwards, + (entry1, entry2) -> { + return Triple.of( + entry1.getKey(), + entry1.getValue(), + entry2.getValue()); + // return Triple.of( + // entry1.getKey(), + // Streams.concat(entry1.getValue(), entry2.getValue())); + }); + }); + + // make sure queries are executed (Streams populate async) + resultStreams = resultStreams.collect(Collectors.toList()).stream(); + + // now access internal streams (which will block until data is available) + return resultStreams + .map(triple -> { + return new RangeQueryResponse( + triple.getLeft(), + new PulseIdTime(triple.getMiddle().findFirst().orElse(null), false), + new PulseIdTime(triple.getRight().findFirst().orElse(null), false)); + }); + } + } } diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java index c4691f7..e093431 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java @@ -16,12 +16,14 @@ import ch.psi.daq.domain.query.DAQConfigQuery; import ch.psi.daq.domain.query.DAQConfigQueryElement; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.RangeQuery; import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsResponse; import ch.psi.daq.domain.query.channels.LongHash; +import ch.psi.daq.domain.query.channels.RangeQueryResponse; public interface QueryManager { @@ -61,4 +63,6 @@ public interface QueryManager { List>>> queryEvents( final DAQQueries queries) throws Exception; + + Stream queryRange(final RangeQuery rangeQuery) throws Exception; } diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java index d12fbd8..048a76e 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java @@ -30,11 +30,13 @@ import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.RangeQuery; import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.backend.BackendQueryImpl; import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache; import ch.psi.daq.domain.query.channels.ChannelConfigurationLoader; +import ch.psi.daq.domain.query.channels.RangeQueryResponse; import ch.psi.daq.domain.query.mapping.IncompleteStrategy; import ch.psi.daq.domain.query.response.Response; import ch.psi.daq.domain.query.response.ResponseFormat; @@ -212,4 +214,9 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat return results; } + + @Override + public Stream queryRange(final RangeQuery rangeQuery) throws Exception { + throw new UnsupportedOperationException("Not yet implemented."); + } }