Merge branch 'ATEST-530' into 'master'

ATEST-530



See merge request !12
This commit is contained in:
maerki_f
2016-08-18 13:49:16 +02:00
22 changed files with 737 additions and 749 deletions

View File

@ -1,5 +1,5 @@
# #
#Wed Jun 08 12:47:13 CEST 2016 #Thu Aug 04 12:28:57 CEST 2016
org.eclipse.jdt.core.compiler.debug.localVariable=generate org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.compliance=1.8 org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve

View File

@ -930,70 +930,3 @@ Illustration of array index aggregation with additional with binning (several nr
![Index Aggregation with Binning](doc/images/Index_Binning.png) ![Index Aggregation with Binning](doc/images/Index_Binning.png)
``` ```
<a name="query_channel_status"/>
## Query Channel Status
It is possible to retieve channel specific status information.
### Request
```
POST http://<host>:<port>/channels/status
```
#### Data
```json
{
"channels":[
"Channel_02",
"Channel_04"
]
}
```
##### Explanation
- **channels**: Array of channels to be queried (see [here](Readme.md#query_channel_names) and [here](Readme.md#define_channel_names)).
### Example
#### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"channels": ["Channel_02","Channel_04"]}' http://data-api.psi.ch/sf/channels/status | python -m json.tool
```
#### Response
```json
[
{
"channel":{
"name":"Channel_02",
"backend":"sf-databuffer"
},
"recording":true,
"connected":true,
"lastEventDate":"2016-07-06T09:16:19.607242575+02:00"
},
{
"channel":{
"name":"Channel_04",
"backend":"sf-archiverappliance"
},
"recording":false,
"connected":false,
"lastEventDate":"2016-07-06T04:16:14.000000000+02:00"
}
]
```
##### Explanation
- **channel**: The name and backend of the channel.
- **recording**: Defines if the channel is still recorded (please note that for beam synchronous DAQ this means that the source/IOC providing the channel is still recorded).
- **connected**: Defines if the channel is still connected (please note that for beam synchronous DAQ this means that the source/IOC providing the channel is still connected).
- **lastEventDate**: The timestamp of the last received event from the channel in the ISO8601 format.

View File

@ -29,14 +29,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.common.statistic.Statistics; import ch.psi.daq.common.statistic.Statistics;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.operation.Aggregation; import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.Response; import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.query.operation.aggregation.extrema.AbstractExtremaMeta; import ch.psi.daq.domain.query.operation.aggregation.extrema.AbstractExtremaMeta;
import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.analyzer.BackendQueryAnalyzerImpl;
import ch.psi.daq.query.analyzer.QueryAnalyzerImpl;
import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.queryrest.controller.validator.QueryValidator; import ch.psi.daq.queryrest.controller.validator.QueryValidator;
import ch.psi.daq.queryrest.model.PropertyFilterMixin; import ch.psi.daq.queryrest.model.PropertyFilterMixin;
import ch.psi.daq.queryrest.query.QueryManager; import ch.psi.daq.queryrest.query.QueryManager;
@ -66,6 +66,11 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
static class InnerConfiguration { static class InnerConfiguration {
} }
// somehow needed to make sure @Import elements will get initialized and afterPropertiesSet will
// get called (ensuring BackendAcces gets initialized according hierarchy)
@Resource
private QueryConfig queryConfig;
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestConfig.class); private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestConfig.class);
public static final String BEAN_NAME_DEFAULT_RESPONSE_FIELDS = "defaultResponseFields"; public static final String BEAN_NAME_DEFAULT_RESPONSE_FIELDS = "defaultResponseFields";
@ -115,8 +120,8 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
} }
@Bean @Bean
public Function<Query, QueryAnalyzer> queryAnalizerFactory() { public Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory() {
return (query) -> new QueryAnalyzerImpl(query); return (query) -> new BackendQueryAnalyzerImpl(query);
} }
@Bean @Bean

View File

@ -1,16 +1,9 @@
package ch.psi.daq.queryrest.controller; package ch.psi.daq.queryrest.controller;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid; import javax.validation.Valid;
@ -30,10 +23,14 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.channels.info.ChannelInfos;
import ch.psi.daq.domain.json.status.channel.ChannelStatus; import ch.psi.daq.domain.query.ChannelNameRequest;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsRequest;
@ -44,33 +41,17 @@ import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.Response; import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.query.operation.ResponseFormat; import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.request.validate.RequestProviderValidator; import ch.psi.daq.domain.request.validate.RequestProviderValidator;
import ch.psi.daq.domain.status.StatusReader;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.processor.ChannelNameCache;
import ch.psi.daq.query.processor.QueryProcessor;
import ch.psi.daq.queryrest.query.QueryManager; import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse; import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn; import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse; import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
@RestController @RestController
public class QueryRestController { public class QueryRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class); private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
public static final String PATH_CHANNELS = DomainConfig.PATH_CHANNELS;
public static final String PATH_QUERY = DomainConfig.PATH_QUERY;
public static final String PATH_QUERIES = DomainConfig.PATH_QUERIES;
public static final String PATH_STATUS_CHANNELS = DomainConfig.PATH_STATUS_CHANNELS;
@Resource @Resource
private ApplicationContext appContext; private ApplicationContext appContext;
@ -86,91 +67,8 @@ public class QueryRestController {
private Response defaultResponse = new JSONHTTPResponse(); private Response defaultResponse = new JSONHTTPResponse();
@Resource
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
@Resource(name = DomainConfig.BEAN_NAME_READ_TIMEOUT)
private Integer readTimeout;
private Map<Backend, QueryProcessor> queryProcessors = new LinkedHashMap<>();
private ChannelNameCache channelNameCache;
private Map<Backend, StatusReader> statusReaders = new LinkedHashMap<>();
@PostConstruct @PostConstruct
public void afterPropertiesSet() { public void afterPropertiesSet() {}
List<Exception> exceptions = new ArrayList<>();
try {
QueryProcessor queryProcessor =
appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class);
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load query processor for cassandra.");
LOGGER.warn("##########");
LOGGER.warn("");
}
try {
QueryProcessor queryProcessor =
appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class);
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load query processor for archiverappliance.");
LOGGER.warn("##########");
LOGGER.warn("");
}
if (queryProcessors.isEmpty()) {
LOGGER.error("No query processor could be loaded! Exceptions were: ");
for (Exception exception : exceptions) {
LOGGER.error("", exception);
}
throw new RuntimeException("No Backends available!");
}
try {
StatusReader statusReader =
appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_STATUS_READER, StatusReader.class);
statusReaders.put(statusReader.getBackend(), statusReader);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load status reader for cassandra.");
LOGGER.warn("##########");
LOGGER.warn("");
}
try {
StatusReader statusReader =
appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_STATUS_READER, StatusReader.class);
statusReaders.put(statusReader.getBackend(), statusReader);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load status reader for archiverappliance.");
LOGGER.warn("##########");
LOGGER.warn("");
}
channelNameCache =
new ChannelNameCache(queryProcessors, appContext.getBean(DomainConfig.BEAN_NAME_READ_TIMEOUT,
Integer.class).longValue());
}
@PreDestroy
public void destroy() {
channelNameCache.destroy();
}
@InitBinder @InitBinder
protected void initBinder(WebDataBinder binder) { protected void initBinder(WebDataBinder binder) {
@ -184,7 +82,7 @@ public class QueryRestController {
} }
} }
@RequestMapping(value = PATH_CHANNELS, method = {RequestMethod.GET, RequestMethod.POST}, @RequestMapping(value = DomainConfig.PATH_CHANNELS, method = {RequestMethod.GET, RequestMethod.POST},
produces = {MediaType.APPLICATION_JSON_VALUE}) produces = {MediaType.APPLICATION_JSON_VALUE})
public @ResponseBody List<ChannelsResponse> getChannels(@RequestBody(required = false) ChannelsRequest request) public @ResponseBody List<ChannelsResponse> getChannels(@RequestBody(required = false) ChannelsRequest request)
throws Throwable { throws Throwable {
@ -198,7 +96,7 @@ public class QueryRestController {
* @return Collection of channel names matching the specified input channel name * @return Collection of channel names matching the specified input channel name
* @throws Throwable in case something goes wrong * @throws Throwable in case something goes wrong
*/ */
@RequestMapping(value = PATH_CHANNELS + "/{channelName}", method = {RequestMethod.GET}, @RequestMapping(value = DomainConfig.PATH_CHANNELS + "/{channelName}", method = {RequestMethod.GET},
produces = {MediaType.APPLICATION_JSON_VALUE}) produces = {MediaType.APPLICATION_JSON_VALUE})
public @ResponseBody Collection<ChannelsResponse> getChannels(@PathVariable(value = "channelName") String channelName) public @ResponseBody Collection<ChannelsResponse> getChannels(@PathVariable(value = "channelName") String channelName)
throws Throwable { throws Throwable {
@ -206,44 +104,19 @@ public class QueryRestController {
} }
/** /**
* Queries for channels status * Queries for channels info
* *
* @param query the {@link ChannelStatusQuery} * @param request the ChannelNameRequest
* @param res the {@link HttpServletResponse} instance associated with this request * @return Collection of ChannelInfos
* @return Collection of channel status info
* @throws Throwable in case something goes wrong * @throws Throwable in case something goes wrong
*/ */
@RequestMapping( @RequestMapping(
value = PATH_STATUS_CHANNELS, value = DomainConfig.PATH_CHANNELS_INFO,
method = RequestMethod.POST, method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE}) consumes = {MediaType.APPLICATION_JSON_VALUE})
public @ResponseBody Collection<ChannelStatus> executeChannelStatusQuery(@RequestBody ChannelStatusQuery query, public @ResponseBody Collection<ChannelInfos> executeChannelInfoQuery(@RequestBody ChannelNameRequest request)
HttpServletResponse res) throws Throwable { throws Throwable {
return queryManager.getChannelInfos(request);
// set backends if not defined yet
channelNameCache.setBackends(query.getChannels());
List<CompletableFuture<ChannelStatus>> futures = new ArrayList<>(query.getChannels().size());
for (ChannelName channel : query.getChannels()) {
StatusReader statusReader = statusReaders.get(channel.getBackend());
if (statusReader != null) {
futures.add(statusReader.getChannelStatusAsync(channel.getName()));
} else {
LOGGER.warn("There is no StatusReader available for '{}'.", channel.getBackend());
}
}
try {
List<ChannelStatus> retStatus = new ArrayList<>(futures.size());
for (CompletableFuture<ChannelStatus> completableFuture : futures) {
retStatus.add(completableFuture.get(readTimeout, TimeUnit.SECONDS));
}
return retStatus;
} catch (Exception e) {
String message = "Could not extract ChannelStatus within timelimits.";
LOGGER.error(message, e);
throw new IllegalStateException(message, e);
}
} }
/** /**
@ -256,7 +129,7 @@ public class QueryRestController {
* {@link #executeQuery(DAQQuery, HttpServletResponse)} fails * {@link #executeQuery(DAQQuery, HttpServletResponse)} fails
*/ */
@RequestMapping( @RequestMapping(
value = PATH_QUERY, value = DomainConfig.PATH_QUERY,
method = RequestMethod.GET) method = RequestMethod.GET)
public void executeQueryBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception { public void executeQueryBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
DAQQuery query = objectMapper.readValue(jsonBody, DAQQuery.class); DAQQuery query = objectMapper.readValue(jsonBody, DAQQuery.class);
@ -271,7 +144,7 @@ public class QueryRestController {
* @throws Exception thrown if writing to the output stream fails * @throws Exception thrown if writing to the output stream fails
*/ */
@RequestMapping( @RequestMapping(
value = PATH_QUERY, value = DomainConfig.PATH_QUERY,
method = RequestMethod.POST, method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE}) consumes = {MediaType.APPLICATION_JSON_VALUE})
public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception { public void executeQuery(@RequestBody @Valid DAQQuery query, HttpServletResponse res) throws Exception {
@ -288,7 +161,7 @@ public class QueryRestController {
* {@link #executeQueries(DAQQueries, HttpServletResponse)} fails * {@link #executeQueries(DAQQueries, HttpServletResponse)} fails
*/ */
@RequestMapping( @RequestMapping(
value = PATH_QUERIES, value = DomainConfig.PATH_QUERIES,
method = RequestMethod.GET) method = RequestMethod.GET)
public void executeQueriesBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception { public void executeQueriesBodyAsString(@RequestParam String jsonBody, HttpServletResponse res) throws Exception {
DAQQueries queries = objectMapper.readValue(jsonBody, DAQQueries.class); DAQQueries queries = objectMapper.readValue(jsonBody, DAQQueries.class);
@ -299,8 +172,8 @@ public class QueryRestController {
* Catch-all query method for getting data from the backend for both JSON and CSV requests. * Catch-all query method for getting data from the backend for both JSON and CSV requests.
* <p> * <p>
* The {@link DAQQueries} object will contain the concrete subclass based on the combination of * The {@link DAQQueries} object will contain the concrete subclass based on the combination of
* fields defined in the user's query. The AttributeBasedDeserializer decides which class * fields defined in the user's query. The AttributeBasedDeserializer decides which class to
* to deserialize the information into and has been configured (see * deserialize the information into and has been configured (see
* QueryRestConfig#afterPropertiesSet) accordingly. * QueryRestConfig#afterPropertiesSet) accordingly.
* *
* @param queries the {@link DAQQueries} * @param queries the {@link DAQQueries}
@ -308,7 +181,7 @@ public class QueryRestController {
* @throws Exception thrown if writing to the output stream fails * @throws Exception thrown if writing to the output stream fails
*/ */
@RequestMapping( @RequestMapping(
value = PATH_QUERIES, value = DomainConfig.PATH_QUERIES,
method = RequestMethod.POST, method = RequestMethod.POST,
consumes = {MediaType.APPLICATION_JSON_VALUE}) consumes = {MediaType.APPLICATION_JSON_VALUE})
public void executeQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception { public void executeQueries(@RequestBody @Valid DAQQueries queries, HttpServletResponse res) throws Exception {

View File

@ -1,5 +1,6 @@
package ch.psi.daq.queryrest.query; package ch.psi.daq.queryrest.query;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -7,16 +8,20 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Triple; import org.apache.commons.lang3.tuple.Triple;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.channels.info.ChannelInfos;
import ch.psi.daq.domain.query.ChannelNameRequest;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse; import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.query.model.impl.BackendQuery;
public interface QueryManager { public interface QueryManager {
List<ChannelsResponse> getChannels(ChannelsRequest request) throws Exception; List<ChannelsResponse> getChannels(ChannelsRequest request) throws Exception;
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) Collection<ChannelInfos> getChannelInfos(ChannelNameRequest request) throws Exception;
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> getEvents(DAQQueries queries)
throws Exception; throws Exception;
} }

View File

@ -1,9 +1,8 @@
package ch.psi.daq.queryrest.query; package ch.psi.daq.queryrest.query;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -19,82 +18,44 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import ch.psi.daq.cassandra.config.CassandraConfig;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.BackendAccess;
import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.channels.info.ChannelInfos;
import ch.psi.daq.domain.query.ChannelNameRequest;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.channels.ChannelNameCache;
import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.channels.ChannelsResponse; import ch.psi.daq.domain.query.channels.ChannelsResponse;
import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.queryrest.query.model.ChannelInfosStreamImpl;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.query.processor.ChannelNameCache;
import ch.psi.daq.query.processor.QueryProcessor;
public class QueryManagerImpl implements QueryManager { public class QueryManagerImpl implements QueryManager {
@SuppressWarnings("unused")
private static final Logger LOGGER = LoggerFactory.getLogger(QueryManagerImpl.class); private static final Logger LOGGER = LoggerFactory.getLogger(QueryManagerImpl.class);
@Resource @Resource
private ApplicationContext appContext; private ApplicationContext appContext;
@Resource @Resource
private Function<Query, QueryAnalyzer> queryAnalizerFactory; private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
private Map<Backend, QueryProcessor> queryProcessors = new LinkedHashMap<>(); @Resource(name = DomainConfig.BEAN_NAME_BACKEND_ACCESS)
private BackendAccess backendAccess;
@Resource(name = DomainConfig.BEAN_NAME_CHANNEL_NAME_CACHE)
private ChannelNameCache channelNameCache; private ChannelNameCache channelNameCache;
@PostConstruct @PostConstruct
public void afterPropertiesSet() { public void afterPropertiesSet() {}
List<Exception> exceptions = new ArrayList<>();
try {
QueryProcessor queryProcessor =
appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class);
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load query processor for cassandra.");
LOGGER.warn("##########");
LOGGER.warn("");
}
try {
QueryProcessor queryProcessor =
appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class);
queryProcessors.put(queryProcessor.getBackend(), queryProcessor);
} catch (Exception e) {
exceptions.add(e);
LOGGER.warn("");
LOGGER.warn("##########");
LOGGER.warn("Could not load query processor for archiverappliance.");
LOGGER.warn("##########");
LOGGER.warn("");
}
if (queryProcessors.isEmpty()) {
LOGGER.error("No query processor could be loaded! Exceptions were: ");
for (Exception exception : exceptions) {
LOGGER.error("", exception);
}
throw new RuntimeException("No Backends available!");
}
channelNameCache =
new ChannelNameCache(queryProcessors, appContext.getBean(DomainConfig.BEAN_NAME_READ_TIMEOUT,
Integer.class).longValue());
}
@PreDestroy @PreDestroy
public void destroy() { public void destroy() {}
channelNameCache.destroy();
}
@Override @Override
public List<ChannelsResponse> getChannels(ChannelsRequest request) { public List<ChannelsResponse> getChannels(ChannelsRequest request) {
@ -106,8 +67,32 @@ public class QueryManagerImpl implements QueryManager {
return channelNameCache.getChannels(request); return channelNameCache.getChannels(request);
} }
public Collection<ChannelInfos> getChannelInfos(ChannelNameRequest request) {
// set backends if not defined yet
channelNameCache.setBackends(request.getChannels());
Stream<ChannelInfos> stream = request.getRequestsByBackend().entrySet().stream()
.filter(entry ->
backendAccess.hasDataReader(entry.getKey())
&& backendAccess.hasChannelInfoReader(entry.getKey()))
.flatMap(entry -> {
return entry.getValue().getChannelInfos(entry.getKey(), backendAccess)
.entrySet().stream()
.map(innerEntry -> {
return new ChannelInfosStreamImpl(
new ChannelName(innerEntry.getKey(), entry.getKey()),
innerEntry.getValue()
);
}
);
});
// materialize
return stream.collect(Collectors.toList());
}
@Override @Override
public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> executeQueries(DAQQueries queries) { public List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> getEvents(DAQQueries queries) {
// set backends if not defined yet // set backends if not defined yet
channelNameCache.setBackends(queries); channelNameCache.setBackends(queries);
@ -116,27 +101,25 @@ public class QueryManagerImpl implements QueryManager {
for (DAQQueryElement queryElement : queries) { for (DAQQueryElement queryElement : queries) {
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams = Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
BackendQuery BackendQueryImpl
.getBackendQueries(queryElement) .getBackendQueries(queryElement)
.stream() .stream()
.filter(query -> { .filter(
QueryProcessor processor = queryProcessors.get(query.getBackend()); query ->
if (processor != null) { backendAccess.hasDataReader(query.getBackend())
return true; && backendAccess.hasQueryProcessor(query.getBackend())
} else { )
LOGGER.warn("There is no QueryProcessor available for '{}'", query.getBackend()); .flatMap(
return false; query -> {
} QueryProcessor processor = backendAccess.getQueryProcessor(query.getBackend());
}) BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
.flatMap(query -> {
QueryProcessor processor = queryProcessors.get(query.getBackend());
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
// all the magic happens here /* all the magic happens here */
Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents = Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
processor.process(queryAnalizer); processor.process(queryAnalizer);
// do post-process /* do post-process */
Stream<Entry<ChannelName, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents); Stream<Entry<ChannelName, ?>> channelToData =
queryAnalizer.postProcess(channelToDataEvents);
return channelToData.map(entry -> { return channelToData.map(entry -> {
return Triple.of(query, entry.getKey(), entry.getValue()); return Triple.of(query, entry.getKey(), entry.getValue());

View File

@ -0,0 +1,44 @@
package ch.psi.daq.queryrest.query.model;
import java.util.Iterator;
import java.util.stream.Stream;
import com.fasterxml.jackson.annotation.JsonIgnore;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.channels.info.ChannelInfo;
import ch.psi.daq.domain.json.channels.info.ChannelInfos;
public class ChannelInfosStreamImpl implements ChannelInfos {
private ChannelName channel;
private Stream<? extends ChannelInfo> infos;
public ChannelInfosStreamImpl() {}
public ChannelInfosStreamImpl(ChannelName channel, Stream<? extends ChannelInfo> infos) {
this.channel = channel;
this.infos = infos;
}
@Override
public ChannelName getChannel() {
return channel;
}
public Stream<? extends ChannelInfo> getInfos() {
// can only be consumed once
return infos;
}
@JsonIgnore
@Override
public Iterator<ChannelInfo> iterator() {
return getChannelInfos().iterator();
}
@JsonIgnore
@Override
public Stream<ChannelInfo> getChannelInfos() {
return infos.map(info -> (ChannelInfo) info);
}
}

View File

@ -11,7 +11,7 @@ import org.apache.commons.lang3.tuple.Triple;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.query.model.impl.BackendQuery; import ch.psi.daq.domain.query.backend.BackendQuery;
public interface ResponseStreamWriter { public interface ResponseStreamWriter {

View File

@ -18,11 +18,12 @@ import ch.psi.daq.domain.FieldNames;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.operation.AggregationType; import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression; import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.ResponseFormat; import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.query.QueryManager; import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse; import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
@ -56,7 +57,7 @@ public class CSVHTTPResponse extends AbstractHTTPResponse {
// execute query // execute query
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result = List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
queryManager.executeQueries(queries); queryManager.getEvents(queries);
// write the response back to the client using java 8 streams // write the response back to the client using java 8 streams
streamWriter.respond(result, out); streamWriter.respond(result, out);
} catch (Exception e) { } catch (Exception e) {

View File

@ -34,12 +34,12 @@ import ch.psi.daq.common.stream.StreamMatcher;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.operation.Aggregation; import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.Extrema; import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.model.Query;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.response.ResponseStreamWriter; import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/** /**
@ -61,7 +61,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
.getGlobalMillis() / 10L; .getGlobalMillis() / 10L;
@Resource @Resource
private Function<Query, QueryAnalyzer> queryAnalizerFactory; private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@Override @Override
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
@ -158,7 +158,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
daqQuery.getAggregation() != null ? daqQuery.getAggregation().getAggregations() : null; daqQuery.getAggregation() != null ? daqQuery.getAggregation().getAggregations() : null;
List<Extrema> extrema = daqQuery.getAggregation() != null ? daqQuery.getAggregation().getExtrema() : null; List<Extrema> extrema = daqQuery.getAggregation() != null ? daqQuery.getAggregation().getExtrema() : null;
QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery); BackendQueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
for (QueryField field : queryFields) { for (QueryField field : queryFields) {
if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) { if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) {

View File

@ -16,9 +16,9 @@ import org.springframework.http.MediaType;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Compression; import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.ResponseFormat; import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.query.QueryManager; import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse; import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
@ -48,7 +48,7 @@ public class JSONHTTPResponse extends AbstractHTTPResponse {
JSONResponseStreamWriter streamWriter = context.getBean(JSONResponseStreamWriter.class); JSONResponseStreamWriter streamWriter = context.getBean(JSONResponseStreamWriter.class);
// execute query // execute query
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result = queryManager.executeQueries(queries); List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result = queryManager.getEvents(queries);
// write the response back to the client using java 8 streams // write the response back to the client using java 8 streams
streamWriter.respond(result, out); streamWriter.respond(result, out);
} catch (Exception e) { } catch (Exception e) {

View File

@ -25,10 +25,10 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Aggregation; import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.Extrema; import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.response.ResponseStreamWriter; import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/** /**

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.queryrest.QueryRestApplication; import ch.psi.daq.queryrest.QueryRestApplication;
import ch.psi.daq.queryrest.config.QueryRestConfig; import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.test.cassandra.CassandraDaqUnitDependencyInjectionTestExecutionListener; import ch.psi.daq.test.cassandra.CassandraDaqUnitDependencyInjectionTestExecutionListener;
import ch.psi.daq.test.queryrest.config.DaqWebMvcConfig;
@TestExecutionListeners({ @TestExecutionListeners({

View File

@ -1,4 +1,7 @@
package ch.psi.daq.test.queryrest; package ch.psi.daq.test.queryrest.config;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
@ -10,17 +13,19 @@ import org.springframework.context.annotation.PropertySources;
import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
import ch.psi.daq.archiverappliance.config.ArchiverApplianceConfig;
import ch.psi.daq.cassandra.config.CassandraConfig;
import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.reader.CassandraReader;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.backend.BackendAccess;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.query.processor.QueryProcessor;
import ch.psi.daq.domain.reader.DataReader; import ch.psi.daq.domain.reader.DataReader;
import ch.psi.daq.domain.status.StatusReader;
import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.query.processor.QueryProcessor;
import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.query.processor.QueryProcessorLocal;
import ch.psi.daq.test.query.config.LocalQueryTestConfig; import ch.psi.daq.test.query.config.LocalQueryTestConfig;
import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader; import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader;
import ch.psi.daq.test.queryrest.query.DummyCassandraReader; import ch.psi.daq.test.queryrest.query.DummyCassandraReader;
import ch.psi.daq.test.queryrest.status.DummyArchiverApplianceStatusReader;
import ch.psi.daq.test.queryrest.status.DummyCassandraStatusReader;
@Configuration @Configuration
@ComponentScan @ComponentScan
@ -36,33 +41,33 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
static class InnerConfiguration { static class InnerConfiguration {
} }
@Bean(name = QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR) @Resource(name = DomainConfig.BEAN_NAME_BACKEND_ACCESS)
@Lazy private BackendAccess backendAccess;
public QueryProcessor cassandraQueryProcessor() {
return new QueryProcessorLocal(cassandraReader()); @PostConstruct
public void afterPropertiesSet() {
backendAccess.addStreamEventReaderSupplier(Backend.SF_DATABUFFER, () -> cassandraReader());
backendAccess.addChannelInfoReaderSupplier(Backend.SF_DATABUFFER, () -> cassandraReader());
backendAccess.addDataReaderSupplier(Backend.SF_ARCHIVERAPPLIANCE, () -> archiverApplianceReader());
} }
@Bean // make sure we use a local QueryProcessor even for distributed calls -> no Hazelcast needs to be started
@Bean(name = QueryConfig.BEAN_NAME_QUERY_PROCESSOR_DISTRIBUTED)
@Lazy
public QueryProcessor distributedQueryProcessor() {
return new QueryProcessorLocal();
}
@Bean(name = CassandraConfig.BEAN_NAME_CASSANDRA_READER)
@Lazy @Lazy
public CassandraReader cassandraReader() { public CassandraReader cassandraReader() {
return new DummyCassandraReader(); return new DummyCassandraReader();
} }
@Bean @Bean(name = ArchiverApplianceConfig.BEAN_NAME_ARCHIVER_APPLIANCE_READER)
@Lazy @Lazy
public DataReader archiverApplianceReader() { public DataReader archiverApplianceReader() {
return new DummyArchiverApplianceReader(); return new DummyArchiverApplianceReader();
} }
@Bean(name = QueryConfig.BEAN_NAME_CASSANDRA_STATUS_READER)
@Lazy
public StatusReader cassandraStatusReader() {
return new DummyCassandraStatusReader();
}
@Bean(name = QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_STATUS_READER)
@Lazy
public StatusReader archiverApplianceStatusReader() {
return new DummyArchiverApplianceStatusReader();
}
} }

View File

@ -0,0 +1,173 @@
package ch.psi.daq.test.queryrest.controller;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.bsread.message.Type;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.channels.info.ChannelInfo;
import ch.psi.daq.domain.json.channels.info.ChannelInfos;
import ch.psi.daq.domain.json.channels.info.ChannelInfosList;
import ch.psi.daq.domain.query.ChannelNameRequest;
import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
/**
* Tests the {@link DaqController} implementation.
*/
public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest {
private ObjectMapper objectMapper = new ObjectMapper();
@After
public void tearDown() throws Exception {}
@Test
public void testChannelInfoQuery_01() throws Exception {
ChannelNameRequest query = new ChannelNameRequest(
new RequestRangePulseId(
100,
101),
"DataBuffer1", "DataBuffer2");
String content = mapper.writeValueAsString(query);
System.out.println(content);
MvcResult result = this.mockMvc.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_CHANNELS_INFO)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
// test conversion used in DAQProcessing
List<? extends ChannelInfos> infosList = objectMapper.readValue(response, ChannelInfosList.class);
assertEquals(2, infosList.size());
ChannelInfos cInfos = infosList.get(0);
assertEquals("DataBuffer1", cInfos.getChannel().getName());
assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
List<ChannelInfo> infos = cInfos.getChannelInfos().collect(Collectors.toList());
assertEquals(2, infos.size());
ChannelInfo info = infos.get(0);
assertEquals("DataBuffer1", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
info = infos.get(1);
assertEquals("DataBuffer1", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
cInfos = infosList.get(1);
assertEquals("DataBuffer2", cInfos.getChannel().getName());
assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
infos = cInfos.getChannelInfos().collect(Collectors.toList());
assertEquals(2, infos.size());
info = infos.get(0);
assertEquals("DataBuffer2", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
info = infos.get(1);
assertEquals("DataBuffer2", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
}
@Test
public void testChannelInfoQuery_02() throws Exception {
ChannelNameRequest query = new ChannelNameRequest(
new RequestRangePulseId(
100,
101),
"DataBuffer1", "DataBuffer2");
query.setOrdering(Ordering.desc);
String content = mapper.writeValueAsString(query);
System.out.println(content);
MvcResult result = this.mockMvc.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_CHANNELS_INFO)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
// test conversion used in DAQProcessing
List<? extends ChannelInfos> infosList = objectMapper.readValue(response, ChannelInfosList.class);
assertEquals(2, infosList.size());
ChannelInfos cInfos = infosList.get(0);
assertEquals("DataBuffer1", cInfos.getChannel().getName());
assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
List<ChannelInfo> infos = cInfos.getChannelInfos().collect(Collectors.toList());
assertEquals(2, infos.size());
ChannelInfo info = infos.get(0);
assertEquals("DataBuffer1", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
info = infos.get(1);
assertEquals("DataBuffer1", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
cInfos = infosList.get(1);
assertEquals("DataBuffer2", cInfos.getChannel().getName());
assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
infos = cInfos.getChannelInfos().collect(Collectors.toList());
assertEquals(2, infos.size());
info = infos.get(0);
assertEquals("DataBuffer2", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
info = infos.get(1);
assertEquals("DataBuffer2", info.getChannel());
assertEquals(Backend.SF_DATABUFFER, info.getBackend());
assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
assertArrayEquals(new int[] {1}, info.getShape());
assertEquals(Type.Int32.getKey(), info.getType());
}
}

View File

@ -1,122 +0,0 @@
package ch.psi.daq.test.queryrest.controller;
import static org.junit.Assert.assertFalse;
import org.junit.After;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
/**
* Tests the {@link DaqController} implementation.
*/
public class QueryRestControllerChannelStatusTest extends AbstractDaqRestTest {
@After
public void tearDown() throws Exception {}
@Test
public void testChannelStatusQuery_01() throws Exception {
ChannelStatusQuery query = new ChannelStatusQuery("DataBuffer1", "DataBuffer2");
String content = mapper.writeValueAsString(query);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_STATUS_CHANNELS)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value("DataBuffer1"))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].recording").value(false))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].connected").value(false))
.andExpect(
MockMvcResultMatchers.jsonPath("$[0].lastEventDate").value(
TimeUtils.format(TimeUtils.getTimeFromMillis(0, 0))))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value("DataBuffer2"))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].recording").value(false))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].connected").value(false))
.andExpect(
MockMvcResultMatchers.jsonPath("$[1].lastEventDate").value(
TimeUtils.format(TimeUtils.getTimeFromMillis(0, 0))))
.andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist());
}
@Test
public void testChannelStatusQuery_02() throws Exception {
ChannelStatusQuery query = new ChannelStatusQuery();
query.addChannel(new ChannelName("ArchiverAppliance1", Backend.SF_ARCHIVERAPPLIANCE));
query.addChannel(new ChannelName("ArchiverAppliance2", Backend.SF_ARCHIVERAPPLIANCE));
String content = mapper.writeValueAsString(query);
System.out.println(content);
this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_STATUS_CHANNELS)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(
MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value("ArchiverAppliance1"))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].recording").value(true))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].connected").value(true))
.andExpect(
MockMvcResultMatchers.jsonPath("$[0].lastEventDate").value(
TimeUtils.format(TimeUtils.getTimeFromMillis(1467638000000L, 0))))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(
MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value("ArchiverAppliance2"))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].recording").value(true))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].connected").value(true))
.andExpect(
MockMvcResultMatchers.jsonPath("$[1].lastEventDate").value(
TimeUtils.format(TimeUtils.getTimeFromMillis(1467638000000L, 0))))
.andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist());
}
@Test
public void testChannelStatusQuery_03() throws Exception {
ChannelStatusQuery query = new ChannelStatusQuery();
query.addChannel(new ChannelName("ArchiverAppliance1", Backend.SF_ARCHIVERAPPLIANCE));
query.addChannel(new ChannelName("ArchiverAppliance2", Backend.SF_ARCHIVERAPPLIANCE));
String content = mapper.writeValueAsString(query);
System.out.println(content);
MvcResult result = this.mockMvc.perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_STATUS_CHANNELS)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
assertFalse(response.contains("lastEventTime"));
}
}

View File

@ -25,6 +25,7 @@ import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.DAQQueryElement;
@ -38,7 +39,6 @@ import ch.psi.daq.domain.request.range.RequestRangeDate;
import ch.psi.daq.domain.request.range.RequestRangePulseId; import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.domain.request.range.RequestRangeTime; import ch.psi.daq.domain.request.range.RequestRangeTime;
import ch.psi.daq.domain.test.TestTimeUtils; import ch.psi.daq.domain.test.TestTimeUtils;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse; import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest; import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
@ -83,7 +83,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -154,8 +154,10 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
List<String> channels = Arrays.asList(channel_01, channel_02, channel_03); List<String> channels = Arrays.asList(channel_01, channel_02, channel_03);
DAQQuery request = new DAQQuery( DAQQuery request = new DAQQuery(
new RequestRangePulseId( new RequestRangePulseId(
-1, // dummy range as range is defined by channel_Seq (see
-1), // DummyCassandraReader.getDummyEventStream())
0,
0),
channels); channels);
request.setResponse(new CSVHTTPResponse()); request.setResponse(new CSVHTTPResponse());
@ -171,7 +173,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -244,8 +246,10 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
List<String> channels = Arrays.asList(channel_01, channel_02, channel_03); List<String> channels = Arrays.asList(channel_01, channel_02, channel_03);
DAQQuery request = new DAQQuery( DAQQuery request = new DAQQuery(
new RequestRangePulseId( new RequestRangePulseId(
-1, // dummy range as range is defined by channel_Seq (see
-1), // DummyCassandraReader.getDummyEventStream())
0,
0),
channels); channels);
request.setResponse(new CSVHTTPResponse()); request.setResponse(new CSVHTTPResponse());
@ -259,7 +263,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -356,7 +360,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERIES) .post(DomainConfig.PATH_QUERIES)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -439,7 +443,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -523,7 +527,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -610,7 +614,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -684,7 +688,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
try { try {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()); .andDo(MockMvcResultHandlers.print());
@ -714,7 +718,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
try { try {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()); .andDo(MockMvcResultHandlers.print());
@ -764,7 +768,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -877,7 +881,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -1003,7 +1007,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -1086,7 +1090,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
MvcResult result = this.mockMvc MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
@ -1141,7 +1145,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -1163,7 +1167,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))

View File

@ -5,6 +5,7 @@ import java.util.Arrays;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers; import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers; import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
@ -12,6 +13,8 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQuery;
@ -23,12 +26,10 @@ import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression; import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.Extrema; import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.request.range.RequestRangeDate; import ch.psi.daq.domain.request.range.RequestRangeDate;
import ch.psi.daq.domain.request.range.RequestRangePulseId; import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.domain.request.range.RequestRangeTime; import ch.psi.daq.domain.request.range.RequestRangeTime;
import ch.psi.daq.domain.test.TestTimeUtils; import ch.psi.daq.domain.test.TestTimeUtils;
import ch.psi.daq.queryrest.controller.QueryRestController;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse; import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest; import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
@ -51,7 +52,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform( .perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.get(QueryRestController.PATH_CHANNELS) .get(DomainConfig.PATH_CHANNELS)
.contentType(MediaType.APPLICATION_JSON)) .contentType(MediaType.APPLICATION_JSON))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.status().isOk())
@ -80,7 +81,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
public void testSpecificChannelSearch() throws Exception { public void testSpecificChannelSearch() throws Exception {
this.mockMvc.perform( this.mockMvc.perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.get(QueryRestController.PATH_CHANNELS + "/int32") .get(DomainConfig.PATH_CHANNELS + "/int32")
.contentType(MediaType.APPLICATION_JSON)) .contentType(MediaType.APPLICATION_JSON))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.status().isOk())
@ -112,7 +113,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_CHANNELS) .post(DomainConfig.PATH_CHANNELS)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -141,7 +142,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_CHANNELS) .post(DomainConfig.PATH_CHANNELS)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -167,7 +168,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_CHANNELS) .post(DomainConfig.PATH_CHANNELS)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -192,7 +193,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc.perform( this.mockMvc.perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.options(QueryRestController.PATH_CHANNELS) .options(DomainConfig.PATH_CHANNELS)
.contentType(MediaType.APPLICATION_JSON)) .contentType(MediaType.APPLICATION_JSON))
.andDo(MockMvcResultHandlers.print()) .andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.status().isOk())
@ -209,7 +210,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
// http://localhost:8080/channels // http://localhost:8080/channels
this.mockMvc.perform( this.mockMvc.perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.options(QueryRestController.PATH_CHANNELS) .options(DomainConfig.PATH_CHANNELS)
.header("Origin", "*") .header("Origin", "*")
.header("Access-Control-Request-Method", "POST") .header("Access-Control-Request-Method", "POST")
.contentType(MediaType.APPLICATION_JSON)) .contentType(MediaType.APPLICATION_JSON))
@ -221,7 +222,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
// -v http://localhost:8080/channels // -v http://localhost:8080/channels
this.mockMvc.perform( this.mockMvc.perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.options(QueryRestController.PATH_CHANNELS) .options(DomainConfig.PATH_CHANNELS)
.header("Origin", "http://localhost:8080") .header("Origin", "http://localhost:8080")
.header("Access-Control-Request-Method", "POST") .header("Access-Control-Request-Method", "POST")
.contentType(MediaType.APPLICATION_JSON)) .contentType(MediaType.APPLICATION_JSON))
@ -248,7 +249,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -306,7 +307,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -342,7 +343,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -389,7 +390,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERIES) .post(DomainConfig.PATH_QUERIES)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -428,7 +429,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
} }
@Test @Test
public void testTimeRangeQuery() throws Exception { public void testTimeRangeQuery_01() throws Exception {
DAQQuery request = new DAQQuery( DAQQuery request = new DAQQuery(
new RequestRangeTime( new RequestRangeTime(
TimeUtils.getTimeFromMillis(2000, 0), TimeUtils.getTimeFromMillis(2000, 0),
@ -438,7 +439,53 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
String content = mapper.writeValueAsString(request); String content = mapper.writeValueAsString(request);
this.mockMvc.perform(MockMvcRequestBuilders this.mockMvc.perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(200))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(201))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 10000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").isMap())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(200))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(201))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 10000000)));
}
@Test
public void testTimeRangeQuery_02() throws Exception {
String content =
"{\"channels\":[{\"name\":\"testChannel1\"},{\"name\":\"testChannel2\"}],\"fields\":[],\"ordering\":\"asc\",\"range\":{\"startSeconds\":\"2.0\",\"endSeconds\":\"2.01\"}}";
MvcResult result = this.mockMvc.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
this.mockMvc.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -483,7 +530,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform( .perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content) .content(content)
) )
@ -525,7 +572,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
String content = mapper.writeValueAsString(request); String content = mapper.writeValueAsString(request);
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders.post(QueryRestController.PATH_QUERY) .perform(MockMvcRequestBuilders.post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -573,7 +620,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform( .perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content) .content(content)
) )
@ -625,7 +672,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform( .perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content) .content(content)
) )
@ -693,7 +740,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform( .perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content) .content(content)
) )
@ -809,7 +856,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform( .perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content) .content(content)
) )
@ -870,7 +917,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform( .perform(
MockMvcRequestBuilders MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content) .content(content)
) )
@ -963,7 +1010,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -984,7 +1031,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -1007,7 +1054,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -1029,7 +1076,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))
@ -1051,7 +1098,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
this.mockMvc this.mockMvc
.perform(MockMvcRequestBuilders .perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY) .post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.content(content)) .content(content))

View File

@ -7,11 +7,12 @@ import java.util.stream.Stream;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.StreamEvent; import ch.psi.daq.domain.StreamEvent;
import ch.psi.daq.domain.query.backend.PulseIdRangeQuery; import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.query.backend.TimeRangeQuery;
import ch.psi.daq.domain.query.event.EventQuery; import ch.psi.daq.domain.query.event.EventQuery;
import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.query.range.PulseIdRangeQuery;
import ch.psi.daq.domain.query.range.TimeRangeQuery;
import ch.psi.daq.domain.reader.DataReader; import ch.psi.daq.domain.reader.DataReader;
public class DummyArchiverApplianceReader implements DataReader { public class DummyArchiverApplianceReader implements DataReader {
@ -42,16 +43,30 @@ public class DummyArchiverApplianceReader implements DataReader {
return channelStream; return channelStream;
} }
// @Override
// public Stream<? extends StreamEvent> getEventStream(PulseIdRangeQuery query) {
// return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartPulseId(),
// query.getEndPulseId(),
// query.getEventColumns())
// .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
// }
@Override @Override
public Stream<? extends StreamEvent> getEventStream(PulseIdRangeQuery query) { public Stream<? extends StreamEvent> getEventStream(TimeRangeQuery query) {
return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartMillis() / 10,
query.getEventColumns()) query.getEndMillis() / 10)
.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
} }
@Override @Override
public Stream<? extends StreamEvent> getEventStream(TimeRangeQuery query) { public TimeRangeQuery getTimeRangeQuery(PulseIdRangeQuery query) {
return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10) return new TimeRangeQuery(
.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0),
TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0),
query);
}
@Override
public void truncateCache() {
} }
} }

View File

@ -1,10 +1,9 @@
/**
*
*/
package ch.psi.daq.test.queryrest.query; package ch.psi.daq.test.queryrest.query;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -15,45 +14,54 @@ import java.util.stream.Collectors;
import java.util.stream.LongStream; import java.util.stream.LongStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import ch.psi.bsread.message.Type;
import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.cassandra.reader.CassandraReader;
import ch.psi.daq.cassandra.util.test.CassandraDataGen; import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils; import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.FieldNames; import ch.psi.daq.domain.FieldNames;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.events.ChannelEvent; import ch.psi.daq.domain.events.ChannelEvent;
import ch.psi.daq.domain.events.MetaPulseId; import ch.psi.daq.domain.events.impl.ChannelConfigurationImpl;
import ch.psi.daq.domain.events.impl.ChannelEventImpl; import ch.psi.daq.domain.events.impl.ChannelEventImpl;
import ch.psi.daq.domain.query.backend.PulseIdRangeQuery; import ch.psi.daq.domain.json.channels.info.ChannelInfo;
import ch.psi.daq.domain.query.backend.TimeRangeQuery; import ch.psi.daq.domain.json.channels.info.ChannelInfoImpl;
import ch.psi.daq.domain.query.event.EventQuery; import ch.psi.daq.domain.query.event.EventQuery;
import ch.psi.daq.domain.query.event.StreamEventQuery; import ch.psi.daq.domain.query.event.StreamEventQuery;
import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.query.range.PulseIdRangeQuery;
import ch.psi.daq.domain.query.range.TimeRangeQuery;
import ch.psi.daq.domain.reader.MetaStreamEventQuery; import ch.psi.daq.domain.reader.MetaStreamEventQuery;
import ch.psi.daq.domain.test.backend.TestBackendAccess;
import ch.psi.daq.domain.test.gen.TestDataGen;
import ch.psi.daq.domain.utils.PropertiesUtils; import ch.psi.daq.domain.utils.PropertiesUtils;
public class DummyCassandraReader implements CassandraReader { public class DummyCassandraReader implements CassandraReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class); private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_"; public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_";
private static final Random random = new Random(0); private static final Random random = new Random(0);
private static final int KEYSPACE = 1; private static final int KEYSPACE = 1;
private CassandraDataGen dataGen;
private List<String> channels; private List<String> channels;
private AtomicLong channelNameCallCounter = new AtomicLong(); private AtomicLong channelNameCallCounter = new AtomicLong();
/** private TestDataGen dataGen;
* private Backend backend = Backend.SF_DATABUFFER;
*/
public DummyCassandraReader() {
this.dataGen = new CassandraDataGen();
@Resource(name = DomainConfig.BEAN_NAME_TEST_BACKEND_ACCESS)
private TestBackendAccess testBackendAccess;
public DummyCassandraReader() {
this.channels = Lists.newArrayList( this.channels = Lists.newArrayList(
"BoolScalar", "BoolScalar",
"BoolWaveform", "BoolWaveform",
@ -80,9 +88,14 @@ public class DummyCassandraReader implements CassandraReader {
"StringScalar"); "StringScalar");
} }
@PostConstruct
public void afterPropertiesSet() {
dataGen = testBackendAccess.getTestDataGen(backend);
}
@Override @Override
public Backend getBackend() { public Backend getBackend() {
return Backend.SF_DATABUFFER; return backend;
} }
/** /**
@ -100,12 +113,12 @@ public class DummyCassandraReader implements CassandraReader {
return channelStream; return channelStream;
} }
@Override // @Override
public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) { // public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), // return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(),
query.getEventColumns()) // query.getEventColumns())
.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); // .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
} // }
@Override @Override
public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) { public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
@ -113,10 +126,6 @@ public class DummyCassandraReader implements CassandraReader {
.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
} }
/**
* @{inheritDoc
*/
@Override
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends StreamEventQuery> queryProviders) { public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends StreamEventQuery> queryProviders) {
Stream<ChannelEvent> result = queryProviders.map(ceq -> { Stream<ChannelEvent> result = queryProviders.map(ceq -> {
if (ceq instanceof MetaStreamEventQuery) { if (ceq instanceof MetaStreamEventQuery) {
@ -256,30 +265,26 @@ public class DummyCassandraReader implements CassandraReader {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
/** // /**
* @{inheritDoc // * @{inheritDoc
*/ // */
@Override // @Override
public Stream<? extends StreamEventQuery> getStreamEventQueryStream(PulseIdRangeQuery query) { // public Stream<? extends StreamEventQuery> getStreamEventQueryStream(PulseIdRangeQuery query) {
//
// return dataGen.generateMetaPulseId(
// query.getStartPulseId(),
// (query.getEndPulseId() - query.getStartPulseId() + 1),
// i -> i * 10,
// i -> 0,
// i -> i,
// query.getChannel())
// .stream()
// .map(metaPulse -> {
// metaPulse.setKeyspace(KEYSPACE);
// return metaPulse;
// });
// }
return dataGen.generateMetaPulseId(
query.getStartPulseId(),
(query.getEndPulseId() - query.getStartPulseId() + 1),
i -> i * 10,
i -> 0,
i -> i,
query.getChannel())
.stream()
.map(metaPulse -> {
metaPulse.setKeyspace(KEYSPACE);
return metaPulse;
});
}
/**
* @{inheritDoc
*/
@Override
public Stream<? extends StreamEventQuery> getStreamEventQueryStream(TimeRangeQuery query) { public Stream<? extends StreamEventQuery> getStreamEventQueryStream(TimeRangeQuery query) {
return dataGen.generateMetaTime( return dataGen.generateMetaTime(
@ -293,17 +298,17 @@ public class DummyCassandraReader implements CassandraReader {
query.getChannel()).stream(); query.getChannel()).stream();
} }
/** // /**
* @{inheritDoc // * @{inheritDoc
*/ // */
@Override // @Override
public Stream<MetaPulseId> getMetaStream(PulseIdRangeQuery query) { // public Stream<MetaPulseId> getMetaStream(PulseIdRangeQuery query) {
//
return getStreamEventQueryStream(query).map(r -> { // return getStreamEventQueryStream(query).map(r -> {
return (MetaPulseId) r; // return (MetaPulseId) r;
}); // });
//
} // }
/** /**
@ -324,8 +329,50 @@ public class DummyCassandraReader implements CassandraReader {
@Override @Override
public Stream<ChannelConfiguration> getChannelConfiguration(TimeRangeQuery query) { public Stream<ChannelConfiguration> getChannelConfiguration(TimeRangeQuery query) {
// implement when needed List<ChannelConfiguration> configs = new ArrayList<>();
throw new UnsupportedOperationException();
BigDecimal time = query.getStartTime();
configs.add(
new ChannelConfigurationImpl(
query.getChannel(),
time,
TimeUtils.getMillis(time) / 10,
0,
Type.Int32.getKey(),
new int[] {1},
false,
ChannelConfiguration.DEFAULT_LOCAL_WRITE,
ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS,
ChannelConfiguration.SPLIT_COUNT,
ChannelConfiguration.DEFAULT_SOURCE,
ChannelConfiguration.DEFAULT_MODULO,
ChannelConfiguration.DEFAULT_OFFSET,
Backend.SF_DATABUFFER));
if (query.getEndMillis() > query.getStartMillis()) {
time = query.getEndTime();
configs.add(
new ChannelConfigurationImpl(
query.getChannel(),
time,
TimeUtils.getMillis(time) / 10,
1,
Type.Int32.getKey(),
new int[] {1},
false,
ChannelConfiguration.DEFAULT_LOCAL_WRITE,
ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS,
ChannelConfiguration.SPLIT_COUNT,
ChannelConfiguration.DEFAULT_SOURCE,
ChannelConfiguration.DEFAULT_MODULO,
ChannelConfiguration.DEFAULT_OFFSET,
Backend.SF_DATABUFFER));
}
if(Ordering.desc.equals(query.getOrdering())){
Collections.reverse(configs);
}
return configs.stream();
} }
@Override @Override
@ -361,4 +408,22 @@ public class DummyCassandraReader implements CassandraReader {
// implement when needed // implement when needed
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public TimeRangeQuery getTimeRangeQuery(PulseIdRangeQuery query) {
return new TimeRangeQuery(
TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0),
TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0),
query);
}
@Override
public Stream<? extends ChannelInfo> getChannelInfoStream(TimeRangeQuery query) {
return getChannelConfiguration(query)
.map(channelConfiguration -> new ChannelInfoImpl(channelConfiguration));
}
@Override
public void truncateCache() {
}
} }

View File

@ -1,22 +0,0 @@
package ch.psi.daq.test.queryrest.status;
import java.util.concurrent.CompletableFuture;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.status.channel.ChannelStatus;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.status.AbstractStatusReader;
public class DummyArchiverApplianceStatusReader extends AbstractStatusReader {
public DummyArchiverApplianceStatusReader() {
super(Backend.SF_ARCHIVERAPPLIANCE, 30);
}
@Override
public CompletableFuture<ChannelStatus> getChannelStatusAsync(String channel) {
return CompletableFuture.completedFuture(new ChannelStatus(new ChannelName(channel, getBackend()), true, true,
TimeUtils.getTimeFromMillis(1467638000000L, 0)));
}
}

View File

@ -1,22 +0,0 @@
package ch.psi.daq.test.queryrest.status;
import java.util.concurrent.CompletableFuture;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.json.status.channel.ChannelStatus;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.status.AbstractStatusReader;
public class DummyCassandraStatusReader extends AbstractStatusReader {
public DummyCassandraStatusReader() {
super(Backend.SF_DATABUFFER, 30);
}
@Override
public CompletableFuture<ChannelStatus> getChannelStatusAsync(String channel) {
return CompletableFuture.completedFuture(new ChannelStatus(new ChannelName(channel, getBackend()), false, false,
TimeUtils.getTimeFromMillis(0, 0)));
}
}