diff --git a/Readme.md b/Readme.md
index 0abb712..9c6c485 100644
--- a/Readme.md
+++ b/Readme.md
@@ -1483,3 +1483,59 @@ The query format is equivalent to the data query (see [here](Readme.md#query_dat
```bash
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"2017-01-01T01:00:00.000","endDate":"2017-11-01T01:00:00.030"},"channels":["Channel_01"]}' https://data-api.psi.ch/sf/query/config | python -m json.tool
```
+
+
+
+
+### Query Pulse-Id Time Mapping
+
+### Request
+
+```
+POST https://:/query/mapping
+```
+
+#### Request body
+
+A request is performed by sending a valid JSON object in the HTTP request body. The JSON query defines the channels to be queried and the range. In case no channels are defined the global pulseId-time mapping will be queried (otherwise the channel specific mapping - e.g. some events might have been delete).
+
+#### Data
+
+```json
+{
+ "channels":[
+ "Channel_01"
+ ],
+ "range":{
+ "startPulseId":0,
+ "endPulseId":3
+ },
+ "ordering":"asc",
+ "eventFields":[
+ "pulseId",
+ "globalDate"
+ ],
+ "response":{
+ "format":"json",
+ "compression":"none",
+ "allowRedirect":true
+ }
+}
+```
+
+##### Explanation
+
+The query format is equivalent to the data query (see [here](Readme.md#query_data) for further explanations - be aware that the url is not equivalent).
+
+##### Command
+
+```bash
+curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"2017-01-01T01:00:00.000","endDate":"2017-11-01T01:00:00.030"},"channels":["Channel_01"]}' https://data-api.psi.ch/sf/query/mapping | python -m json.tool
+```
+
+or
+
+```bash
+curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":4,"endPulseId":20},"eventFields":["pulseId","globalSeconds"]}' https://data-api.psi.ch/sf/query/mapping | python -m json.tool
+```
+
diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java
index 8c9f43e..970b83d 100644
--- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java
+++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java
@@ -39,6 +39,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import ch.psi.daq.common.statistic.Statistics;
import ch.psi.daq.domain.DataEvent;
+import ch.psi.daq.domain.PulseIdTime;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.backend.BackendType;
import ch.psi.daq.domain.backend.DomainBackendType;
@@ -163,6 +164,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
objectMapper.addMixIn(Statistics.class, PropertyFilterMixin.class);
objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class);
objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);
+ objectMapper.addMixIn(PulseIdTime.class, PropertyFilterMixin.class);
objectMapper.addMixIn(ChannelConfiguration.class, ChannelConfigurationPropertyFilterMixin.class);
diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java
index f4e751e..5131e96 100644
--- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java
+++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java
@@ -486,14 +486,14 @@ public class QueryRestController implements ApplicationContextAware {
}
@RequestMapping(
- value = DomainConfig.PATH_QUERY_RANGE,
+ value = DomainConfig.PATH_QUERY_MAPPING,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
- public Stream queryRange(
+ public void queryRange(
@RequestBody @Valid final RangeQuery rangeQuery,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
-
+
final Response response = rangeQuery.getResponseOrDefault(defaultResponse);
if (response.isAllowRedirect()) {
// Do a redirection if only one backend is requested
@@ -513,14 +513,34 @@ public class QueryRestController implements ApplicationContextAware {
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
- return Stream.empty();
+ return;
}
}
-
- try {
- LOGGER.debug("Executing range query '{}'", rangeQuery);
- return queryManager.queryRange(rangeQuery);
+
+ try {
+ if (response instanceof AbstractHTTPResponse) {
+ LOGGER.debug("Executing range query '{}'", rangeQuery);
+ final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response);
+
+ httpRes.validateQuery(rangeQuery);
+ // execute query
+ final Stream result = queryManager.queryRange(rangeQuery);
+
+ httpRes.respond(
+ context,
+ httpResponse,
+ rangeQuery,
+ result);
+ } else {
+ final String message =
+ String.format(
+ "Expecting Response of type '%s' but received '%s'. Check JSON deserialization defined in '%s'",
+ AbstractHTTPResponse.class.getName(), response.getClass().getName(),
+ PolymorphicResponseMixIn.class.getName());
+ LOGGER.error(message);
+ throw new IllegalArgumentException(message);
+ }
} catch (Exception e) {
LOGGER.error("Failed to execute range query '{}'.", rangeQuery, e);
throw e;
diff --git a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java
index 3c7159d..5ca7dd7 100644
--- a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java
+++ b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java
@@ -1,7 +1,6 @@
package ch.psi.daq.queryrest.query;
import java.util.ArrayList;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -42,7 +41,6 @@ import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.channels.RangeQueryResponse;
-import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.domain.reader.RequestRangeQueryResult;
@@ -111,21 +109,29 @@ public abstract class AbstractQueryManager implements QueryManager {
BackendQueryImpl
.getBackendQueries(daqQuery)
.stream()
- .filter(
- query -> query.getBackend().getBackendAccess().hasDataReader())
.flatMap(
query -> {
- /* all the magic happens here */
- final Map> channelToConfig =
- query.getChannelConfigurations();
+ if (query.getBackend().getBackendAccess().hasDataReader()) {
+ /* all the magic happens here */
+ final Map> channelToConfig =
+ query.getChannelConfigurations();
- return channelToConfig.entrySet().stream()
- .map(entry -> {
- return Triple.of(
- query,
- new ChannelName(entry.getKey(), query.getBackend()),
- entry.getValue());
- });
+ return channelToConfig.entrySet().stream()
+ .map(entry -> {
+ return Triple.of(
+ query,
+ new ChannelName(entry.getKey(), query.getBackend()),
+ entry.getValue());
+ });
+ } else {
+ return query.getChannels().stream()
+ .map(channel -> {
+ return Triple.of(
+ query,
+ new ChannelName(channel, query.getBackend()),
+ Stream.empty());
+ });
+ }
});
return Pair.of(daqQuery, resultStreams);
@@ -147,38 +153,46 @@ public abstract class AbstractQueryManager implements QueryManager {
BackendQueryImpl
.getBackendQueries(queryElement)
.stream()
- .filter(
- query -> {
- return query.getBackend().getBackendAccess().hasDataReader()
- && query.getBackend().getBackendAccess().hasQueryProcessor();
- })
.flatMap(
query -> {
- final QueryProcessor processor =
- query.getBackend().getBackendAccess().getQueryProcessor();
- final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
+ if (query.getBackend().getBackendAccess().hasDataReader()
+ && query.getBackend().getBackendAccess().hasQueryProcessor()) {
- // ChannelEvent query
- /* all the magic happens here */
- final Stream>> channelToDataEvents =
- processor.process(queryAnalizer);
- /* do post-process */
- final Stream> channelToData =
- queryAnalizer.postProcess(channelToDataEvents);
+ final QueryProcessor processor =
+ query.getBackend().getBackendAccess().getQueryProcessor();
+ final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
- // ChannelConfig query
- final BackendQuery configQuery =
- new BackendQueryImpl(query, queryElement.getConfigFields());
- final Map> channelToConfig =
- configQuery.getChannelConfigurations();
+ // ChannelEvent query
+ /* all the magic happens here */
+ final Stream>> channelToDataEvents =
+ processor.process(queryAnalizer);
+ /* do post-process */
+ final Stream> channelToData =
+ queryAnalizer.postProcess(channelToDataEvents);
- return channelToData.map(entry -> {
- return Quadruple.of(
- query,
- entry.getKey(),
- channelToConfig.get(entry.getKey().getName()),
- entry.getValue());
- });
+ // ChannelConfig query
+ final BackendQuery configQuery =
+ new BackendQueryImpl(query, queryElement.getConfigFields());
+ final Map> channelToConfig =
+ configQuery.getChannelConfigurations();
+
+ return channelToData.map(entry -> {
+ return Quadruple.of(
+ query,
+ entry.getKey(),
+ channelToConfig.get(entry.getKey().getName()),
+ entry.getValue());
+ });
+ } else {
+ return query.getChannels().stream()
+ .map(channel -> {
+ return Quadruple.of(
+ query,
+ new ChannelName(channel, query.getBackend()),
+ Stream.empty(),
+ Stream.empty());
+ });
+ }
});
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
@@ -198,9 +212,9 @@ public abstract class AbstractQueryManager implements QueryManager {
if (rangeQuery.getChannels().isEmpty()) {
// query for the general (not channel specific) pulseId/globalTime
final BackendAccess backendAccess = getConfigurationCache().getDefaultBackend().getBackendAccess();
- if (backendAccess.hasStreamEventReader()) {
+ if (backendAccess.hasDataReader()) {
final CompletableFuture future =
- backendAccess.getStreamEventReader().getPulseIdTimeMappingAsync(rangeQuery.getRange());
+ backendAccess.getDataReader().getPulseIdTimeMappingAsync(rangeQuery.getRange());
final RequestRangeQueryResult result = future.get(
backendAccess.getBackend().getApplicationContext()
.getBean(DomainConfig.BEAN_NAME_DISTRIBUTED_READ_TIMEOUT, Integer.class),
@@ -223,13 +237,7 @@ public abstract class AbstractQueryManager implements QueryManager {
// set backends if not defined yet
getConfigurationCache().configureBackends(rangeQuery.getChannels());
- final LinkedHashSet eventFields = new LinkedHashSet<>(2);
- eventFields.add(EventField.pulseId);
- eventFields.add(EventField.globalTime);
- eventFields.add(EventField.globalDate);
- eventFields.add(EventField.globalSeconds);
rangeQuery.setAggregation(null);
- rangeQuery.setEventFields(eventFields);
rangeQuery.setMapping(null);
rangeQuery.setValueTransformations(null);
rangeQuery.setLimit(1);
@@ -239,48 +247,54 @@ public abstract class AbstractQueryManager implements QueryManager {
BackendQueryImpl
.getBackendQueries(rangeQuery)
.stream()
- .filter(
- query -> {
- return query.getBackend().getBackendAccess().hasDataReader()
- && query.getBackend().getBackendAccess().hasQueryProcessor();
- })
.flatMap(
query -> {
- final BackendQueryImpl queryForward = new BackendQueryImpl(query);
- // queryForward.setAggregation(null);
- // queryForward.setEventFields(eventFields);
- // queryForward.setMapping(null);
- // queryForward.setValueTransformations(null);
- // queryForward.setLimit(1);
- // queryForward.setOrdering(Ordering.asc);
+ if (query.getBackend().getBackendAccess().hasDataReader()
+ && query.getBackend().getBackendAccess().hasQueryProcessor()) {
+ final BackendQueryImpl queryForward = new BackendQueryImpl(query);
+ // queryForward.setAggregation(null);
+ // queryForward.setEventFields(eventFields);
+ // queryForward.setMapping(null);
+ // queryForward.setValueTransformations(null);
+ // queryForward.setLimit(1);
+ // queryForward.setOrdering(Ordering.asc);
- final QueryProcessor processor =
- queryForward.getBackend().getBackendAccess().getQueryProcessor();
+ final QueryProcessor processor =
+ queryForward.getBackend().getBackendAccess().getQueryProcessor();
- final BackendQueryAnalyzer queryAnalizerForward =
- queryAnalizerFactory.apply(queryForward);
- final Stream>> channelToDataEventForward =
- processor.process(queryAnalizerForward);
+ final BackendQueryAnalyzer queryAnalizerForward =
+ queryAnalizerFactory.apply(queryForward);
+ final Stream>> channelToDataEventForward =
+ processor.process(queryAnalizerForward);
- final BackendQueryImpl queryBackwards = new BackendQueryImpl(queryForward);
- queryBackwards.setOrdering(Ordering.desc);
- final BackendQueryAnalyzer queryAnalizerBackwards =
- queryAnalizerFactory.apply(queryForward);
- final Stream>> channelToDataEventBackwards =
- processor.process(queryAnalizerBackwards);
+ final BackendQueryImpl queryBackwards = new BackendQueryImpl(queryForward);
+ queryBackwards.setOrdering(Ordering.desc);
+ final BackendQueryAnalyzer queryAnalizerBackwards =
+ queryAnalizerFactory.apply(queryBackwards);
+ final Stream>> channelToDataEventBackwards =
+ processor.process(queryAnalizerBackwards);
- return Streams.zip(
- channelToDataEventForward,
- channelToDataEventBackwards,
- (entry1, entry2) -> {
- return Triple.of(
- entry1.getKey(),
- entry1.getValue(),
- entry2.getValue());
- // return Triple.of(
- // entry1.getKey(),
- // Streams.concat(entry1.getValue(), entry2.getValue()));
- });
+ return Streams.zip(
+ channelToDataEventForward,
+ channelToDataEventBackwards,
+ (entry1, entry2) -> {
+ return Triple.of(
+ entry1.getKey(),
+ entry1.getValue(),
+ entry2.getValue());
+ // return Triple.of(
+ // entry1.getKey(),
+ // Streams.concat(entry1.getValue(), entry2.getValue()));
+ });
+ } else {
+ return query.getChannels().stream()
+ .map(channel -> {
+ return Triple.of(
+ new ChannelName(channel, query.getBackend()),
+ Stream.empty(),
+ Stream.empty());
+ });
+ }
});
// make sure queries are executed (Streams populate async)
diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java
index 048a76e..4472cf4 100644
--- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java
+++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java
@@ -217,6 +217,44 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat
@Override
public Stream queryRange(final RangeQuery rangeQuery) throws Exception {
- throw new UnsupportedOperationException("Not yet implemented.");
+ // INFO: It is always an option to call super.queryRange(rangeQuery);
+ // The super call will use QueryRestStreamEventReader.
+
+ // set backends if not defined yet
+ getConfigurationCache().configureBackends(rangeQuery.getChannels());
+
+ final Response response = new ResponseImpl(ResponseFormat.JSON);
+
+ final Flux resultStreams =
+ Flux.fromIterable(BackendQueryImpl
+ .getBackendQueries(rangeQuery))
+ .flatMap(
+ query -> {
+ final String queryServer = backendToServerAddresses.get(query.getBackend());
+
+ return Flux.fromIterable(query.getChannels())
+ .flatMap(channel -> {
+ final ChannelName channelName =
+ new ChannelName(channel, query.getBackend());
+
+ if (queryServer == null) {
+ LOGGER.warn(
+ "There is no query server defined for '{}' of '{}'. Provide empty stream.",
+ channel, query.getBackend());
+ return Flux.just(new RangeQueryResponse(channelName, null, null));
+ } else {
+
+ final RangeQuery rQuery =
+ new RangeQuery(rangeQuery.getRange(), response, channelName);
+
+ return RestHelper
+ .queryRangeAsync(query.getBackend().getApplicationContext(),
+ queryServer, rQuery)
+ .cast(RangeQueryResponse.class);
+ }
+ });
+ });
+
+ return resultStreams.toStream();
}
}
diff --git a/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java b/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java
index b7c1c58..35d8e47 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java
@@ -43,7 +43,7 @@ public class AnyResponseFormatter implements ResponseFormatter