diff --git a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java b/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java deleted file mode 100644 index 5f20378..0000000 --- a/src/main/java/ch/psi/daq/queryrest/query/QueryManagerRemote.java +++ /dev/null @@ -1,186 +0,0 @@ -package ch.psi.daq.queryrest.query; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import javax.annotation.PreDestroy; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.web.reactive.function.client.WebClient; - -import ch.psi.daq.common.tuple.Quadruple; -import ch.psi.daq.domain.DataEvent; -import ch.psi.daq.domain.backend.Backend; -import ch.psi.daq.domain.config.DomainConfig; -import ch.psi.daq.domain.events.ChannelConfiguration; -import ch.psi.daq.domain.json.ChannelName; -import ch.psi.daq.domain.query.DAQConfigQuery; -import ch.psi.daq.domain.query.DAQConfigQueryElement; -import ch.psi.daq.domain.query.DAQQueries; -import ch.psi.daq.domain.query.DAQQueryElement; -import ch.psi.daq.domain.query.backend.BackendQuery; -import ch.psi.daq.domain.query.backend.BackendQueryImpl; -import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer; -import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache; -import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest; -import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse; -import ch.psi.daq.domain.query.channels.ChannelsRequest; -import ch.psi.daq.domain.query.channels.ChannelsResponse; -import ch.psi.daq.domain.query.channels.LongHash; -import ch.psi.daq.domain.query.processor.QueryProcessor; -import ch.psi.daq.query.config.QueryConfig; -import ch.psi.daq.queryrest.config.QueryRestConfig; - -public class QueryManagerRemote implements QueryManager, ApplicationContextAware { - private WebClient client; - - private Map backendToServerAddresses; -// private BackendsChannelConfigurationCache channelsCache; -// private Function queryAnalizerFactory; - - @SuppressWarnings("unchecked") - @Override - public void setApplicationContext(ApplicationContext context) throws BeansException { - backendToServerAddresses = new TreeMap<>(Comparator.comparingInt(backend -> backend.getId())); - - - } - - @PreDestroy - public void destroy() {} - - @Override - public Stream getBackends() { - return Backend.getBackends().stream() - .filter(backend -> activeBackends.contains(backend)) - .collect(Collectors.toList()); - } - - @Override - public LongHash getChannelsHash() { - return channelsCache.getChannelsHash(); - } - - @Override - public Stream getChannels(ChannelsRequest request) { - return channelsCache.getChannels(request); - } - - @Override - public LongHash getChannelConfigurationsHash() { - return channelsCache.getChannelConfigurationsHash(); - } - - @Override - public Stream getChannelConfigurations(ChannelConfigurationsRequest request) { - return channelsCache.getChannelConfigurations(request); - } - - @Override - public ChannelConfiguration getChannelConfiguration(ChannelName channel) { - return channelsCache.getChannelConfiguration(channel); - } - - @Override - public Entry>> queryConfigs( - final DAQConfigQuery daqQuery) { - // set backends if not defined yet - channelsCache.configureBackends(daqQuery.getChannels()); - - Stream> resultStreams = - BackendQueryImpl - .getBackendQueries(daqQuery) - .stream() - .filter( - query -> query.getBackend().getBackendAccess().hasStreamEventReader()) - .flatMap( - query -> { - /* all the magic happens here */ - final Map> channelToConfig = - query.getChannelConfigurations(); - - return channelToConfig.entrySet().stream() - .map(entry -> { - return Triple.of( - query, - new ChannelName(entry.getKey(), query.getBackend()), - entry.getValue()); - }); - }); - - return Pair.of(daqQuery, resultStreams); - } - - @Override - public List>>> queryEvents( - final DAQQueries queries) { - // set backends if not defined yet - for (DAQQueryElement daqQuery : queries) { - channelsCache.configureBackends(daqQuery.getChannels()); - } - - final List>>> results = - new ArrayList<>(queries.getQueries().size()); - - for (final DAQQueryElement queryElement : queries) { - Stream> resultStreams = - BackendQueryImpl - .getBackendQueries(queryElement) - .stream() - .filter( - query -> query.getBackend().getBackendAccess().hasDataReader() - && query.getBackend().getBackendAccess().hasQueryProcessor()) - .flatMap( - query -> { - final QueryProcessor processor = - query.getBackend().getBackendAccess().getQueryProcessor(); - final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query); - - // ChannelEvent query - /* all the magic happens here */ - final Stream>> channelToDataEvents = - processor.process(queryAnalizer); - /* do post-process */ - final Stream> channelToData = - queryAnalizer.postProcess(channelToDataEvents); - - // ChannelConfig query - final BackendQuery configQuery = new BackendQueryImpl(query, queryElement.getConfigFields()); - final Map> channelToConfig = - configQuery.getChannelConfigurations(); - - return channelToData.map(entry -> { - return Quadruple.of( - query, - entry.getKey(), - channelToConfig.get(entry.getKey().getName()), - entry.getValue()); - }); - }); - - // Now we have a stream that loads elements sequential BackendQuery by BackendQuery. - // By materializing the outer Stream the elements of all BackendQuery are loaded async - // (speeds things up but requires also more memory - i.e. it relies on Backends not loading - // all elements into memory at once) - resultStreams = resultStreams.collect(Collectors.toList()).stream(); - - results.add(Pair.of(queryElement, resultStreams)); - } - - return results; - } - -}