From 596e2617a8a4ff3bd48e73a81d0cb3eb10ef81d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Tue, 5 Jul 2016 09:40:14 +0200 Subject: [PATCH 1/2] ATEST-344 --- .../daq/queryrest/config/QueryRestConfig.java | 11 ++ .../controller/QueryRestController.java | 78 +++++++++++ src/main/resources/queryrest.properties | 3 + .../daq/test/queryrest/DaqWebMvcConfig.java | 15 +++ .../QueryRestControllerChannelStatusTest.java | 122 ++++++++++++++++++ .../DummyArchiverApplianceStatusReader.java | 22 ++++ .../status/DummyCassandraStatusReader.java | 22 ++++ 7 files changed, 273 insertions(+) create mode 100644 src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelStatusTest.java create mode 100644 src/test/java/ch/psi/daq/test/queryrest/status/DummyArchiverApplianceStatusReader.java create mode 100644 src/test/java/ch/psi/daq/test/queryrest/status/DummyCassandraStatusReader.java diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 5cb7a0f..7b160a6 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.PropertySource; import org.springframework.core.env.Environment; import org.springframework.http.converter.HttpMessageConverter; @@ -51,6 +52,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { private static final String QUERYREST_DEFAULT_RESPONSE_FIELDS = "queryrest.default.response.fields"; private static final String QUERYREST_CORS_ALLOWEDORIGINS = "queryrest.cors.allowedorigins"; private static final String QUERYREST_CORS_FORCEALLHEADERS = "queryrest.cors.forceallheaders"; + private static final String QUERYREST_READTIMOUT = "queryrest.readtimeout"; // a nested configuration // this guarantees that the ordering of the properties file is as expected @@ -67,6 +69,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { public static final String BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS = "defaultResponseAggregations"; public static final String BEAN_NAME_CORS_ALLOWEDORIGINS = "corsAllowedorigins"; public static final String BEAN_NAME_CORS_FORCEALLHEADERS = "corsForceallheaders"; + public static final String BEAN_NAME_READ_TIMOUT = "queryRestReadTimout"; @Resource private Environment env; @@ -178,4 +181,12 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { LOGGER.debug("Load '{}={}'", QUERYREST_CORS_FORCEALLHEADERS, value); return value; } + + @Bean(name = BEAN_NAME_READ_TIMOUT) + @Lazy + public Integer readTimeout() { + Integer value = env.getProperty(QUERYREST_READTIMOUT, Integer.class, 10); + LOGGER.debug("Load '{}={}'", QUERYREST_READTIMOUT, value); + return value; + } } diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 422a1ff..30d4956 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -7,6 +7,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -42,6 +44,7 @@ import ch.psi.daq.domain.DataEvent; import ch.psi.daq.domain.FieldNames; import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.json.status.channel.ChannelStatus; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQuery; import ch.psi.daq.domain.query.DAQQueryElement; @@ -51,14 +54,17 @@ import ch.psi.daq.domain.query.operation.Aggregation; import ch.psi.daq.domain.query.operation.AggregationType; import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.ResponseFormat; +import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery; import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.request.validate.RequestProviderValidator; +import ch.psi.daq.domain.status.StatusReader; import ch.psi.daq.query.analyzer.QueryAnalyzer; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.model.Query; import ch.psi.daq.query.model.impl.BackendQuery; import ch.psi.daq.query.processor.ChannelNameCache; import ch.psi.daq.query.processor.QueryProcessor; +import ch.psi.daq.queryrest.config.QueryRestConfig; import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter; import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter; @@ -73,6 +79,7 @@ public class QueryRestController { public static final String PATH_CHANNELS = DomainConfig.PATH_CHANNELS; public static final String PATH_QUERY = DomainConfig.PATH_QUERY; public static final String PATH_QUERIES = DomainConfig.PATH_QUERIES; + public static final String PATH_STATUS_CHANNELS = DomainConfig.PATH_STATUS_CHANNELS; @Resource private Validator queryValidator; @@ -93,9 +100,14 @@ public class QueryRestController { @Resource private Function queryAnalizerFactory; + @Resource(name = QueryRestConfig.BEAN_NAME_READ_TIMOUT) + private Integer readTimeout; + private Map queryProcessors = new LinkedHashMap<>(); private ChannelNameCache channelNameCache; + private Map statusReaders = new LinkedHashMap<>(); + @PostConstruct public void afterPropertiesSet() { List exceptions = new ArrayList<>(); @@ -135,6 +147,32 @@ public class QueryRestController { throw new RuntimeException("No Backends available!"); } + try { + StatusReader statusReader = + appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_STATUS_READER, StatusReader.class); + statusReaders.put(statusReader.getBackend(), statusReader); + } catch (Exception e) { + exceptions.add(e); + LOGGER.warn(""); + LOGGER.warn("##########"); + LOGGER.warn("Could not load status reader for cassandra."); + LOGGER.warn("##########"); + LOGGER.warn(""); + } + + try { + StatusReader statusReader = + appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_STATUS_READER, StatusReader.class); + statusReaders.put(statusReader.getBackend(), statusReader); + } catch (Exception e) { + exceptions.add(e); + LOGGER.warn(""); + LOGGER.warn("##########"); + LOGGER.warn("Could not load status reader for archiverappliance."); + LOGGER.warn("##########"); + LOGGER.warn(""); + } + channelNameCache = new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT, Integer.class).longValue()); @@ -183,6 +221,46 @@ public class QueryRestController { return getChannels(new ChannelsRequest(channelName)); } + /** + * Queries for channels status + * + * 
@param query the {@link ChannelStatusQuery} + * @param res the {@link HttpServletResponse} instance associated with this request + * @throws IOException thrown if writing to the output stream fails + */ + @RequestMapping( + value = PATH_STATUS_CHANNELS, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody Collection executeChannelStatusQuery(@RequestBody ChannelStatusQuery query, + HttpServletResponse res) throws Exception { + + // set backends if not defined yet + channelNameCache.setBackends(query.getChannels()); + + List> futures = new ArrayList<>(query.getChannels().size()); + for (ChannelName channel : query.getChannels()) { + StatusReader statusReader = statusReaders.get(channel.getBackend()); + if (statusReader != null) { + futures.add(statusReader.getChannelStatusAsync(channel.getName())); + } else { + LOGGER.warn("There is no StatusReader available for '{}'.", channel.getBackend()); + } + } + + try { + List retStatus = new ArrayList<>(futures.size()); + for (CompletableFuture completableFuture : futures) { + retStatus.add(completableFuture.get(readTimeout, TimeUnit.SECONDS)); + } + return retStatus; + } catch (Exception e) { + String message = "Could not extract ChannelStatus within timelimits."; + LOGGER.error(message, e); + throw new IllegalStateException(message, e); + } + } + /** * Accepts the properties for the {@link DAQQuery} instance as stringified JSON query string. * diff --git a/src/main/resources/queryrest.properties b/src/main/resources/queryrest.properties index 5fbaf69..baaec0b 100644 --- a/src/main/resources/queryrest.properties +++ b/src/main/resources/queryrest.properties @@ -16,3 +16,6 @@ queryrest.cors.forceallheaders=false # If this is set to '*', then all requests are allowed, from any source. If it's set to, say, http://ui-data-api.psi.ch, then only requests # originating from that domain (Origin header set to that value) will be allowed. Otherwise a 403 error will be returned. 
queryrest.cors.allowedorigins=* + +# Timeout for read operations in seconds +queryrest.readtimeout=30 diff --git a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java index 6404193..cb512cd 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java @@ -12,12 +12,15 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupp import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.domain.reader.DataReader; +import ch.psi.daq.domain.status.StatusReader; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.test.query.config.LocalQueryTestConfig; import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader; import ch.psi.daq.test.queryrest.query.DummyCassandraReader; +import ch.psi.daq.test.queryrest.status.DummyArchiverApplianceStatusReader; +import ch.psi.daq.test.queryrest.status.DummyCassandraStatusReader; @Configuration @ComponentScan @@ -50,4 +53,16 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { public DataReader archiverApplianceReader() { return new DummyArchiverApplianceReader(); } + + @Bean(name = QueryConfig.BEAN_NAME_CASSANDRA_STATUS_READER) + @Lazy + public StatusReader cassandraStatusReader() { + return new DummyCassandraStatusReader(); + } + + @Bean(name = QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_STATUS_READER) + @Lazy + public StatusReader archiverApplianceStatusReader() { + return new DummyArchiverApplianceStatusReader(); + } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelStatusTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelStatusTest.java new file mode 100644 index 0000000..82ff11f --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelStatusTest.java @@ -0,0 +1,122 @@ +package ch.psi.daq.test.queryrest.controller; + +import static org.junit.Assert.assertFalse; + +import org.junit.After; +import org.junit.Test; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.result.MockMvcResultHandlers; +import org.springframework.test.web.servlet.result.MockMvcResultMatchers; + +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery; +import ch.psi.daq.domain.reader.Backend; +import ch.psi.daq.queryrest.controller.QueryRestController; +import ch.psi.daq.test.queryrest.AbstractDaqRestTest; + +/** + * Tests the {@link DaqController} implementation. 
+ */ +public class QueryRestControllerChannelStatusTest extends AbstractDaqRestTest { + + @After + public void tearDown() throws Exception {} + + @Test + public void testChannelStatusQuery_01() throws Exception { + ChannelStatusQuery query = new ChannelStatusQuery("DataBuffer1", "DataBuffer2"); + + String content = mapper.writeValueAsString(query); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.PATH_STATUS_CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value("DataBuffer1")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].recording").value(false)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].connected").value(false)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].lastEventDate").value( + TimeUtils.format(TimeUtils.getTimeFromMillis(0, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value("DataBuffer2")) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].recording").value(false)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].connected").value(false)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].lastEventDate").value( + TimeUtils.format(TimeUtils.getTimeFromMillis(0, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist()); + } + + @Test + public void testChannelStatusQuery_02() throws Exception { + ChannelStatusQuery query = new ChannelStatusQuery(); + query.addChannel(new ChannelName("ArchiverAppliance1", Backend.SF_ARCHIVERAPPLIANCE)); + query.addChannel(new ChannelName("ArchiverAppliance2", Backend.SF_ARCHIVERAPPLIANCE)); + + String content = mapper.writeValueAsString(query); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.PATH_STATUS_CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value("ArchiverAppliance1")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].recording").value(true)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].connected").value(true)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].lastEventDate").value( + TimeUtils.format(TimeUtils.getTimeFromMillis(1467638000000L, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value("ArchiverAppliance2")) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].recording").value(true)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].connected").value(true)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].lastEventDate").value( + 
TimeUtils.format(TimeUtils.getTimeFromMillis(1467638000000L, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist()); + } + + @Test + public void testChannelStatusQuery_03() throws Exception { + ChannelStatusQuery query = new ChannelStatusQuery(); + query.addChannel(new ChannelName("ArchiverAppliance1", Backend.SF_ARCHIVERAPPLIANCE)); + query.addChannel(new ChannelName("ArchiverAppliance2", Backend.SF_ARCHIVERAPPLIANCE)); + + String content = mapper.writeValueAsString(query); + System.out.println(content); + + MvcResult result = this.mockMvc.perform(MockMvcRequestBuilders + .post(QueryRestController.PATH_STATUS_CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + + assertFalse(response.contains("lastEventTime")); + } +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/status/DummyArchiverApplianceStatusReader.java b/src/test/java/ch/psi/daq/test/queryrest/status/DummyArchiverApplianceStatusReader.java new file mode 100644 index 0000000..bdf600b --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/status/DummyArchiverApplianceStatusReader.java @@ -0,0 +1,22 @@ +package ch.psi.daq.test.queryrest.status; + +import java.util.concurrent.CompletableFuture; + +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.json.status.channel.ChannelStatus; +import ch.psi.daq.domain.reader.Backend; +import ch.psi.daq.domain.status.AbstractStatusReader; + +public class DummyArchiverApplianceStatusReader extends AbstractStatusReader { + + public DummyArchiverApplianceStatusReader() { + super(Backend.SF_ARCHIVERAPPLIANCE, 30); + } + + @Override + public CompletableFuture getChannelStatusAsync(String channel) { + return CompletableFuture.completedFuture(new ChannelStatus(new ChannelName(channel, getBackend()), true, true, + TimeUtils.getTimeFromMillis(1467638000000L, 0))); + } +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/status/DummyCassandraStatusReader.java b/src/test/java/ch/psi/daq/test/queryrest/status/DummyCassandraStatusReader.java new file mode 100644 index 0000000..6d2cd3a --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/status/DummyCassandraStatusReader.java @@ -0,0 +1,22 @@ +package ch.psi.daq.test.queryrest.status; + +import java.util.concurrent.CompletableFuture; + +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.json.status.channel.ChannelStatus; +import ch.psi.daq.domain.reader.Backend; +import ch.psi.daq.domain.status.AbstractStatusReader; + +public class DummyCassandraStatusReader extends AbstractStatusReader { + + public DummyCassandraStatusReader() { + super(Backend.SF_DATABUFFER, 30); + } + + @Override + public CompletableFuture getChannelStatusAsync(String channel) { + return CompletableFuture.completedFuture(new ChannelStatus(new ChannelName(channel, getBackend()), false, false, + TimeUtils.getTimeFromMillis(0, 0))); + } +} From ee43d775eab4b4957243351ecbfc42c76859e125 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Wed, 6 Jul 2016 09:37:51 +0200 Subject: [PATCH 2/2] Explanation of channel status info. 
---
 Readme.md | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)

diff --git a/Readme.md b/Readme.md
index 1783c95..0cbe03f 100644
--- a/Readme.md
+++ b/Readme.md
@@ -906,3 +906,69 @@ curl -H "Content-Type: application/json" -X POST -d '{"aggregationType":"extrem
 ]
 ```
 
+
+
+## Query Channel Status
+
+It is possible to retrieve channel-specific status information.
+
+### Request
+
+```
+POST http://<host>:<port>/status/channels
+```
+
+#### Data
+
+```json
+{
+   "channels":[
+      "Channel_02",
+      "Channel_04"
+   ]
+}
+```
+
+##### Explanation
+
+- **channels**: Array of channels to be queried (see [here](Readme.md#query_channel_names) and [here](Readme.md#define_channel_names)).
+
+### Example
+
+#### Command
+
+```bash
+curl -H "Content-Type: application/json" -X POST -d '{"channels": ["Channel_02","Channel_04"]}' http://data-api.psi.ch/sf/status/channels | python -m json.tool
+```
+
+#### Response
+
+```json
+[
+   {
+      "channel":{
+         "name":"Channel_02",
+         "backend":"sf-databuffer"
+      },
+      "recording":true,
+      "connected":true,
+      "lastEventDate":"2016-07-06T09:16:19.607242575+02:00"
+   },
+   {
+      "channel":{
+         "name":"Channel_04",
+         "backend":"sf-archiverappliance"
+      },
+      "recording":false,
+      "connected":false,
+      "lastEventDate":"2016-07-06T04:16:14.000000000+02:00"
+   }
+]
+```
+
+##### Explanation
+
+- **channel**: The name and backend of the channel.
+- **recording**: Indicates whether the channel is still being recorded (please note that for beam synchronous DAQ this means that the source/IOC providing the channel is still being recorded).
+- **connected**: Indicates whether the channel is still connected (please note that for beam synchronous DAQ this means that the source/IOC providing the channel is still connected).
+- **lastEventDate**: The timestamp of the last event received from the channel, in ISO 8601 format.
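The status endpoint can also be called from client code. Below is a minimal Java sketch (the class name `ChannelStatusQueryExample` is arbitrary; it relies only on the JDK's `java.net.HttpURLConnection` and reuses the host and payload from the curl example above, which should be adjusted to the target installation):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ChannelStatusQueryExample {

    public static void main(String[] args) throws Exception {
        // Same payload as the curl example: query the status of two channels.
        String body = "{\"channels\": [\"Channel_02\", \"Channel_04\"]}";

        HttpURLConnection connection =
                (HttpURLConnection) new URL("http://data-api.psi.ch/sf/status/channels").openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setDoOutput(true);

        // Write the JSON request body.
        try (OutputStream out = connection.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // Print the JSON array of channel status objects returned by the server.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```

In a real client the returned array would typically be mapped onto status objects with a JSON library such as Jackson instead of being printed line by line.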