ATEST-972

Fabian Märki
2019-03-14 16:03:16 +01:00
parent 5dc54653f0
commit 9371beff66
9 changed files with 516 additions and 103 deletions

View File

@@ -1483,3 +1483,59 @@ The query format is equivalent to the data query (see [here](Readme.md#query_dat
```bash
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"2017-01-01T01:00:00.000","endDate":"2017-11-01T01:00:00.030"},"channels":["Channel_01"]}' https://data-api.psi.ch/sf/query/config | python -m json.tool
```
<a name="pulse-id-time-mapping"/>
### Query Pulse-Id Time Mapping
### Request
```
POST https://<host>:<port>/query/mapping
```
#### Request body
A request is performed by sending a valid JSON object in the HTTP request body. The JSON query defines the channels to be queried and the range. If no channels are defined, the global pulseId-time mapping is queried; otherwise the channel-specific mapping is used (which may differ, e.g. because some events of a channel might have been deleted).
#### Data
```json
{
"channels":[
"Channel_01"
],
"range":{
"startPulseId":0,
"endPulseId":3
},
"ordering":"asc",
"eventFields":[
"pulseId",
"globalDate"
],
"response":{
"format":"json",
"compression":"none",
"allowRedirect":true
}
}
```
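If the `channels` array is omitted or empty, the same range is resolved against the global pulseId-time mapping (see above). A minimal request body for that case could look like this (illustrative sketch):
```json
{
   "range":{
      "startPulseId":0,
      "endPulseId":3
   }
}
```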
##### Explanation
The query format is equivalent to the data query (see [here](Readme.md#query_data) for further explanations - be aware that the URL is different).
##### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"2017-01-01T01:00:00.000","endDate":"2017-11-01T01:00:00.030"},"channels":["Channel_01"]}' https://data-api.psi.ch/sf/query/mapping | python -m json.tool
```
or
```bash
curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":4,"endPulseId":20},"eventFields":["pulseId","globalSeconds"]}' https://data-api.psi.ch/sf/query/mapping | python -m json.tool
```
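The same mapping query can also be issued from a script. The following is a minimal sketch using Python's `requests` package; it assumes the same host and endpoint as the curl examples above and simply pretty-prints the returned mapping:
```python
import json

import requests

# Sketch: request the pulse-id/time mapping for a pulse-id range
# (same endpoint as in the curl examples above).
query = {
    "range": {"startPulseId": 4, "endPulseId": 20},
    "eventFields": ["pulseId", "globalSeconds"],
}

response = requests.post(
    "https://data-api.psi.ch/sf/query/mapping",
    headers={"Content-Type": "application/json"},
    data=json.dumps(query),
    timeout=30,
)
response.raise_for_status()
print(json.dumps(response.json(), indent=2))
```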

View File

@@ -39,6 +39,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import ch.psi.daq.common.statistic.Statistics;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.PulseIdTime;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.backend.BackendType;
import ch.psi.daq.domain.backend.DomainBackendType;
@@ -163,6 +164,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
objectMapper.addMixIn(Statistics.class, PropertyFilterMixin.class);
objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class);
objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);
objectMapper.addMixIn(PulseIdTime.class, PropertyFilterMixin.class);
objectMapper.addMixIn(ChannelConfiguration.class, ChannelConfigurationPropertyFilterMixin.class);

View File

@@ -486,14 +486,14 @@ public class QueryRestController implements ApplicationContextAware {
}
@RequestMapping(
-value = DomainConfig.PATH_QUERY_RANGE,
+value = DomainConfig.PATH_QUERY_MAPPING,
method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE})
-public Stream<RangeQueryResponse> queryRange(
+public void queryRange(
@RequestBody @Valid final RangeQuery rangeQuery,
final HttpServletRequest httpRequest,
final HttpServletResponse httpResponse) throws Exception {
final Response response = rangeQuery.getResponseOrDefault(defaultResponse);
if (response.isAllowRedirect()) {
// Do a redirection if only one backend is requested
@@ -513,14 +513,34 @@ public class QueryRestController implements ApplicationContextAware {
httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL);
// use 307 - works for POST too
httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
-return Stream.empty();
+return;
}
}
try {
LOGGER.debug("Executing range query '{}'", rangeQuery);
return queryManager.queryRange(rangeQuery);
try {
if (response instanceof AbstractHTTPResponse) {
LOGGER.debug("Executing range query '{}'", rangeQuery);
final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response);
httpRes.validateQuery(rangeQuery);
// execute query
final Stream<RangeQueryResponse> result = queryManager.queryRange(rangeQuery);
httpRes.respond(
context,
httpResponse,
rangeQuery,
result);
} else {
final String message =
String.format(
"Expecting Response of type '%s' but received '%s'. Check JSON deserialization defined in '%s'",
AbstractHTTPResponse.class.getName(), response.getClass().getName(),
PolymorphicResponseMixIn.class.getName());
LOGGER.error(message);
throw new IllegalArgumentException(message);
}
} catch (Exception e) {
LOGGER.error("Failed to execute range query '{}'.", rangeQuery, e);
throw e;

View File

@@ -1,7 +1,6 @@
package ch.psi.daq.queryrest.query;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -42,7 +41,6 @@ import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.channels.RangeQueryResponse;
import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.domain.reader.RequestRangeQueryResult;
@@ -111,21 +109,29 @@ public abstract class AbstractQueryManager implements QueryManager {
BackendQueryImpl
.getBackendQueries(daqQuery)
.stream()
.filter(
query -> query.getBackend().getBackendAccess().hasDataReader())
.flatMap(
query -> {
-/* all the magic happens here */
-final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
-query.getChannelConfigurations();
+if (query.getBackend().getBackendAccess().hasDataReader()) {
+/* all the magic happens here */
+final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
+query.getChannelConfigurations();
return channelToConfig.entrySet().stream()
.map(entry -> {
return Triple.of(
query,
new ChannelName(entry.getKey(), query.getBackend()),
entry.getValue());
});
} else {
return query.getChannels().stream()
.map(channel -> {
return Triple.of(
query,
new ChannelName(channel, query.getBackend()),
Stream.empty());
});
}
});
return Pair.of(daqQuery, resultStreams);
@@ -147,38 +153,46 @@ public abstract class AbstractQueryManager implements QueryManager {
BackendQueryImpl
.getBackendQueries(queryElement)
.stream()
.filter(
query -> {
return query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor();
})
.flatMap(
query -> {
-final QueryProcessor processor =
-query.getBackend().getBackendAccess().getQueryProcessor();
-final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
-// ChannelEvent query
-/* all the magic happens here */
-final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
-processor.process(queryAnalizer);
-/* do post-process */
-final Stream<Entry<ChannelName, ?>> channelToData =
-queryAnalizer.postProcess(channelToDataEvents);
-// ChannelConfig query
-final BackendQuery configQuery =
-new BackendQueryImpl(query, queryElement.getConfigFields());
-final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
-configQuery.getChannelConfigurations();
-return channelToData.map(entry -> {
-return Quadruple.of(
-query,
-entry.getKey(),
-channelToConfig.get(entry.getKey().getName()),
-entry.getValue());
-});
+if (query.getBackend().getBackendAccess().hasDataReader()
+&& query.getBackend().getBackendAccess().hasQueryProcessor()) {
+final QueryProcessor processor =
+query.getBackend().getBackendAccess().getQueryProcessor();
+final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
+// ChannelEvent query
+/* all the magic happens here */
+final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
+processor.process(queryAnalizer);
+/* do post-process */
+final Stream<Entry<ChannelName, ?>> channelToData =
+queryAnalizer.postProcess(channelToDataEvents);
+// ChannelConfig query
+final BackendQuery configQuery =
+new BackendQueryImpl(query, queryElement.getConfigFields());
+final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
+configQuery.getChannelConfigurations();
+return channelToData.map(entry -> {
+return Quadruple.of(
+query,
+entry.getKey(),
+channelToConfig.get(entry.getKey().getName()),
+entry.getValue());
+});
+} else {
+return query.getChannels().stream()
+.map(channel -> {
+return Quadruple.of(
+query,
+new ChannelName(channel, query.getBackend()),
+Stream.empty(),
+Stream.empty());
+});
+}
});
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
@@ -198,9 +212,9 @@ public abstract class AbstractQueryManager implements QueryManager {
if (rangeQuery.getChannels().isEmpty()) {
// query for the general (not channel specific) pulseId/globalTime
final BackendAccess backendAccess = getConfigurationCache().getDefaultBackend().getBackendAccess();
-if (backendAccess.hasStreamEventReader()) {
+if (backendAccess.hasDataReader()) {
final CompletableFuture<RequestRangeQueryResult> future =
-backendAccess.getStreamEventReader().getPulseIdTimeMappingAsync(rangeQuery.getRange());
+backendAccess.getDataReader().getPulseIdTimeMappingAsync(rangeQuery.getRange());
final RequestRangeQueryResult result = future.get(
backendAccess.getBackend().getApplicationContext()
.getBean(DomainConfig.BEAN_NAME_DISTRIBUTED_READ_TIMEOUT, Integer.class),
@@ -223,13 +237,7 @@ public abstract class AbstractQueryManager implements QueryManager {
// set backends if not defined yet
getConfigurationCache().configureBackends(rangeQuery.getChannels());
final LinkedHashSet<EventField> eventFields = new LinkedHashSet<>(2);
eventFields.add(EventField.pulseId);
eventFields.add(EventField.globalTime);
eventFields.add(EventField.globalDate);
eventFields.add(EventField.globalSeconds);
rangeQuery.setAggregation(null);
rangeQuery.setEventFields(eventFields);
rangeQuery.setMapping(null);
rangeQuery.setValueTransformations(null);
rangeQuery.setLimit(1);
@@ -239,48 +247,54 @@ public abstract class AbstractQueryManager implements QueryManager {
BackendQueryImpl
.getBackendQueries(rangeQuery)
.stream()
.filter(
query -> {
return query.getBackend().getBackendAccess().hasDataReader()
&& query.getBackend().getBackendAccess().hasQueryProcessor();
})
.flatMap(
query -> {
-final BackendQueryImpl queryForward = new BackendQueryImpl(query);
-// queryForward.setAggregation(null);
-// queryForward.setEventFields(eventFields);
-// queryForward.setMapping(null);
-// queryForward.setValueTransformations(null);
-// queryForward.setLimit(1);
-// queryForward.setOrdering(Ordering.asc);
+if (query.getBackend().getBackendAccess().hasDataReader()
+&& query.getBackend().getBackendAccess().hasQueryProcessor()) {
+final BackendQueryImpl queryForward = new BackendQueryImpl(query);
+// queryForward.setAggregation(null);
+// queryForward.setEventFields(eventFields);
+// queryForward.setMapping(null);
+// queryForward.setValueTransformations(null);
+// queryForward.setLimit(1);
+// queryForward.setOrdering(Ordering.asc);
final QueryProcessor processor =
queryForward.getBackend().getBackendAccess().getQueryProcessor();
final BackendQueryAnalyzer queryAnalizerForward =
queryAnalizerFactory.apply(queryForward);
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEventForward =
processor.process(queryAnalizerForward);
final BackendQueryImpl queryBackwards = new BackendQueryImpl(queryForward);
queryBackwards.setOrdering(Ordering.desc);
final BackendQueryAnalyzer queryAnalizerBackwards =
-queryAnalizerFactory.apply(queryForward);
+queryAnalizerFactory.apply(queryBackwards);
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEventBackwards =
processor.process(queryAnalizerBackwards);
return Streams.zip(
channelToDataEventForward,
channelToDataEventBackwards,
(entry1, entry2) -> {
return Triple.of(
entry1.getKey(),
entry1.getValue(),
entry2.getValue());
// return Triple.of(
// entry1.getKey(),
// Streams.concat(entry1.getValue(), entry2.getValue()));
});
} else {
return query.getChannels().stream()
.map(channel -> {
return Triple.of(
new ChannelName(channel, query.getBackend()),
Stream.empty(),
Stream.empty());
});
}
});
// make sure queries are executed (Streams populate async)

View File

@@ -217,6 +217,44 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat
@Override
public Stream<RangeQueryResponse> queryRange(final RangeQuery rangeQuery) throws Exception {
-throw new UnsupportedOperationException("Not yet implemented.");
+// INFO: It is always an option to call super.queryRange(rangeQuery);
// The super call will use QueryRestStreamEventReader.
// set backends if not defined yet
getConfigurationCache().configureBackends(rangeQuery.getChannels());
final Response response = new ResponseImpl(ResponseFormat.JSON);
final Flux<RangeQueryResponse> resultStreams =
Flux.fromIterable(BackendQueryImpl
.getBackendQueries(rangeQuery))
.flatMap(
query -> {
final String queryServer = backendToServerAddresses.get(query.getBackend());
return Flux.fromIterable(query.getChannels())
.flatMap(channel -> {
final ChannelName channelName =
new ChannelName(channel, query.getBackend());
if (queryServer == null) {
LOGGER.warn(
"There is no query server defined for '{}' of '{}'. Provide empty stream.",
channel, query.getBackend());
return Flux.just(new RangeQueryResponse(channelName, null, null));
} else {
final RangeQuery rQuery =
new RangeQuery(rangeQuery.getRange(), response, channelName);
return RestHelper
.queryRangeAsync(query.getBackend().getApplicationContext(),
queryServer, rQuery)
.cast(RangeQueryResponse.class);
}
});
});
return resultStreams.toStream();
}
}

View File

@@ -43,7 +43,7 @@ public class AnyResponseFormatter implements ResponseFormatter<Object>,
@SuppressWarnings("unchecked")
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
-if (eventFields == null) {
+if (eventFields == null || eventFields.isEmpty()) {
final Set<EventField> defaultEventFields =
context.getBean(eventFieldsBeanName, Set.class);
this.eventFields =
@@ -51,7 +51,7 @@ public class AnyResponseFormatter implements ResponseFormatter<Object>,
.collect(Collectors.toCollection(LinkedHashSet::new));
}
-if (configFields == null) {
+if (configFields == null || configFields.isEmpty()) {
final Set<ConfigField> defaultConfigFields =
context.getBean(configFieldsBeanName, Set.class);
this.configFields =

View File

@@ -3,6 +3,7 @@ package ch.psi.daq.queryrest.response.json;
import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.servlet.ServletResponse;
@@ -21,6 +22,7 @@ import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.RangeQuery;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
@@ -97,6 +99,24 @@ public abstract class AbstractResponseStreamWriter implements ResponseStreamWrit
result,
out,
response);
} else if (query instanceof RangeQuery) {
final RangeQuery rangeQuery = (RangeQuery) query;
final AnyResponseFormatter formatter;
if (rangeQuery.hasEventFields()) {
formatter = new AnyResponseFormatter(
rangeQuery.getEventFields().stream().map(field -> field.getName()).collect(Collectors.toSet()),
rangeQuery.getConfigFields().stream().map(field -> field.getName()).collect(Collectors.toSet()));
} else {
formatter = backend.getApplicationContext()
.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_ANY, AnyResponseFormatter.class);
}
formatter
.format(
getJsonFactory(),
getObjectMapper(),
result,
out,
response);
} else {
final String message = String.format("'%s' has no response type for '%s'.", this.getClass(), query);
LOGGER.error(message);

View File

@@ -28,10 +28,12 @@ import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.RangeQuery;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.LongHash;
import ch.psi.daq.domain.query.operation.Aggregation;
@@ -395,7 +397,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements
new RequestRangePulseId(
100,
true,
101,
false),
TEST_CHANNEL_NAMES);
request.addEventField(EventField.pulseId);
@@ -471,7 +473,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements
new RequestRangePulseId(
99,
false,
100,
true),
TEST_CHANNEL_NAMES);
request.addEventField(EventField.pulseId);
@@ -540,7 +542,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements
TestTimeUtils.getTimeStr(1, 10000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[2].iocMillis").value(1010));
}
@Test
public void testPulseRangeQuery_Fields() throws Exception {
DAQQuery request = new DAQQuery(
@@ -725,7 +727,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalSeconds").value( .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 10000000))); TestTimeUtils.getTimeStr(2, 10000000)));
} }
@Test @Test
public void testTimeRangeQuery_01_StartExpansion() throws Exception { public void testTimeRangeQuery_01_StartExpansion() throws Exception {
DAQQuery request = new DAQQuery( DAQQuery request = new DAQQuery(
@@ -775,7 +777,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[2].globalSeconds").value( .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[2].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 10000000))); TestTimeUtils.getTimeStr(2, 10000000)));
} }
@Test @Test
public void testTimeRangeQuery_01_EndExpansion() throws Exception { public void testTimeRangeQuery_01_EndExpansion() throws Exception {
DAQQuery request = new DAQQuery( DAQQuery request = new DAQQuery(
@@ -910,10 +912,10 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].precision").value(0)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].precision").value(0))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].source").value("unknown")) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].source").value("unknown"))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].type").value(Type.Int32.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].type").value(Type.Int32.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].unit").doesNotExist()) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].unit").doesNotExist())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].pulseId").value(201)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].pulseId").value(201))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].globalSeconds").value( .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 10000000))) TestTimeUtils.getTimeStr(2, 10000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].globalMillis").value(2010)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].globalMillis").value(2010))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].shape").value(1)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].shape").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].description").doesNotExist()) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].description").doesNotExist())
@@ -2395,4 +2397,165 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements
value = JsonPath.read(document, "$[1].data[1].transformedValue.value").toString();
assertTrue(value.startsWith("[101.0,"));
}
@Test
public void testRangeQuery() throws Exception {
RangeQuery request = new RangeQuery(
new RequestRangePulseId(
100,
101),
TEST_CHANNEL_NAMES);
request.addEventField(EventField.pulseId);
request.addEventField(EventField.globalSeconds);
request.addEventField(EventField.globalTime);
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY_MAPPING)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(1000000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 10000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1010000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(backend.getName()))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].start.pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalTime").value(1000000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].end.pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 10000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalTime").value(1010000000));
request = new RangeQuery(
new RequestRangePulseId(
100,
true,
101,
true),
TEST_CHANNEL_NAMES);
request.addEventField(EventField.pulseId);
request.addEventField(EventField.globalDate);
request.addEventField(EventField.globalTime);
content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY_MAPPING)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(99))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalDate").value(
TestTimeUtils.getTimeDate(0, 990000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(990000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(102))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalDate").value(
TestTimeUtils.getTimeDate(1, 20000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1020000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(backend.getName()))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].start.pulseId").value(99))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalDate").value(
TestTimeUtils.getTimeDate(0, 990000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalTime").value(990000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].end.pulseId").value(102))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalDate").value(
TestTimeUtils.getTimeDate(1, 20000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalTime").value(1020000000));
request = new RangeQuery(
new RequestRangePulseId(
100,
101));
request.addEventField(EventField.pulseId);
request.addEventField(EventField.globalSeconds);
request.addEventField(EventField.globalTime);
content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY_MAPPING)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(
MockMvcResultMatchers.jsonPath("$[0].channel.name").value(ChannelConfiguration.COMBINED_CHANNEL_NAME))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(1000000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 10000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1010000000));
request = new RangeQuery(
new RequestRangePulseId(
100,
true,
101,
true));
request.addEventField(EventField.pulseId);
request.addEventField(EventField.globalDate);
request.addEventField(EventField.globalTime);
content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY_MAPPING)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(
MockMvcResultMatchers.jsonPath("$[0].channel.name").value(ChannelConfiguration.COMBINED_CHANNEL_NAME))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(99))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalDate").value(
TestTimeUtils.getTimeDate(0, 990000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(990000000))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(102))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalDate").value(
TestTimeUtils.getTimeDate(1, 20000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1020000000));
}
}

View File

@@ -41,7 +41,9 @@ import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.Event;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.RangeQuery;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.RangeQueryResponse;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
@@ -566,4 +568,102 @@ public class QueryManagerRemoteTest extends AbstractDaqRestTest {
.collect(Collectors.toList());
assertTrue("Size was " + channels.size(), channels.size() > 400000);
}
@Test
public void testRangeQuery_01() throws Exception {
assertNotNull(queryBackend);
assertNotNull(queryServer);
String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG";
String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT";
RangeQuery query = new RangeQuery(
new RequestRangeSeconds(
TimeUtils.parse("2016-10-12T14:00"),
TimeUtils.parse("2016-10-12T16:00")),
new ChannelName(scalarChannel, queryBackend),
new ChannelName(waveformChannel, queryBackend));
List<? extends RangeQueryResponse> rangeQueryResponses = RestHelper.queryRange(context, queryServer, query);
assertEquals("Size was " + rangeQueryResponses.size(), 2, rangeQueryResponses.size());
RangeQueryResponse queryResponse = rangeQueryResponses.get(0);
assertEquals(queryBackend, queryResponse.getChannel().getBackend());
assertEquals(scalarChannel, queryResponse.getChannel().getName());
assertEquals("2016-10-12T14:07:09.914760000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(638760000, queryResponse.getStart().getPulseId());
assertEquals("2016-10-12T15:57:10.053420000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(639420000, queryResponse.getStart().getPulseId());
queryResponse = rangeQueryResponses.get(1);
assertEquals(queryBackend, queryResponse.getChannel().getBackend());
assertEquals(waveformChannel, queryResponse.getChannel().getName());
assertEquals("2016-10-12T14:07:09.914760000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(638760000, queryResponse.getStart().getPulseId());
assertEquals("2016-10-12T15:57:10.053420000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(639420000, queryResponse.getStart().getPulseId());
query = new RangeQuery(
new RequestRangeSeconds(
TimeUtils.parse("2016-10-12T14:00"),
true,
TimeUtils.parse("2016-10-12T16:00"),
true),
new ChannelName(scalarChannel, queryBackend),
new ChannelName(waveformChannel, queryBackend));
rangeQueryResponses = RestHelper.queryRange(context, queryServer, query);
assertEquals("Size was " + rangeQueryResponses.size(), 2, rangeQueryResponses.size());
queryResponse = rangeQueryResponses.get(0);
assertEquals(queryBackend, queryResponse.getChannel().getBackend());
assertEquals(scalarChannel, queryResponse.getChannel().getName());
// it seems the system was stopped for a few days
assertEquals("2016-10-07T09:32:24.579300000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(609300000, queryResponse.getStart().getPulseId());
assertEquals("2016-10-12T16:07:09.793480000+02:00", queryResponse.getEnd().getGlobalDate());
assertEquals(639480000, queryResponse.getEnd().getPulseId());
queryResponse = rangeQueryResponses.get(1);
assertEquals(queryBackend, queryResponse.getChannel().getBackend());
assertEquals(waveformChannel, queryResponse.getChannel().getName());
// it seems the system was stopped for a few days
assertEquals("2016-10-07T09:32:24.579300000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(609300000, queryResponse.getStart().getPulseId());
assertEquals("2016-10-12T16:07:09.793480000+02:00", queryResponse.getEnd().getGlobalDate());
assertEquals(639480000, queryResponse.getEnd().getPulseId());
query = new RangeQuery(
new RequestRangeSeconds(
TimeUtils.parse("2016-10-12T14:00"),
TimeUtils.parse("2016-10-12T16:00")));
rangeQueryResponses = RestHelper.queryRange(context, queryServer, query);
assertEquals("Size was " + rangeQueryResponses.size(), 1, rangeQueryResponses.size());
// TODO: set time/pulse correctly (was not clear at time of writing)
queryResponse = rangeQueryResponses.get(0);
assertEquals(queryBackend, queryResponse.getChannel().getBackend());
assertEquals(waveformChannel, queryResponse.getChannel().getName());
assertEquals("2016-10-12T13:57:09.914760000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(638750000, queryResponse.getStart().getPulseId());
assertEquals("2016-10-12T16:07:10.053420000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(639430000, queryResponse.getStart().getPulseId());
query = new RangeQuery(
new RequestRangeSeconds(
TimeUtils.parse("2016-10-12T14:00"),
true,
TimeUtils.parse("2016-10-12T16:00"),
true));
rangeQueryResponses = RestHelper.queryRange(context, queryServer, query);
assertEquals("Size was " + rangeQueryResponses.size(), 1, rangeQueryResponses.size());
// TODO: set time/pulse correctly (was not clear at time of writing)
queryResponse = rangeQueryResponses.get(0);
assertEquals(queryBackend, queryResponse.getChannel().getBackend());
assertEquals(ChannelConfiguration.COMBINED_CHANNEL_NAME, queryResponse.getChannel().getName());
assertEquals("2016-10-12T14:07:09.913760000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(638759999, queryResponse.getStart().getPulseId());
assertEquals("2016-10-12T15:57:10.054420000+02:00", queryResponse.getStart().getGlobalDate());
assertEquals(639420001, queryResponse.getStart().getPulseId());
}
}