diff --git a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java index 46339a3..2868334 100644 --- a/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java +++ b/src/main/java/ch/psi/daq/queryrest/config/QueryRestConfig.java @@ -1,5 +1,6 @@ package ch.psi.daq.queryrest.config; +import java.util.ArrayList; import java.util.Arrays; import java.util.EnumMap; import java.util.LinkedHashSet; @@ -27,6 +28,7 @@ import org.springframework.validation.Validator; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; @@ -36,6 +38,7 @@ import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.config.DomainConfigCORS; import ch.psi.daq.domain.events.ChannelConfiguration; +import ch.psi.daq.domain.json.ServerAddress; import ch.psi.daq.domain.json.mixin.HistoricChannelConfigurationPropertyFilterMixin; import ch.psi.daq.domain.json.mixin.PropertyFilterMixin; import ch.psi.daq.domain.query.backend.BackendQuery; @@ -112,6 +115,9 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { public static final String BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION = "formatterHistoricChannelConfiguration"; public static final String BEAN_NAME_FORMATTER_RAW_EVENT = "formatterRawEvent"; + public static final String BEAN_NAME_QUERY_BACKENDS = "queryBackends"; + private static final String QUERY_BACKENDS = "query.backens"; + @Resource private ApplicationContext context; @@ -216,13 +222,13 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { public SmileResponseStreamWriter smileResponseStreamWriter() { return new SmileResponseStreamWriter(); } - + @Bean @Lazy public CSVResponseStreamWriter csvResponseStreamWriter() { return new CSVResponseStreamWriter(); } - + @Bean @Lazy public RawEventResponseStreamWriter rawEventResponseStreamWriter() { @@ -299,7 +305,8 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { @Lazy public Set defaultResponseAggregations() { String[] responseAggregations = - StringUtils.commaDelimitedListToStringArray(env.getProperty(QUERYREST_RESPONSE_FIELDS_EVENT_QUERY_AGGREGATIONS)); + StringUtils + .commaDelimitedListToStringArray(env.getProperty(QUERYREST_RESPONSE_FIELDS_EVENT_QUERY_AGGREGATIONS)); LOGGER.debug("Load '{}={}'", QUERYREST_RESPONSE_FIELDS_EVENT_QUERY_AGGREGATIONS, Arrays.toString(responseAggregations)); @@ -363,11 +370,44 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter { BEAN_NAME_DEFAULT_EVENT_RESPONSE_FIELDS, BEAN_NAME_CONFIG_RESPONSE_FIELDS_HISTORIC); } - + @Bean(name = BEAN_NAME_FORMATTER_RAW_EVENT) @Scope(BeanDefinition.SCOPE_PROTOTYPE) @Lazy - public RawEventResponseFormatter idRawResponseFormatter(){ + public RawEventResponseFormatter idRawResponseFormatter() { return new RawEventResponseFormatter(); } + + @Bean(name = BEAN_NAME_QUERY_BACKENDS) + @Lazy + public List queryBackends() { + final String value = env.getProperty(QUERY_BACKENDS, "[{}]"); + LOGGER.debug("Load '{}={}'", QUERY_BACKENDS, value); + + List queryBackends; + try { + queryBackends = objectMapper.readValue(value, new TypeReference>() {}); + } catch (final Exception e) { + LOGGER.warn("Could not load '{}={}'. Use default", QUERY_BACKENDS, value, e); + ServerAddress defaultAddress = new ServerAddress(); + queryBackends = Arrays.asList(defaultAddress); + } + + final String newDefaultProtocol = "https"; + final String newDefaultHost = "data-api.psi.ch"; + final int newDefaultPort = 443; + // this should be overwritten + final String newDefaultPath = ""; + final List queryBackendStrs = new ArrayList<>(queryBackends.size()); + for (final ServerAddress serverAddress : queryBackends) { + serverAddress.replaceDefaults( + newDefaultProtocol, + newDefaultHost, + newDefaultPort, + newDefaultPath); + queryBackendStrs.add(serverAddress.getAddress()); + } + + return queryBackendStrs; + } } diff --git a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java index a231116..580281e 100644 --- a/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java +++ b/src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java @@ -3,9 +3,11 @@ package ch.psi.daq.queryrest.controller; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Map.Entry; -import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -72,7 +74,6 @@ public class QueryRestController implements ApplicationContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class); - private Set activeBackends; private ApplicationContext context; private ObjectMapper objectMapper; private QueryManager queryManager; @@ -81,14 +82,12 @@ public class QueryRestController implements ApplicationContextAware { private Validator requestProviderValidator; private Response defaultResponse = new JSONHTTPResponse(); - @SuppressWarnings("unchecked") @Override public void setApplicationContext(ApplicationContext context) throws BeansException { final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class); context = backend.getApplicationContext(); this.context = context; - activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class); objectMapper = context.getBean(DomainConfig.BEAN_NAME_OBJECT_MAPPER, ObjectMapper.class); queryManager = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_MANAGER, QueryManager.class); eventQueryValidator = context.getBean(QueryRestConfig.BEAN_NAME_EVENT_QUERY_VALIDATOR, Validator.class); @@ -130,7 +129,7 @@ public class QueryRestController implements ApplicationContextAware { if (request == null) { request = new ChannelsRequest(); } - + ((AbstractHTTPResponse) defaultResponse).respond( context, res, @@ -166,7 +165,7 @@ public class QueryRestController implements ApplicationContextAware { if (request == null) { request = new ChannelConfigurationsRequest(); } - + ((AbstractHTTPResponse) defaultResponse).respond( context, res, @@ -511,12 +510,28 @@ public class QueryRestController implements ApplicationContextAware { value = DomainConfig.PATH_BACKENDS, method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE}) - public @ResponseBody List getBackendValues() { - return Backend.getBackends().stream() - .filter(backend -> activeBackends.contains(backend)) + public @ResponseBody Collection getBackendValues() { + return queryManager.getBackends() .collect(Collectors.toList()); } + /** + * Returns the current list of {@link Backend} values. + * + * @return list of {@link Backend}s as String array + */ + @RequestMapping( + value = DomainConfig.PATH_BACKENDS_BY_ID, + method = {RequestMethod.GET}, + produces = {MediaType.APPLICATION_JSON_VALUE}) + public @ResponseBody Map getBackendValuesById() { + return queryManager.getBackends() + .collect( + Collectors.toMap( + Backend::getId, + Function.identity())); + } + /** * Returns the current list of {@link Compression}s available. * diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java index a15977a..cc1b317 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManager.java @@ -7,6 +7,7 @@ import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Triple; import ch.psi.daq.common.tuple.Quadruple; +import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.events.ChannelConfiguration; import ch.psi.daq.domain.json.ChannelName; import ch.psi.daq.domain.query.DAQConfigQuery; @@ -22,6 +23,8 @@ import ch.psi.daq.domain.query.channels.LongHash; public interface QueryManager { + Stream getBackends(); + LongHash getChannelsHash(); Stream getChannels(final ChannelsRequest request) throws Exception; diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java index 1de0944..9b8957a 100644 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerImpl.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -40,6 +41,7 @@ import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.queryrest.config.QueryRestConfig; public class QueryManagerImpl implements QueryManager, ApplicationContextAware { + private Set activeBackends; private BackendsChannelConfigurationCache channelsCache; private Function queryAnalizerFactory; @@ -49,6 +51,7 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware { final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class); context = backend.getApplicationContext(); + activeBackends = context.getBean(DomainConfig.BEAN_NAME_BACKENDS_ACTIVE, Set.class); channelsCache = context.getBean(QueryConfig.BEAN_NAME_HISTORIC_CHANNELS_CACHE, BackendsChannelConfigurationCache.class); queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class); @@ -57,6 +60,12 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware { @PreDestroy public void destroy() {} + @Override + public Stream getBackends() { + return Backend.getBackends().stream() + .filter(backend -> activeBackends.contains(backend)); + } + @Override public LongHash getChannelsHash() { return channelsCache.getChannelsHash(); diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java new file mode 100644 index 0000000..5f20378 --- /dev/null +++ b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java @@ -0,0 +1,186 @@ +package ch.psi.daq.queryrest.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.PreDestroy; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.web.reactive.function.client.WebClient; + +import ch.psi.daq.common.tuple.Quadruple; +import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.backend.Backend; +import ch.psi.daq.domain.config.DomainConfig; +import ch.psi.daq.domain.events.ChannelConfiguration; +import ch.psi.daq.domain.json.ChannelName; +import ch.psi.daq.domain.query.DAQConfigQuery; +import ch.psi.daq.domain.query.DAQConfigQueryElement; +import ch.psi.daq.domain.query.DAQQueries; +import ch.psi.daq.domain.query.DAQQueryElement; +import ch.psi.daq.domain.query.backend.BackendQuery; +import ch.psi.daq.domain.query.backend.BackendQueryImpl; +import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; +import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache; +import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; +import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; +import ch.psi.daq.domain.query.channels.ChannelsRequest; +import ch.psi.daq.domain.query.channels.ChannelsResponse; +import ch.psi.daq.domain.query.channels.LongHash; +import ch.psi.daq.domain.query.processor.QueryProcessor; +import ch.psi.daq.query.config.QueryConfig; +import ch.psi.daq.queryrest.config.QueryRestConfig; + +public class QueryManagerRemote implements QueryManager, ApplicationContextAware { + private WebClient client; + + private Map backendToServerAddresses; +// private BackendsChannelConfigurationCache channelsCache; +// private Function queryAnalizerFactory; + + @SuppressWarnings("unchecked") + @Override + public void setApplicationContext(ApplicationContext context) throws BeansException { + backendToServerAddresses = new TreeMap<>(Comparator.comparingInt(backend -> backend.getId())); + + + } + + @PreDestroy + public void destroy() {} + + @Override + public Stream getBackends() { + return Backend.getBackends().stream() + .filter(backend -> activeBackends.contains(backend)) + .collect(Collectors.toList()); + } + + @Override + public LongHash getChannelsHash() { + return channelsCache.getChannelsHash(); + } + + @Override + public Stream getChannels(ChannelsRequest request) { + return channelsCache.getChannels(request); + } + + @Override + public LongHash getChannelConfigurationsHash() { + return channelsCache.getChannelConfigurationsHash(); + } + + @Override + public Stream getChannelConfigurations(ChannelConfigurationsRequest request) { + return channelsCache.getChannelConfigurations(request); + } + + @Override + public ChannelConfiguration getChannelConfiguration(ChannelName channel) { + return channelsCache.getChannelConfiguration(channel); + } + + @Override + public Entry>> queryConfigs( + final DAQConfigQuery daqQuery) { + // set backends if not defined yet + channelsCache.configureBackends(daqQuery.getChannels()); + + Stream> resultStreams = + BackendQueryImpl + .getBackendQueries(daqQuery) + .stream() + .filter( + query -> query.getBackend().getBackendAccess().hasStreamEventReader()) + .flatMap( + query -> { + /* all the magic happens here */ + final Map> channelToConfig = + query.getChannelConfigurations(); + + return channelToConfig.entrySet().stream() + .map(entry -> { + return Triple.of( + query, + new ChannelName(entry.getKey(), query.getBackend()), + entry.getValue()); + }); + }); + + return Pair.of(daqQuery, resultStreams); + } + + @Override + public List>>> queryEvents( + final DAQQueries queries) { + // set backends if not defined yet + for (DAQQueryElement daqQuery : queries) { + channelsCache.configureBackends(daqQuery.getChannels()); + } + + final List>>> results = + new ArrayList<>(queries.getQueries().size()); + + for (final DAQQueryElement queryElement : queries) { + Stream> resultStreams = + BackendQueryImpl + .getBackendQueries(queryElement) + .stream() + .filter( + query -> query.getBackend().getBackendAccess().hasDataReader() + && query.getBackend().getBackendAccess().hasQueryProcessor()) + .flatMap( + query -> { + final QueryProcessor processor = + query.getBackend().getBackendAccess().getQueryProcessor(); + final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); + + // ChannelEvent query + /* all the magic happens here */ + final Stream>> channelToDataEvents = + processor.process(queryAnalizer); + /* do post-process */ + final Stream> channelToData = + queryAnalizer.postProcess(channelToDataEvents); + + // ChannelConfig query + final BackendQuery configQuery = new BackendQueryImpl(query, queryElement.getConfigFields()); + final Map> channelToConfig = + configQuery.getChannelConfigurations(); + + return channelToData.map(entry -> { + return Quadruple.of( + query, + entry.getKey(), + channelToConfig.get(entry.getKey().getName()), + entry.getValue()); + }); + }); + + // Now we have a stream that loads elements sequential BackendQuery by BackendQuery. + // By materializing the outer Stream the elements of all BackendQuery are loaded async + // (speeds things up but requires also more memory - i.e. it relies on Backends not loading + // all elements into memory at once) + resultStreams = resultStreams.collect(Collectors.toList()).stream(); + + results.add(Pair.of(queryElement, resultStreams)); + } + + return results; + } + +} diff --git a/src/main/resources/queryrest.properties b/src/main/resources/queryrest.properties index e846609..b14e1d3 100644 --- a/src/main/resources/queryrest.properties +++ b/src/main/resources/queryrest.properties @@ -9,6 +9,10 @@ queryrest.response.fields.event.query.aggregations=min,mean,max queryrest.response.fields.config.query=name,backend,pulseId,globalSeconds,type,shape,source,unit queryrest.response.fields.config.historic=name,backend,type,shape,source,description,unit +# Central Query Configs +####################### +query.backens=[{"path":"/sf"},{"path":"/gls"},{"path":"/hipa"},{"path":"/saresa"},{"path":"/saresb"}] + # defines if the writer is a local writer (can write data to filesystem) filestorage.writer.local=false # defines if the writer is a local reader (can read data from filesystem)