RestHelper using WebClient
This commit is contained in:
parent
a98132de76
commit
81501df38d
@ -1,186 +0,0 @@
|
||||
package ch.psi.daq.queryrest.query;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
import ch.psi.daq.common.tuple.Quadruple;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.backend.Backend;
|
||||
import ch.psi.daq.domain.config.DomainConfig;
|
||||
import ch.psi.daq.domain.events.ChannelConfiguration;
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQConfigQuery;
|
||||
import ch.psi.daq.domain.query.DAQConfigQueryElement;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.backend.BackendQuery;
|
||||
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
|
||||
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
|
||||
import ch.psi.daq.domain.query.channels.BackendsChannelConfigurationCache;
|
||||
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
|
||||
import ch.psi.daq.domain.query.channels.ChannelConfigurationsResponse;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsRequest;
|
||||
import ch.psi.daq.domain.query.channels.ChannelsResponse;
|
||||
import ch.psi.daq.domain.query.channels.LongHash;
|
||||
import ch.psi.daq.domain.query.processor.QueryProcessor;
|
||||
import ch.psi.daq.query.config.QueryConfig;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
|
||||
public class QueryManagerRemote implements QueryManager, ApplicationContextAware {
|
||||
private WebClient client;
|
||||
|
||||
private Map<Backend, String> backendToServerAddresses;
|
||||
// private BackendsChannelConfigurationCache channelsCache;
|
||||
// private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext context) throws BeansException {
|
||||
backendToServerAddresses = new TreeMap<>(Comparator.comparingInt(backend -> backend.getId()));
|
||||
|
||||
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {}
|
||||
|
||||
@Override
|
||||
public Stream<Backend> getBackends() {
|
||||
return Backend.getBackends().stream()
|
||||
.filter(backend -> activeBackends.contains(backend))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongHash getChannelsHash() {
|
||||
return channelsCache.getChannelsHash();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<ChannelsResponse> getChannels(ChannelsRequest request) {
|
||||
return channelsCache.getChannels(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongHash getChannelConfigurationsHash() {
|
||||
return channelsCache.getChannelConfigurationsHash();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(ChannelConfigurationsRequest request) {
|
||||
return channelsCache.getChannelConfigurations(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfiguration getChannelConfiguration(ChannelName channel) {
|
||||
return channelsCache.getChannelConfiguration(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>> queryConfigs(
|
||||
final DAQConfigQuery daqQuery) {
|
||||
// set backends if not defined yet
|
||||
channelsCache.configureBackends(daqQuery.getChannels());
|
||||
|
||||
Stream<Triple<BackendQuery, ChannelName, ?>> resultStreams =
|
||||
BackendQueryImpl
|
||||
.getBackendQueries(daqQuery)
|
||||
.stream()
|
||||
.filter(
|
||||
query -> query.getBackend().getBackendAccess().hasStreamEventReader())
|
||||
.flatMap(
|
||||
query -> {
|
||||
/* all the magic happens here */
|
||||
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
|
||||
query.getChannelConfigurations();
|
||||
|
||||
return channelToConfig.entrySet().stream()
|
||||
.map(entry -> {
|
||||
return Triple.of(
|
||||
query,
|
||||
new ChannelName(entry.getKey(), query.getBackend()),
|
||||
entry.getValue());
|
||||
});
|
||||
});
|
||||
|
||||
return Pair.of(daqQuery, resultStreams);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> queryEvents(
|
||||
final DAQQueries queries) {
|
||||
// set backends if not defined yet
|
||||
for (DAQQueryElement daqQuery : queries) {
|
||||
channelsCache.configureBackends(daqQuery.getChannels());
|
||||
}
|
||||
|
||||
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results =
|
||||
new ArrayList<>(queries.getQueries().size());
|
||||
|
||||
for (final DAQQueryElement queryElement : queries) {
|
||||
Stream<Quadruple<BackendQuery, ChannelName, ?, ?>> resultStreams =
|
||||
BackendQueryImpl
|
||||
.getBackendQueries(queryElement)
|
||||
.stream()
|
||||
.filter(
|
||||
query -> query.getBackend().getBackendAccess().hasDataReader()
|
||||
&& query.getBackend().getBackendAccess().hasQueryProcessor())
|
||||
.flatMap(
|
||||
query -> {
|
||||
final QueryProcessor processor =
|
||||
query.getBackend().getBackendAccess().getQueryProcessor();
|
||||
final BackendQueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
|
||||
// ChannelEvent query
|
||||
/* all the magic happens here */
|
||||
final Stream<Entry<ChannelName, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
processor.process(queryAnalizer);
|
||||
/* do post-process */
|
||||
final Stream<Entry<ChannelName, ?>> channelToData =
|
||||
queryAnalizer.postProcess(channelToDataEvents);
|
||||
|
||||
// ChannelConfig query
|
||||
final BackendQuery configQuery = new BackendQueryImpl(query, queryElement.getConfigFields());
|
||||
final Map<String, Stream<? extends ChannelConfiguration>> channelToConfig =
|
||||
configQuery.getChannelConfigurations();
|
||||
|
||||
return channelToData.map(entry -> {
|
||||
return Quadruple.of(
|
||||
query,
|
||||
entry.getKey(),
|
||||
channelToConfig.get(entry.getKey().getName()),
|
||||
entry.getValue());
|
||||
});
|
||||
});
|
||||
|
||||
// Now we have a stream that loads elements sequential BackendQuery by BackendQuery.
|
||||
// By materializing the outer Stream the elements of all BackendQuery are loaded async
|
||||
// (speeds things up but requires also more memory - i.e. it relies on Backends not loading
|
||||
// all elements into memory at once)
|
||||
resultStreams = resultStreams.collect(Collectors.toList()).stream();
|
||||
|
||||
results.add(Pair.of(queryElement, resultStreams));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user