From 9371beff664384d1e86acd68674f1ddaa409fe3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Thu, 14 Mar 2019 16:03:16 +0100 Subject: [PATCH] ATEST-972 --- Readme.md | 56 ++++++ .../daq/queryrest/config/QueryRestConfig.java | 2 + .../controller/QueryRestController.java | 36 +++- .../queryrest/query/AbstractQueryManager.java | 184 ++++++++++-------- .../queryrest/query/QueryManagerRemote.java | 40 +++- .../formatter/AnyResponseFormatter.java | 4 +- .../json/AbstractResponseStreamWriter.java | 20 ++ .../JsonQueryRestControllerTest.java | 177 ++++++++++++++++- .../query/QueryManagerRemoteTest.java | 100 ++++++++++ 9 files changed, 516 insertions(+), 103 deletions(-) diff --git a/Readme.md b/Readme.md index 0abb712..9c6c485 100644 --- a/Readme.md +++ b/Readme.md @@ -1483,3 +1483,59 @@ The query format is equivalent to the data query (see [here](Readme.md#query_dat ```bash curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"2017-01-01T01:00:00.000","endDate":"2017-11-01T01:00:00.030"},"channels":["Channel_01"]}' https://data-api.psi.ch/sf/query/config | python -m json.tool ``` + + + + +### Query Pulse-Id Time Mapping + +### Request + +``` +POST https://:/query/mapping +``` + +#### Request body + +A request is performed by sending a valid JSON object in the HTTP request body. The JSON query defines the channels to be queried and the range. In case no channels are defined the global pulseId-time mapping will be queried (otherwise the channel specific mapping - e.g. some events might have been delete). + +#### Data + +```json +{ + "channels":[ + "Channel_01" + ], + "range":{ + "startPulseId":0, + "endPulseId":3 + }, + "ordering":"asc", + "eventFields":[ + "pulseId", + "globalDate" + ], + "response":{ + "format":"json", + "compression":"none", + "allowRedirect":true + } +} +``` + +##### Explanation + +The query format is equivalent to the data query (see [here](Readme.md#query_data) for further explanations - be aware that the url is not equivalent). + +##### Command + +```bash +curl -H "Content-Type: application/json" -X POST -d '{"range":{"startDate":"2017-01-01T01:00:00.000","endDate":"2017-11-01T01:00:00.030"},"channels":["Channel_01"]}' https://data-api.psi.ch/sf/query/mapping | python -m json.tool +``` + +or + +```bash +curl -H "Content-Type: application/json" -X POST -d '{"range":{"startPulseId":4,"endPulseId":20},"eventFields":["pulseId","globalSeconds"]}' https://data-api.psi.ch/sf/query/mapping | python -m json.tool +``` + diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 8c9f43e..970b83d 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -39,6 +39,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import ch.psi.daq.common.statistic.Statistics; import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.PulseIdTime; import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.backend.BackendType; import ch.psi.daq.domain.backend.DomainBackendType; @@ -163,6 +164,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { objectMapper.addMixIn(Statistics.class, PropertyFilterMixin.class); objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class); objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class); + objectMapper.addMixIn(PulseIdTime.class, PropertyFilterMixin.class); objectMapper.addMixIn(ChannelConfiguration.class, ChannelConfigurationPropertyFilterMixin.class); diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index f4e751e..5131e96 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -486,14 +486,14 @@ public class QueryRestController implements ApplicationContextAware { } @RequestMapping( - value = DomainConfig.PATH_QUERY_RANGE, + value = DomainConfig.PATH_QUERY_MAPPING, method = RequestMethod.POST, consumes = {MediaType.APPLICATION_JSON_VALUE}) - public Stream queryRange( + public void queryRange( @RequestBody @Valid final RangeQuery rangeQuery, final HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws Exception { - + final Response response = rangeQuery.getResponseOrDefault(defaultResponse); if (response.isAllowRedirect()) { // Do a redirection if only one backend is requested @@ -513,14 +513,34 @@ public class QueryRestController implements ApplicationContextAware { httpResponse.setHeader(HttpHeaders.LOCATION, redirectURL); // use 307 - works for POST too httpResponse.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); - return Stream.empty(); + return; } } - - try { - LOGGER.debug("Executing range query '{}'", rangeQuery); - return queryManager.queryRange(rangeQuery); + + try { + if (response instanceof AbstractHTTPResponse) { + LOGGER.debug("Executing range query '{}'", rangeQuery); + final AbstractHTTPResponse httpRes = ((AbstractHTTPResponse) response); + + httpRes.validateQuery(rangeQuery); + // execute query + final Stream result = queryManager.queryRange(rangeQuery); + + httpRes.respond( + context, + httpResponse, + rangeQuery, + result); + } else { + final String message = + String.format( + "Expecting Response of type '%s' but received '%s'. Check JSON deserialization defined in '%s'", + AbstractHTTPResponse.class.getName(), response.getClass().getName(), + PolymorphicResponseMixIn.class.getName()); + LOGGER.error(message); + throw new IllegalArgumentException(message); + } } catch (Exception e) { LOGGER.error("Failed to execute range query '{}'.", rangeQuery, e); throw e; diff --git a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java index 3c7159d..5ca7dd7 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java +++ b/src/main/java/ch/psi/daq/queryrest/query/AbstractQueryManager.java @@ -1,7 +1,6 @@ package ch.psi.daq.queryrest.query; import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -42,7 +41,6 @@ import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsResponse; import ch.psi.daq.domain.query.channels.LongHash; import ch.psi.daq.domain.query.channels.RangeQueryResponse; -import ch.psi.daq.domain.query.operation.EventField; import ch.psi.daq.domain.query.processor.QueryProcessor; import ch.psi.daq.domain.reader.RequestRangeQueryResult; @@ -111,21 +109,29 @@ public abstract class AbstractQueryManager implements QueryManager { BackendQueryImpl .getBackendQueries(daqQuery) .stream() - .filter( - query -> query.getBackend().getBackendAccess().hasDataReader()) .flatMap( query -> { - /* all the magic happens here */ - final Map> channelToConfig = - query.getChannelConfigurations(); + if (query.getBackend().getBackendAccess().hasDataReader()) { + /* all the magic happens here */ + final Map> channelToConfig = + query.getChannelConfigurations(); - return channelToConfig.entrySet().stream() - .map(entry -> { - return Triple.of( - query, - new ChannelName(entry.getKey(), query.getBackend()), - entry.getValue()); - }); + return channelToConfig.entrySet().stream() + .map(entry -> { + return Triple.of( + query, + new ChannelName(entry.getKey(), query.getBackend()), + entry.getValue()); + }); + } else { + return query.getChannels().stream() + .map(channel -> { + return Triple.of( + query, + new ChannelName(channel, query.getBackend()), + Stream.empty()); + }); + } }); return Pair.of(daqQuery, resultStreams); @@ -147,38 +153,46 @@ public abstract class AbstractQueryManager implements QueryManager { BackendQueryImpl .getBackendQueries(queryElement) .stream() - .filter( - query -> { - return query.getBackend().getBackendAccess().hasDataReader() - && query.getBackend().getBackendAccess().hasQueryProcessor(); - }) .flatMap( query -> { - final QueryProcessor processor = - query.getBackend().getBackendAccess().getQueryProcessor(); - final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); + if (query.getBackend().getBackendAccess().hasDataReader() + && query.getBackend().getBackendAccess().hasQueryProcessor()) { - // ChannelEvent query - /* all the magic happens here */ - final Stream>> channelToDataEvents = - processor.process(queryAnalizer); - /* do post-process */ - final Stream> channelToData = - queryAnalizer.postProcess(channelToDataEvents); + final QueryProcessor processor = + query.getBackend().getBackendAccess().getQueryProcessor(); + final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); - // ChannelConfig query - final BackendQuery configQuery = - new BackendQueryImpl(query, queryElement.getConfigFields()); - final Map> channelToConfig = - configQuery.getChannelConfigurations(); + // ChannelEvent query + /* all the magic happens here */ + final Stream>> channelToDataEvents = + processor.process(queryAnalizer); + /* do post-process */ + final Stream> channelToData = + queryAnalizer.postProcess(channelToDataEvents); - return channelToData.map(entry -> { - return Quadruple.of( - query, - entry.getKey(), - channelToConfig.get(entry.getKey().getName()), - entry.getValue()); - }); + // ChannelConfig query + final BackendQuery configQuery = + new BackendQueryImpl(query, queryElement.getConfigFields()); + final Map> channelToConfig = + configQuery.getChannelConfigurations(); + + return channelToData.map(entry -> { + return Quadruple.of( + query, + entry.getKey(), + channelToConfig.get(entry.getKey().getName()), + entry.getValue()); + }); + } else { + return query.getChannels().stream() + .map(channel -> { + return Quadruple.of( + query, + new ChannelName(channel, query.getBackend()), + Stream.empty(), + Stream.empty()); + }); + } }); // Now we have a stream that loads elements sequential BackendQuery by BackendQuery. @@ -198,9 +212,9 @@ public abstract class AbstractQueryManager implements QueryManager { if (rangeQuery.getChannels().isEmpty()) { // query for the general (not channel specific) pulseId/globalTime final BackendAccess backendAccess = getConfigurationCache().getDefaultBackend().getBackendAccess(); - if (backendAccess.hasStreamEventReader()) { + if (backendAccess.hasDataReader()) { final CompletableFuture future = - backendAccess.getStreamEventReader().getPulseIdTimeMappingAsync(rangeQuery.getRange()); + backendAccess.getDataReader().getPulseIdTimeMappingAsync(rangeQuery.getRange()); final RequestRangeQueryResult result = future.get( backendAccess.getBackend().getApplicationContext() .getBean(DomainConfig.BEAN_NAME_DISTRIBUTED_READ_TIMEOUT, Integer.class), @@ -223,13 +237,7 @@ public abstract class AbstractQueryManager implements QueryManager { // set backends if not defined yet getConfigurationCache().configureBackends(rangeQuery.getChannels()); - final LinkedHashSet eventFields = new LinkedHashSet<>(2); - eventFields.add(EventField.pulseId); - eventFields.add(EventField.globalTime); - eventFields.add(EventField.globalDate); - eventFields.add(EventField.globalSeconds); rangeQuery.setAggregation(null); - rangeQuery.setEventFields(eventFields); rangeQuery.setMapping(null); rangeQuery.setValueTransformations(null); rangeQuery.setLimit(1); @@ -239,48 +247,54 @@ public abstract class AbstractQueryManager implements QueryManager { BackendQueryImpl .getBackendQueries(rangeQuery) .stream() - .filter( - query -> { - return query.getBackend().getBackendAccess().hasDataReader() - && query.getBackend().getBackendAccess().hasQueryProcessor(); - }) .flatMap( query -> { - final BackendQueryImpl queryForward = new BackendQueryImpl(query); - // queryForward.setAggregation(null); - // queryForward.setEventFields(eventFields); - // queryForward.setMapping(null); - // queryForward.setValueTransformations(null); - // queryForward.setLimit(1); - // queryForward.setOrdering(Ordering.asc); + if (query.getBackend().getBackendAccess().hasDataReader() + && query.getBackend().getBackendAccess().hasQueryProcessor()) { + final BackendQueryImpl queryForward = new BackendQueryImpl(query); + // queryForward.setAggregation(null); + // queryForward.setEventFields(eventFields); + // queryForward.setMapping(null); + // queryForward.setValueTransformations(null); + // queryForward.setLimit(1); + // queryForward.setOrdering(Ordering.asc); - final QueryProcessor processor = - queryForward.getBackend().getBackendAccess().getQueryProcessor(); + final QueryProcessor processor = + queryForward.getBackend().getBackendAccess().getQueryProcessor(); - final BackendQueryAnalyzer queryAnalizerForward = - queryAnalizerFactory.apply(queryForward); - final Stream>> channelToDataEventForward = - processor.process(queryAnalizerForward); + final BackendQueryAnalyzer queryAnalizerForward = + queryAnalizerFactory.apply(queryForward); + final Stream>> channelToDataEventForward = + processor.process(queryAnalizerForward); - final BackendQueryImpl queryBackwards = new BackendQueryImpl(queryForward); - queryBackwards.setOrdering(Ordering.desc); - final BackendQueryAnalyzer queryAnalizerBackwards = - queryAnalizerFactory.apply(queryForward); - final Stream>> channelToDataEventBackwards = - processor.process(queryAnalizerBackwards); + final BackendQueryImpl queryBackwards = new BackendQueryImpl(queryForward); + queryBackwards.setOrdering(Ordering.desc); + final BackendQueryAnalyzer queryAnalizerBackwards = + queryAnalizerFactory.apply(queryBackwards); + final Stream>> channelToDataEventBackwards = + processor.process(queryAnalizerBackwards); - return Streams.zip( - channelToDataEventForward, - channelToDataEventBackwards, - (entry1, entry2) -> { - return Triple.of( - entry1.getKey(), - entry1.getValue(), - entry2.getValue()); - // return Triple.of( - // entry1.getKey(), - // Streams.concat(entry1.getValue(), entry2.getValue())); - }); + return Streams.zip( + channelToDataEventForward, + channelToDataEventBackwards, + (entry1, entry2) -> { + return Triple.of( + entry1.getKey(), + entry1.getValue(), + entry2.getValue()); + // return Triple.of( + // entry1.getKey(), + // Streams.concat(entry1.getValue(), entry2.getValue())); + }); + } else { + return query.getChannels().stream() + .map(channel -> { + return Triple.of( + new ChannelName(channel, query.getBackend()), + Stream.empty(), + Stream.empty()); + }); + } }); // make sure queries are executed (Streams populate async) diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java index 048a76e..4472cf4 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java @@ -217,6 +217,44 @@ public class QueryManagerRemote extends AbstractQueryManager implements Applicat @Override public Stream queryRange(final RangeQuery rangeQuery) throws Exception { - throw new UnsupportedOperationException("Not yet implemented."); + // INFO: It is always an option to call super.queryRange(rangeQuery); + // The super call will use QueryRestStreamEventReader. + + // set backends if not defined yet + getConfigurationCache().configureBackends(rangeQuery.getChannels()); + + final Response response = new ResponseImpl(ResponseFormat.JSON); + + final Flux resultStreams = + Flux.fromIterable(BackendQueryImpl + .getBackendQueries(rangeQuery)) + .flatMap( + query -> { + final String queryServer = backendToServerAddresses.get(query.getBackend()); + + return Flux.fromIterable(query.getChannels()) + .flatMap(channel -> { + final ChannelName channelName = + new ChannelName(channel, query.getBackend()); + + if (queryServer == null) { + LOGGER.warn( + "There is no query server defined for '{}' of '{}'. Provide empty stream.", + channel, query.getBackend()); + return Flux.just(new RangeQueryResponse(channelName, null, null)); + } else { + + final RangeQuery rQuery = + new RangeQuery(rangeQuery.getRange(), response, channelName); + + return RestHelper + .queryRangeAsync(query.getBackend().getApplicationContext(), + queryServer, rQuery) + .cast(RangeQueryResponse.class); + } + }); + }); + + return resultStreams.toStream(); } } diff --git a/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java b/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java index b7c1c58..35d8e47 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/formatter/AnyResponseFormatter.java @@ -43,7 +43,7 @@ public class AnyResponseFormatter implements ResponseFormatter, @SuppressWarnings("unchecked") @Override public void setApplicationContext(ApplicationContext context) throws BeansException { - if (eventFields == null) { + if (eventFields == null || eventFields.isEmpty()) { final Set defaultEventFields = context.getBean(eventFieldsBeanName, Set.class); this.eventFields = @@ -51,7 +51,7 @@ public class AnyResponseFormatter implements ResponseFormatter, .collect(Collectors.toCollection(LinkedHashSet::new)); } - if (configFields == null) { + if (configFields == null || configFields.isEmpty()) { final Set defaultConfigFields = context.getBean(configFieldsBeanName, Set.class); this.configFields = diff --git a/src/main/java/ch/psi/daq/queryrest/response/json/AbstractResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/json/AbstractResponseStreamWriter.java index af3f360..8352f91 100644 --- a/src/main/java/ch/psi/daq/queryrest/response/json/AbstractResponseStreamWriter.java +++ b/src/main/java/ch/psi/daq/queryrest/response/json/AbstractResponseStreamWriter.java @@ -3,6 +3,7 @@ package ch.psi.daq.queryrest.response.json; import java.io.OutputStream; import java.util.List; import java.util.Map.Entry; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.servlet.ServletResponse; @@ -21,6 +22,7 @@ import ch.psi.daq.domain.query.DAQConfigQuery; import ch.psi.daq.domain.query.DAQConfigQueryElement; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.RangeQuery; import ch.psi.daq.domain.query.backend.BackendQuery; import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; import ch.psi.daq.domain.query.channels.ChannelsRequest; @@ -97,6 +99,24 @@ public abstract class AbstractResponseStreamWriter implements ResponseStreamWrit result, out, response); + } else if (query instanceof RangeQuery) { + final RangeQuery rangeQuery = (RangeQuery) query; + final AnyResponseFormatter formatter; + if (rangeQuery.hasEventFields()) { + formatter = new AnyResponseFormatter( + rangeQuery.getEventFields().stream().map(field -> field.getName()).collect(Collectors.toSet()), + rangeQuery.getConfigFields().stream().map(field -> field.getName()).collect(Collectors.toSet())); + } else { + formatter = backend.getApplicationContext() + .getBean(QueryRestConfig.BEAN_NAME_FORMATTER_ANY, AnyResponseFormatter.class); + } + formatter + .format( + getJsonFactory(), + getObjectMapper(), + result, + out, + response); } else { final String message = String.format("'%s' has no response type for '%s'.", this.getClass(), query); LOGGER.error(message); diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/JsonQueryRestControllerTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/JsonQueryRestControllerTest.java index 691e21a..1877d63 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/JsonQueryRestControllerTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/JsonQueryRestControllerTest.java @@ -28,10 +28,12 @@ import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.config.DomainConfig; +import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.RangeQuery; import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.LongHash; import ch.psi.daq.domain.query.operation.Aggregation; @@ -395,7 +397,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements new RequestRangePulseId( 100, true, - 101, + 101, false), TEST_CHANNEL_NAMES); request.addEventField(EventField.pulseId); @@ -471,7 +473,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements new RequestRangePulseId( 99, false, - 100, + 100, true), TEST_CHANNEL_NAMES); request.addEventField(EventField.pulseId); @@ -540,7 +542,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements TestTimeUtils.getTimeStr(1, 10000000))) .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[2].iocMillis").value(1010)); } - + @Test public void testPulseRangeQuery_Fields() throws Exception { DAQQuery request = new DAQQuery( @@ -725,7 +727,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalSeconds").value( TestTimeUtils.getTimeStr(2, 10000000))); } - + @Test public void testTimeRangeQuery_01_StartExpansion() throws Exception { DAQQuery request = new DAQQuery( @@ -775,7 +777,7 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[2].globalSeconds").value( TestTimeUtils.getTimeStr(2, 10000000))); } - + @Test public void testTimeRangeQuery_01_EndExpansion() throws Exception { DAQQuery request = new DAQQuery( @@ -910,10 +912,10 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].precision").value(0)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].source").value("unknown")) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].type").value(Type.Int32.getKey())) - .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].unit").doesNotExist()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[0].unit").doesNotExist()) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].pulseId").value(201)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].globalSeconds").value( - TestTimeUtils.getTimeStr(2, 10000000))) + TestTimeUtils.getTimeStr(2, 10000000))) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].globalMillis").value(2010)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].shape").value(1)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].configs[1].description").doesNotExist()) @@ -2395,4 +2397,165 @@ public class JsonQueryRestControllerTest extends AbstractDaqRestTest implements value = JsonPath.read(document, "$[1].data[1].transformedValue.value").toString(); assertTrue(value.startsWith("[101.0,")); } + + @Test + public void testRangeQuery() throws Exception { + RangeQuery request = new RangeQuery( + new RequestRangePulseId( + 100, + 101), + TEST_CHANNEL_NAMES); + request.addEventField(EventField.pulseId); + request.addEventField(EventField.globalSeconds); + request.addEventField(EventField.globalTime); + + String content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(DomainConfig.PATH_QUERY_MAPPING) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalSeconds").value( + TestTimeUtils.getTimeStr(1, 0))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(1000000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(101)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalSeconds").value( + TestTimeUtils.getTimeStr(1, 10000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1010000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(backend.getName())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].start.pulseId").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalSeconds").value( + TestTimeUtils.getTimeStr(1, 0))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalTime").value(1000000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].end.pulseId").value(101)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalSeconds").value( + TestTimeUtils.getTimeStr(1, 10000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalTime").value(1010000000)); + + request = new RangeQuery( + new RequestRangePulseId( + 100, + true, + 101, + true), + TEST_CHANNEL_NAMES); + request.addEventField(EventField.pulseId); + request.addEventField(EventField.globalDate); + request.addEventField(EventField.globalTime); + + content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(DomainConfig.PATH_QUERY_MAPPING) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(99)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalDate").value( + TestTimeUtils.getTimeDate(0, 990000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(990000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(102)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalDate").value( + TestTimeUtils.getTimeDate(1, 20000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1020000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(backend.getName())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].start.pulseId").value(99)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalDate").value( + TestTimeUtils.getTimeDate(0, 990000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].start.globalTime").value(990000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].end.pulseId").value(102)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalDate").value( + TestTimeUtils.getTimeDate(1, 20000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].end.globalTime").value(1020000000)); + + request = new RangeQuery( + new RequestRangePulseId( + 100, + 101)); + request.addEventField(EventField.pulseId); + request.addEventField(EventField.globalSeconds); + request.addEventField(EventField.globalTime); + + content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(DomainConfig.PATH_QUERY_MAPPING) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].channel.name").value(ChannelConfiguration.COMBINED_CHANNEL_NAME)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(100)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalSeconds").value( + TestTimeUtils.getTimeStr(1, 0))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(1000000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(101)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalSeconds").value( + TestTimeUtils.getTimeStr(1, 10000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1010000000)); + + request = new RangeQuery( + new RequestRangePulseId( + 100, + true, + 101, + true)); + request.addEventField(EventField.pulseId); + request.addEventField(EventField.globalDate); + request.addEventField(EventField.globalTime); + + content = mapper.writeValueAsString(request); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(DomainConfig.PATH_QUERY_MAPPING) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].channel.name").value(ChannelConfiguration.COMBINED_CHANNEL_NAME)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getName())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.pulseId").value(99)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalDate").value( + TestTimeUtils.getTimeDate(0, 990000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].start.globalTime").value(990000000)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.pulseId").value(102)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalDate").value( + TestTimeUtils.getTimeDate(1, 20000000))) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].end.globalTime").value(1020000000)); + } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java index b812e91..30876b8 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/QueryManagerRemoteTest.java @@ -41,7 +41,9 @@ import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.Event; import ch.psi.daq.domain.query.DAQConfigQuery; import ch.psi.daq.domain.query.DAQQuery; +import ch.psi.daq.domain.query.RangeQuery; import ch.psi.daq.domain.query.channels.ChannelsRequest; +import ch.psi.daq.domain.query.channels.RangeQueryResponse; import ch.psi.daq.domain.query.mapping.Mapping; import ch.psi.daq.domain.query.operation.Aggregation; import ch.psi.daq.domain.query.operation.AggregationDescriptor; @@ -566,4 +568,102 @@ public class QueryManagerRemoteTest extends AbstractDaqRestTest { .collect(Collectors.toList()); assertTrue("Size was " + channels.size(), channels.size() > 400000); } + + @Test + public void testRangeQuery_01() throws Exception { + assertNotNull(queryBackend); + assertNotNull(queryServer); + + String scalarChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT-AVG"; + String waveformChannel = "SINEG01-RCIR-PUP10:SIG-AMPLT"; + RangeQuery query = new RangeQuery( + new RequestRangeSeconds( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00")), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + + List rangeQueryResponses = RestHelper.queryRange(context, queryServer, query); + assertEquals("Size was " + rangeQueryResponses.size(), 2, rangeQueryResponses.size()); + + RangeQueryResponse queryResponse = rangeQueryResponses.get(0); + assertEquals(queryBackend, queryResponse.getChannel().getBackend()); + assertEquals(scalarChannel, queryResponse.getChannel().getName()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(638760000, queryResponse.getStart().getPulseId()); + assertEquals("2016-10-12T15:57:10.053420000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(639420000, queryResponse.getStart().getPulseId()); + + queryResponse = rangeQueryResponses.get(0); + assertEquals(queryBackend, queryResponse.getChannel().getBackend()); + assertEquals(waveformChannel, queryResponse.getChannel().getName()); + assertEquals("2016-10-12T14:07:09.914760000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(638760000, queryResponse.getStart().getPulseId()); + assertEquals("2016-10-12T15:57:10.053420000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(639420000, queryResponse.getStart().getPulseId()); + + query = new RangeQuery( + new RequestRangeSeconds( + TimeUtils.parse("2016-10-12T14:00"), + true, + TimeUtils.parse("2016-10-12T16:00"), + true), + new ChannelName(scalarChannel, queryBackend), + new ChannelName(waveformChannel, queryBackend)); + + rangeQueryResponses = RestHelper.queryRange(context, queryServer, query); + assertEquals("Size was " + rangeQueryResponses.size(), 2, rangeQueryResponses.size()); + + queryResponse = rangeQueryResponses.get(0); + assertEquals(queryBackend, queryResponse.getChannel().getBackend()); + assertEquals(scalarChannel, queryResponse.getChannel().getName()); + // it seems the system stopped a few days + assertEquals("2016-10-07T09:32:24.579300000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(609300000, queryResponse.getStart().getPulseId()); + assertEquals("2016-10-12T16:07:09.793480000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(639480000, queryResponse.getStart().getPulseId()); + + queryResponse = rangeQueryResponses.get(1); + assertEquals(queryBackend, queryResponse.getChannel().getBackend()); + assertEquals(scalarChannel, queryResponse.getChannel().getName()); + // it seems the system stopped a few days + assertEquals("2016-10-07T09:32:24.579300000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(609300000, queryResponse.getStart().getPulseId()); + assertEquals("2016-10-12T16:07:09.793480000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(639480000, queryResponse.getStart().getPulseId()); + + query = new RangeQuery( + new RequestRangeSeconds( + TimeUtils.parse("2016-10-12T14:00"), + TimeUtils.parse("2016-10-12T16:00"))); + + rangeQueryResponses = RestHelper.queryRange(context, queryServer, query); + assertEquals("Size was " + rangeQueryResponses.size(), 1, rangeQueryResponses.size()); + // TODO: set time/pulse correctly (was not clear at time of writing) + queryResponse = rangeQueryResponses.get(0); + assertEquals(queryBackend, queryResponse.getChannel().getBackend()); + assertEquals(waveformChannel, queryResponse.getChannel().getName()); + assertEquals("2016-10-12T13:57:09.914760000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(638750000, queryResponse.getStart().getPulseId()); + assertEquals("2016-10-12T16:07:10.053420000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(639430000, queryResponse.getStart().getPulseId()); + + query = new RangeQuery( + new RequestRangeSeconds( + TimeUtils.parse("2016-10-12T14:00"), + true, + TimeUtils.parse("2016-10-12T16:00"), + true)); + + rangeQueryResponses = RestHelper.queryRange(context, queryServer, query); + assertEquals("Size was " + rangeQueryResponses.size(), 1, rangeQueryResponses.size()); + // TODO: set time/pulse correctly (was not clear at time of writing) + queryResponse = rangeQueryResponses.get(0); + assertEquals(queryBackend, queryResponse.getChannel().getBackend()); + assertEquals(scalarChannel, ChannelConfiguration.COMBINED_CHANNEL_NAME); + assertEquals("2016-10-12T14:07:09.913760000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(638759999, queryResponse.getStart().getPulseId()); + assertEquals("2016-10-12T15:57:10.054420000+02:00", queryResponse.getStart().getGlobalDate()); + assertEquals(639420001, queryResponse.getStart().getPulseId()); + } }