diff --git a/Readme.md b/Readme.md index 39710d7..c420bc2 100644 --- a/Readme.md +++ b/Readme.md @@ -931,3 +931,69 @@ Illustration of array index aggregation with additional with binning (several nr ``` + + +## Query Channel Status + +It is possible to retieve channel specific status information. + +### Request + +``` +POST http://:/status/channels +``` + +#### Data + +```json +{ + "channels":[ + "Channel_02", + "Channel_04" + ] +} +``` + +##### Explanation + +- **channels**: Array of channels to be queried (see [here](Readme.md#query_channel_names) and [here](Readme.md#define_channel_names)). + +### Example + +#### Command + +```bash +curl -H "Content-Type: application/json" -X POST -d '{"channels": ["Channel_02","Channel_04"]}' http://data-api.psi.ch/sf/status/channels | python -m json.tool +``` + +#### Response + +```json +[ + { + "channel":{ + "name":"Channel_02", + "backend":"sf-databuffer" + }, + "recording":true, + "connected":true, + "lastEventDate":"2016-07-06T09:16:19.607242575+02:00" + }, + { + "channel":{ + "name":"Channel_04", + "backend":"sf-archiverappliance" + }, + "recording":false, + "connected":false, + "lastEventDate":"2016-07-06T04:16:14.000000000+02:00" + } +] +``` + +##### Explanation + +- **channel**: The name and backend of the channel. +- **recording**: Defines if the channel is still recorded (please note that for beam synchronous DAQ this means that the source/IOC providing the channel is still recorded). +- **connected**: Defines if the channel is still connected (please note that for beam synchronous DAQ this means that the source/IOC providing the channel is still connected). +- **lastEventDate**: The timestamp of the last received event from the channel in the ISO8601 format. diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 4ed4607..fd2dcf0 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.PropertySource; import org.springframework.core.env.Environment; import org.springframework.http.converter.HttpMessageConverter; @@ -56,6 +57,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { private static final String QUERYREST_DEFAULT_RESPONSE_FIELDS = "queryrest.default.response.fields"; private static final String QUERYREST_CORS_ALLOWEDORIGINS = "queryrest.cors.allowedorigins"; private static final String QUERYREST_CORS_FORCEALLHEADERS = "queryrest.cors.forceallheaders"; + private static final String QUERYREST_READTIMOUT = "queryrest.readtimeout"; // a nested configuration // this guarantees that the ordering of the properties file is as expected @@ -72,6 +74,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { public static final String BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS = "defaultResponseAggregations"; public static final String BEAN_NAME_CORS_ALLOWEDORIGINS = "corsAllowedorigins"; public static final String BEAN_NAME_CORS_FORCEALLHEADERS = "corsForceallheaders"; + public static final String BEAN_NAME_READ_TIMOUT = "queryRestReadTimout"; @Resource private Environment env; @@ -191,4 +194,12 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter { LOGGER.debug("Load '{}={}'", QUERYREST_CORS_FORCEALLHEADERS, value); return value; } + + @Bean(name = BEAN_NAME_READ_TIMOUT) + @Lazy + public Integer readTimeout() { + Integer value = env.getProperty(QUERYREST_READTIMOUT, Integer.class, 10); + LOGGER.debug("Load '{}={}'", QUERYREST_READTIMOUT, value); + return value; + } } diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index 8706227..ef45079 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -1,9 +1,16 @@ package ch.psi.daq.queryrest.controller; -import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; import javax.validation.Valid; @@ -23,15 +30,13 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; - -import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer; +import ch.psi.daq.cassandra.config.CassandraConfig; import ch.psi.daq.common.ordering.Ordering; import ch.psi.daq.domain.config.DomainConfig; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.json.status.channel.ChannelStatus; import ch.psi.daq.domain.query.DAQQueries; import ch.psi.daq.domain.query.DAQQuery; -import ch.psi.daq.domain.query.DAQQueryElement; import ch.psi.daq.domain.query.channels.ChannelsRequest; import ch.psi.daq.domain.query.channels.ChannelsResponse; import ch.psi.daq.domain.query.operation.Aggregation; @@ -39,13 +44,24 @@ import ch.psi.daq.domain.query.operation.AggregationType; import ch.psi.daq.domain.query.operation.QueryField; import ch.psi.daq.domain.query.operation.Response; import ch.psi.daq.domain.query.operation.ResponseFormat; +import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery; import ch.psi.daq.domain.reader.Backend; import ch.psi.daq.domain.request.validate.RequestProviderValidator; +import ch.psi.daq.domain.status.StatusReader; +import ch.psi.daq.query.analyzer.QueryAnalyzer; +import ch.psi.daq.query.config.QueryConfig; +import ch.psi.daq.query.model.Query; +import ch.psi.daq.query.processor.ChannelNameCache; +import ch.psi.daq.query.processor.QueryProcessor; +import ch.psi.daq.queryrest.config.QueryRestConfig; import ch.psi.daq.queryrest.query.QueryManager; import ch.psi.daq.queryrest.response.AbstractHTTPResponse; import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn; import ch.psi.daq.queryrest.response.json.JSONHTTPResponse; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; + @RestController public class QueryRestController { @@ -54,6 +70,7 @@ public class QueryRestController { public static final String PATH_CHANNELS = DomainConfig.PATH_CHANNELS; public static final String PATH_QUERY = DomainConfig.PATH_QUERY; public static final String PATH_QUERIES = DomainConfig.PATH_QUERIES; + public static final String PATH_STATUS_CHANNELS = DomainConfig.PATH_STATUS_CHANNELS; @Resource private ApplicationContext appContext; @@ -70,6 +87,92 @@ public class QueryRestController { private Response defaultResponse = new JSONHTTPResponse(); + @Resource + private Function queryAnalizerFactory; + + @Resource(name = QueryRestConfig.BEAN_NAME_READ_TIMOUT) + private Integer readTimeout; + + private Map queryProcessors = new LinkedHashMap<>(); + private ChannelNameCache channelNameCache; + + private Map statusReaders = new LinkedHashMap<>(); + + @PostConstruct + public void afterPropertiesSet() { + List exceptions = new ArrayList<>(); + + try { + QueryProcessor queryProcessor = + appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_QUERY_PROCESSOR, QueryProcessor.class); + queryProcessors.put(queryProcessor.getBackend(), queryProcessor); + } catch (Exception e) { + exceptions.add(e); + LOGGER.warn(""); + LOGGER.warn("##########"); + LOGGER.warn("Could not load query processor for cassandra."); + LOGGER.warn("##########"); + LOGGER.warn(""); + } + + try { + QueryProcessor queryProcessor = + appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_QUERY_PROCESSOR, QueryProcessor.class); + queryProcessors.put(queryProcessor.getBackend(), queryProcessor); + } catch (Exception e) { + exceptions.add(e); + LOGGER.warn(""); + LOGGER.warn("##########"); + LOGGER.warn("Could not load query processor for archiverappliance."); + LOGGER.warn("##########"); + LOGGER.warn(""); + } + + if (queryProcessors.isEmpty()) { + LOGGER.error("No query processor could be loaded! Exceptions were: "); + for (Exception exception : exceptions) { + LOGGER.error("", exception); + } + + throw new RuntimeException("No Backends available!"); + } + + try { + StatusReader statusReader = + appContext.getBean(QueryConfig.BEAN_NAME_CASSANDRA_STATUS_READER, StatusReader.class); + statusReaders.put(statusReader.getBackend(), statusReader); + } catch (Exception e) { + exceptions.add(e); + LOGGER.warn(""); + LOGGER.warn("##########"); + LOGGER.warn("Could not load status reader for cassandra."); + LOGGER.warn("##########"); + LOGGER.warn(""); + } + + try { + StatusReader statusReader = + appContext.getBean(QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_STATUS_READER, StatusReader.class); + statusReaders.put(statusReader.getBackend(), statusReader); + } catch (Exception e) { + exceptions.add(e); + LOGGER.warn(""); + LOGGER.warn("##########"); + LOGGER.warn("Could not load status reader for archiverappliance."); + LOGGER.warn("##########"); + LOGGER.warn(""); + } + + channelNameCache = + new ChannelNameCache(queryProcessors, appContext.getBean(CassandraConfig.BEAN_NAME_READ_TIMEOUT, + Integer.class).longValue()); + } + + @PreDestroy + public void destroy() { + channelNameCache.destroy(); + } + @InitBinder protected void initBinder(WebDataBinder binder) { if (binder.getTarget() != null) { @@ -103,6 +206,46 @@ public class QueryRestController { return getChannels(new ChannelsRequest(channelName)); } + /** + * Queries for channels status + * + * @param query the {@link ChannelStatusQuery} + * @param res the {@link HttpServletResponse} instance associated with this request + * @throws IOException thrown if writing to the output stream fails + */ + @RequestMapping( + value = PATH_STATUS_CHANNELS, + method = RequestMethod.POST, + consumes = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody Collection executeChannelStatusQuery(@RequestBody ChannelStatusQuery query, + HttpServletResponse res) throws Exception { + + // set backends if not defined yet + channelNameCache.setBackends(query.getChannels()); + + List> futures = new ArrayList<>(query.getChannels().size()); + for (ChannelName channel : query.getChannels()) { + StatusReader statusReader = statusReaders.get(channel.getBackend()); + if (statusReader != null) { + futures.add(statusReader.getChannelStatusAsync(channel.getName())); + } else { + LOGGER.warn("There is no StatusReader available for '{}'.", channel.getBackend()); + } + } + + try { + List retStatus = new ArrayList<>(futures.size()); + for (CompletableFuture completableFuture : futures) { + retStatus.add(completableFuture.get(readTimeout, TimeUnit.SECONDS)); + } + return retStatus; + } catch (Exception e) { + String message = "Could not extract ChannelStatus within timelimits."; + LOGGER.error(message, e); + throw new IllegalStateException(message, e); + } + } + /** * Accepts the properties for the {@link DAQQuery} instance as stringified JSON query string. * diff --git a/src/main/resources/queryrest.properties b/src/main/resources/queryrest.properties index 5fbaf69..baaec0b 100644 --- a/src/main/resources/queryrest.properties +++ b/src/main/resources/queryrest.properties @@ -16,3 +16,6 @@ queryrest.cors.forceallheaders=false # If this is set to '*', then all requests are allowed, from any source. If it's set to, say, http://ui-data-api.psi.ch, then only requests # originating from that domain (Origin header set to that value) will be allowed. Otherwise a 403 error will be returned. queryrest.cors.allowedorigins=* + +# Timeout for read operations in seconds +queryrest.readtimeout=30 diff --git a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java index 6404193..cb512cd 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java @@ -12,12 +12,15 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupp import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.domain.reader.DataReader; +import ch.psi.daq.domain.status.StatusReader; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.processor.QueryProcessor; import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.test.query.config.LocalQueryTestConfig; import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader; import ch.psi.daq.test.queryrest.query.DummyCassandraReader; +import ch.psi.daq.test.queryrest.status.DummyArchiverApplianceStatusReader; +import ch.psi.daq.test.queryrest.status.DummyCassandraStatusReader; @Configuration @ComponentScan @@ -50,4 +53,16 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { public DataReader archiverApplianceReader() { return new DummyArchiverApplianceReader(); } + + @Bean(name = QueryConfig.BEAN_NAME_CASSANDRA_STATUS_READER) + @Lazy + public StatusReader cassandraStatusReader() { + return new DummyCassandraStatusReader(); + } + + @Bean(name = QueryConfig.BEAN_NAME_ARCHIVER_APPLIANCE_STATUS_READER) + @Lazy + public StatusReader archiverApplianceStatusReader() { + return new DummyArchiverApplianceStatusReader(); + } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelStatusTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelStatusTest.java new file mode 100644 index 0000000..82ff11f --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelStatusTest.java @@ -0,0 +1,122 @@ +package ch.psi.daq.test.queryrest.controller; + +import static org.junit.Assert.assertFalse; + +import org.junit.After; +import org.junit.Test; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.result.MockMvcResultHandlers; +import org.springframework.test.web.servlet.result.MockMvcResultMatchers; + +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.query.status.channel.ChannelStatusQuery; +import ch.psi.daq.domain.reader.Backend; +import ch.psi.daq.queryrest.controller.QueryRestController; +import ch.psi.daq.test.queryrest.AbstractDaqRestTest; + +/** + * Tests the {@link DaqController} implementation. + */ +public class QueryRestControllerChannelStatusTest extends AbstractDaqRestTest { + + @After + public void tearDown() throws Exception {} + + @Test + public void testChannelStatusQuery_01() throws Exception { + ChannelStatusQuery query = new ChannelStatusQuery("DataBuffer1", "DataBuffer2"); + + String content = mapper.writeValueAsString(query); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.PATH_STATUS_CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value("DataBuffer1")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].recording").value(false)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].connected").value(false)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].lastEventDate").value( + TimeUtils.format(TimeUtils.getTimeFromMillis(0, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value("DataBuffer2")) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].recording").value(false)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].connected").value(false)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].lastEventDate").value( + TimeUtils.format(TimeUtils.getTimeFromMillis(0, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist()); + } + + @Test + public void testChannelStatusQuery_02() throws Exception { + ChannelStatusQuery query = new ChannelStatusQuery(); + query.addChannel(new ChannelName("ArchiverAppliance1", Backend.SF_ARCHIVERAPPLIANCE)); + query.addChannel(new ChannelName("ArchiverAppliance2", Backend.SF_ARCHIVERAPPLIANCE)); + + String content = mapper.writeValueAsString(query); + System.out.println(content); + + this.mockMvc + .perform(MockMvcRequestBuilders + .post(QueryRestController.PATH_STATUS_CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + + .andExpect(MockMvcResultMatchers.status().isOk()) + .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value("ArchiverAppliance1")) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].recording").value(true)) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].connected").value(true)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[0].lastEventDate").value( + TimeUtils.format(TimeUtils.getTimeFromMillis(1467638000000L, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value("ArchiverAppliance2")) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].recording").value(true)) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].connected").value(true)) + .andExpect( + MockMvcResultMatchers.jsonPath("$[1].lastEventDate").value( + TimeUtils.format(TimeUtils.getTimeFromMillis(1467638000000L, 0)))) + .andExpect(MockMvcResultMatchers.jsonPath("$[2]").doesNotExist()); + } + + @Test + public void testChannelStatusQuery_03() throws Exception { + ChannelStatusQuery query = new ChannelStatusQuery(); + query.addChannel(new ChannelName("ArchiverAppliance1", Backend.SF_ARCHIVERAPPLIANCE)); + query.addChannel(new ChannelName("ArchiverAppliance2", Backend.SF_ARCHIVERAPPLIANCE)); + + String content = mapper.writeValueAsString(query); + System.out.println(content); + + MvcResult result = this.mockMvc.perform(MockMvcRequestBuilders + .post(QueryRestController.PATH_STATUS_CHANNELS) + .contentType(MediaType.APPLICATION_JSON) + .content(content)) + .andDo(MockMvcResultHandlers.print()) + .andExpect(MockMvcResultMatchers.status().isOk()) + .andReturn(); + + String response = result.getResponse().getContentAsString(); + System.out.println("Response: " + response); + + assertFalse(response.contains("lastEventTime")); + } +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/status/DummyArchiverApplianceStatusReader.java b/src/test/java/ch/psi/daq/test/queryrest/status/DummyArchiverApplianceStatusReader.java new file mode 100644 index 0000000..bdf600b --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/status/DummyArchiverApplianceStatusReader.java @@ -0,0 +1,22 @@ +package ch.psi.daq.test.queryrest.status; + +import java.util.concurrent.CompletableFuture; + +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.json.status.channel.ChannelStatus; +import ch.psi.daq.domain.reader.Backend; +import ch.psi.daq.domain.status.AbstractStatusReader; + +public class DummyArchiverApplianceStatusReader extends AbstractStatusReader { + + public DummyArchiverApplianceStatusReader() { + super(Backend.SF_ARCHIVERAPPLIANCE, 30); + } + + @Override + public CompletableFuture getChannelStatusAsync(String channel) { + return CompletableFuture.completedFuture(new ChannelStatus(new ChannelName(channel, getBackend()), true, true, + TimeUtils.getTimeFromMillis(1467638000000L, 0))); + } +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/status/DummyCassandraStatusReader.java b/src/test/java/ch/psi/daq/test/queryrest/status/DummyCassandraStatusReader.java new file mode 100644 index 0000000..6d2cd3a --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/status/DummyCassandraStatusReader.java @@ -0,0 +1,22 @@ +package ch.psi.daq.test.queryrest.status; + +import java.util.concurrent.CompletableFuture; + +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.json.status.channel.ChannelStatus; +import ch.psi.daq.domain.reader.Backend; +import ch.psi.daq.domain.status.AbstractStatusReader; + +public class DummyCassandraStatusReader extends AbstractStatusReader { + + public DummyCassandraStatusReader() { + super(Backend.SF_DATABUFFER, 30); + } + + @Override + public CompletableFuture getChannelStatusAsync(String channel) { + return CompletableFuture.completedFuture(new ChannelStatus(new ChannelName(channel, getBackend()), false, false, + TimeUtils.getTimeFromMillis(0, 0))); + } +}