PulseId-GlobalTime mapping.

This commit is contained in:
Fabian Märki
2019-03-11 17:58:48 +01:00
parent 7366d87fbc
commit 8b44ceb69f
4 changed files with 187 additions and 8 deletions

View File

@ -54,10 +54,12 @@ import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.RangeQuery;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.channels.RangeQueryResponse;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression;
@ -250,11 +252,13 @@ public class QueryRestController implements ApplicationContextAware {
final Supplier<Collection<ChannelName>> channelsSupplier = () -> query.getChannels();
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
final String redirectURL =
final String redirectURL =
redirect
+ httpRequest.getRequestURI()
+ ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY);
+ httpRequest.getRequestURI()
+ ((httpRequest.getQueryString() != null
&& StringUtils.NULL_STRING.equals(httpRequest.getQueryString()))
? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY);
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
@ -433,11 +437,13 @@ public class QueryRestController implements ApplicationContextAware {
.collect(Collectors.toList());
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
final String redirectURL =
final String redirectURL =
redirect
+ httpRequest.getRequestURI()
+ ((httpRequest.getQueryString() != null && StringUtils.NULL_STRING.equals(httpRequest.getQueryString())) ? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY);
+ httpRequest.getRequestURI()
+ ((httpRequest.getQueryString() != null
&& StringUtils.NULL_STRING.equals(httpRequest.getQueryString()))
? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY);
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
@ -479,6 +485,48 @@ public class QueryRestController implements ApplicationContextAware {
}
}
@RequestMapping(
value = DomainConfig.PATH_QUERY_RANGE,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
public Stream<RangeQueryResponse> queryRange(
@RequestBody @Valid final RangeQuery rangeQuery,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
final Response response = rangeQuery.getResponseOrDefault(defaultResponse);
if (response.isAllowRedirect()) {
// Do a redirection if only one backend is requested
//
final Supplier<Collection<ChannelName>> channelsSupplier = () -> rangeQuery.getChannels();
final String redirect = queryManager.getRedirection(channelsSupplier);
if (redirect != null) {
final String redirectURL =
redirect
+ httpRequest.getRequestURI()
+ ((httpRequest.getQueryString() != null
&& StringUtils.NULL_STRING.equals(httpRequest.getQueryString()))
? StringUtils.QUESTION_MARK + httpRequest.getQueryString()
: StringUtils.EMPTY);
LOGGER.info("Send redirect to '{}'.", redirectURL);
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return Stream.empty();
}
}
try {
LOGGER.debug("Executing range query '{}'", rangeQuery);
return queryManager.queryRange(rangeQuery);
} catch (Exception e) {
LOGGER.error("Failed to execute range query '{}'.", rangeQuery, e);
throw e;
}
}
/**
* Returns the current list of {@link Ordering}s available.
*

View File

@ -1,25 +1,37 @@
package ch.psi.daq.queryrest.query;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Streams;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.PulseIdTime;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.backend.BackendAccess;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.RangeQuery;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
@ -29,9 +41,14 @@ import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.channels.RangeQueryResponse;
import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.domain.reader.RequestRangeQueryResult;
public abstract class AbstractQueryManager implements QueryManager {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractQueryManager.class);
private BackendsChannelConfigurationCache channelsCache;
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@ -176,4 +193,107 @@ public abstract class AbstractQueryManager implements QueryManager {
return results;
}
@Override
public Stream<RangeQueryResponse> queryRange(final RangeQuery rangeQuery) throws Exception {
if (rangeQuery.getChannels().isEmpty()) {
// query for the general (not channel specific) pulseId/globalTime
final BackendAccess backendAccess = getConfigurationCache().getDefaultBackend().getBackendAccess();
if (backendAccess.hasStreamEventReader()) {
final CompletableFuture<RequestRangeQueryResult> future =
backendAccess.getStreamEventReader().getPulseIdTimeMappingAsync(rangeQuery.getRange());
final RequestRangeQueryResult result = future.get(
backendAccess.getBackend().getApplicationContext()
.getBean(DomainConfig.BEAN_NAME_DISTRIBUTED_READ_TIMEOUT, Integer.class),
TimeUnit.SECONDS);
return Stream.of(
new RangeQueryResponse(
new ChannelName(ChannelConfiguration.COMBINED_CHANNEL_NAME, backendAccess.getBackend()),
new PulseIdTime(result.getStart(), false),
new PulseIdTime(result.getEnd(), false)));
} else {
LOGGER.warn("Backend '{}' has no StreamEventReader. Cannot execute '{}'.", backendAccess.getBackend(),
rangeQuery);
return Stream.of(
new RangeQueryResponse(
new ChannelName(ChannelConfiguration.COMBINED_CHANNEL_NAME, backendAccess.getBackend()),
null,
null));
}
} else {
// set backends if not defined yet
getConfigurationCache().configureBackends(rangeQuery.getChannels());
final LinkedHashSet<EventField> eventFields = new LinkedHashSet<>(2);
eventFields.add(EventField.pulseId);
eventFields.add(EventField.globalTime);
eventFields.add(EventField.globalDate);
eventFields.add(EventField.globalSeconds);
rangeQuery.setAggregation(null);
rangeQuery.setEventFields(eventFields);
rangeQuery.setMapping(null);
rangeQuery.setValueTransformations(null);
rangeQuery.setLimit(1);
rangeQuery.setOrdering(Ordering.asc);
Stream<Triple<ChannelName, Stream<? extends DataEvent>, Stream<? extends DataEvent>>> resultStreams =
BackendQueryImpl
.getBackendQueries(rangeQuery)
.stream()
.filter(
query -> {
return query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor();
})
.flatMap(
query -> {
final BackendQueryImpl queryForward = new BackendQueryImpl(query);
// queryForward.setAggregation(null);
// queryForward.setEventFields(eventFields);
// queryForward.setMapping(null);
// queryForward.setValueTransformations(null);
// queryForward.setLimit(1);
// queryForward.setOrdering(Ordering.asc);
final QueryProcessor processor =
queryForward.getBackend().getBackendAccess().getQueryProcessor();
final BackendQueryAnalyzer queryAnalizerForward =
queryAnalizerFactory.apply(queryForward);
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEventForward =
processor.process(queryAnalizerForward);
final BackendQueryImpl queryBackwards = new BackendQueryImpl(queryForward);
queryBackwards.setOrdering(Ordering.desc);
final BackendQueryAnalyzer queryAnalizerBackwards =
queryAnalizerFactory.apply(queryForward);
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEventBackwards =
processor.process(queryAnalizerBackwards);
return Streams.zip(
channelToDataEventForward,
channelToDataEventBackwards,
(entry1, entry2) -> {
return Triple.of(
entry1.getKey(),
entry1.getValue(),
entry2.getValue());
// return Triple.of(
// entry1.getKey(),
// Streams.concat(entry1.getValue(), entry2.getValue()));
});
});
// make sure queries are executed (Streams populate async)
resultStreams = resultStreams.collect(Collectors.toList()).stream();
// now access internal streams (which will block until data is available)
return resultStreams
.map(triple -> {
return new RangeQueryResponse(
triple.getLeft(),
new PulseIdTime(triple.getMiddle().findFirst().orElse(null), false),
new PulseIdTime(triple.getRight().findFirst().orElse(null), false));
});
}
}
}

View File

@ -16,12 +16,14 @@ import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.RangeQuery;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.channels.RangeQueryResponse;
public interface QueryManager {
@ -61,4 +63,6 @@ public interface QueryManager {
List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
final DAQQueries queries)
throws Exception;
Stream<RangeQueryResponse> queryRange(final RangeQuery rangeQuery) throws Exception;
}

View File

@ -30,11 +30,13 @@ import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.RangeQuery;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
import ch.psi.daq.domain.query.channels.ChannelConfigurationLoader;
import ch.psi.daq.domain.query.channels.RangeQueryResponse;
import ch.psi.daq.domain.query.mapping.IncompleteStrategy;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.query.response.ResponseFormat;
@ -212,4 +214,9 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat
return results;
}
@Override
public Stream<RangeQueryResponse> queryRange(final RangeQuery rangeQuery) throws Exception {
throw new UnsupportedOperationException("Not yet implemented.");
}
}