Intermediate AnyRead format/protocol.

This commit is contained in:
Fabian Märki 2018-01-18 15:26:02 +01:00
parent 0ff19f14a9
commit 09851a7207
3 changed files with 232 additions and 13 deletions

ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java View File

@@ -16,7 +16,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -57,6 +56,7 @@ import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
@@ -71,11 +71,6 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
public static final String EMPTY_VALUE = "";
public static final String FIELDNAME_EXTREMA = "extrema";
private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
event.getBackend());
// try to match sync data (bsread) with non-sync data (epics) based on the time, using 10 millis
// buckets.
private static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis() / 10L;
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@@ -133,6 +128,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
entry.getValue()
.sequential()
.forEach(quadruple -> {
// single BackendQuery instance for all
backendQueryRef.compareAndSet(null, quadruple.getFirst());
if (query.hasConfigFields() && quadruple.getThird() instanceof Stream) {
@@ -164,8 +160,8 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
// online matching of the stream's content
final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> streamMatcher =
new StreamMatcher<>(
KEY_PROVIDER,
MATCHER_PROVIDER,
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
new MapCreator<>(),
new MapFiller<>(),
null,
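
The KEY_PROVIDER/MATCHER_PROVIDER pair removed above now lives in DAQQueriesResponseFormatter (next file). The matcher buckets timestamps so that a bsread event and an epics event falling into the same window compare equal. A minimal sketch of the bucketing, assuming a 10 ms bucket width as the comments state (BucketMatchDemo is illustrative only, not part of the commit):

import java.util.function.ToLongFunction;

public class BucketMatchDemo {
    public static void main(String[] args) {
        final long bucketMillis = 10L; // assumed value of TimeUtils.MILLIS_PER_PULSE
        final ToLongFunction<Long> matcher = millis -> millis / bucketMillis;
        System.out.println(matcher.applyAsLong(1003L)); // bucket 100
        System.out.println(matcher.applyAsLong(1009L)); // bucket 100 -> matched with the first event
        System.out.println(matcher.applyAsLong(1012L)); // bucket 101 -> different bucket, no match
    }
}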

ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java View File

@@ -66,13 +66,12 @@ public class DAQQueriesResponseFormatter
public static final String META_RESP_FIELD = "meta";
public static final Mapping DEFAULT_MAPPING = new Mapping(IncompleteStrategy.PROVIDE_AS_IS);
private static final long MILLIS_PER_PULSE = TimeUtils.MILLIS_PER_PULSE;
private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
public static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
event.getBackend());
// try to match sync data (bsread) with non-sync data (epics) based on the time, using 10 millis
// buckets.
private static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis()
/ MILLIS_PER_PULSE;
public static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis()
/ TimeUtils.MILLIS_PER_PULSE;
// In case ArchiverAppliance had several events within the 10ms mapping interval, return these
// aggregations (only used for table format)
@@ -234,7 +233,7 @@ public class DAQQueriesResponseFormatter
if (requestRange.isPulseIdRangeDefined()) {
binningStrategy = new BinningStrategyPerBinPulse(1);
} else if (requestRange.isTimeRangeDefined()) {
binningStrategy = new BinningStrategyPerBinTime(MILLIS_PER_PULSE);
binningStrategy = new BinningStrategyPerBinTime(TimeUtils.MILLIS_PER_PULSE);
} else {
final String message = "Either time or pulseId range must be defined by the query!";
LOGGER.error(message);

ch/psi/daq/queryrest/response/raq/anyread/AnyReadResponseFormatter.java View File

@@ -0,0 +1,224 @@
package ch.psi.daq.queryrest.response.raq.anyread;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import ch.psi.daq.common.stream.match.KeepAsIsPadder;
import ch.psi.daq.common.stream.match.KeepPreviousPadder;
import ch.psi.daq.common.stream.match.MapCreator;
import ch.psi.daq.common.stream.match.MapFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.StreamMatcher;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
public class AnyReadResponseFormatter {
private static final Logger LOGGER = LoggerFactory.getLogger(AnyReadResponseFormatter.class);
public static final Function<ChannelConfiguration, ChannelName> CONFIG_KEY_PROVIDER =
(config) -> new ChannelName(config.getChannel(),
config.getBackend());
// try to match sync data (bsread) with non-sync data (epics) based on the time, using 10 millis
// buckets.
public static final ToLongFunction<ChannelConfiguration> CONFIG_MATCHER_PROVIDER =
(config) -> config.getGlobalMillis()
/ TimeUtils.MILLIS_PER_PULSE;
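/**
 * Formats query results into the intermediate AnyRead format/protocol.
 * Each Quadruple carries the backend query, the channel name, the channel's
 * configuration stream and its event stream (as the casts below assume).
 */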
@SuppressWarnings("unchecked")
public void format(
final ObjectMapper mapper,
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results,
final OutputStream out,
final AbstractHTTPResponse response) throws Exception {
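// capture the first failure; it is re-thrown after all streams have been closed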
final AtomicReference<Exception> exception = new AtomicReference<>();
try (final BufferedOutputStream os = new BufferedOutputStream(out, 64 * 1024)) {
results
.forEach(entry -> {
final DAQQueryElement daqQuery = entry.getKey();
final AtomicReference<BackendQuery> backendQueryRef = new AtomicReference<>();
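// per-channel configuration and event streams, keyed by channel name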
final Map<ChannelName, Stream<ChannelConfiguration>> configStreams =
new LinkedHashMap<>(results.size());
final Map<ChannelName, Stream<DataEvent>> eventStreams = new LinkedHashMap<>(results.size());
entry.getValue()
.forEach(quadruple -> {
// single BackendQuery instance for all
backendQueryRef.compareAndSet(null, quadruple.getFirst());
configStreams.put(quadruple.getSecond(),
(Stream<ChannelConfiguration>) quadruple.getThird());
eventStreams.put(quadruple.getSecond(),
(Stream<DataEvent>) quadruple.getFourth());
});
if (exception.get() == null) {
try {
if (response.useTableFormat(daqQuery)) {
final Mapping mapping =
daqQuery.getMappingOrDefault(CSVResponseStreamWriter.DEFAULT_MAPPING);
final Padder<ChannelName, DataEvent> padder =
mapping.getIncomplete().getPadder(backendQueryRef.get());
writeTableFormat(
os,
configStreams,
eventStreams,
padder);
} else {
writeArrayFormat(
os,
configStreams,
eventStreams);
}
} catch (Exception e) {
final String message = String.format("Could not write data for '%s'.", daqQuery);
LOGGER.warn(message, e);
exception.compareAndSet(null, new IllegalStateException(message, e));
}
}
for (Stream<ChannelConfiguration> stream : configStreams.values()) {
stream.close();
}
for (Stream<DataEvent> stream : eventStreams.values()) {
stream.close();
}
});
}
if (exception.get() != null) {
throw exception.get();
}
}
@SuppressWarnings("unchecked")
protected void writeArrayFormat(
final OutputStream os,
final Map<ChannelName, Stream<ChannelConfiguration>> configStreams,
final Map<ChannelName, Stream<DataEvent>> eventStreams) throws Exception {
// padders do not matter here as we stream channel by channel and do no inter-channel mapping
final Padder<ChannelName, ChannelConfiguration> configPadder = new KeepAsIsPadder<>();
final Padder<ChannelName, DataEvent> eventPadder = new KeepAsIsPadder<>();
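// Note (illustrative, not part of the original code): each matcher below covers a
// single channel, so there are never missing channels to pad; KeepAsIsPadder simply
// leaves the matched maps untouched.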
for (final Entry<ChannelName, Stream<DataEvent>> entry : eventStreams.entrySet()) {
final Stream<ChannelConfiguration> configStream = configStreams.get(entry.getKey());
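// wrap the single channel and its streams into collections so the multi-channel
// StreamMatcher API can be reused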
final Set<ChannelName> keySet = Sets.newHashSet(entry.getKey());
final Collection<Stream<ChannelConfiguration>> configValues = Sets.newHashSet(configStream);
final Collection<Stream<DataEvent>> eventValues = Sets.newHashSet(entry.getValue());
// online matching of the stream's content
final StreamMatcher<ChannelName, ChannelConfiguration, Map<ChannelName, ChannelConfiguration>> configStreamMatcher =
new StreamMatcher<>(
CONFIG_KEY_PROVIDER,
CONFIG_MATCHER_PROVIDER,
new MapCreator<>(),
new MapFiller<>(),
null,
configPadder,
keySet,
configValues);
final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter = configStreamMatcher.iterator();
final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> eventStreamMatcher =
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
new MapCreator<>(),
new MapFiller<>(),
null,
eventPadder,
keySet,
eventValues);
final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter = eventStreamMatcher.iterator();
try {
write(os, configStreamsMatchIter, eventStreamsMatchIter);
} finally {
eventStreamMatcher.close();
configStreamMatcher.close();
}
}
}
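// Table format: configurations and events of all channels are aligned into time-bucketed
// rows; channels without a value in a bucket are filled in by the provided padder.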
protected void writeTableFormat(
final OutputStream os,
final Map<ChannelName, Stream<ChannelConfiguration>> configStreams,
final Map<ChannelName, Stream<DataEvent>> eventStreams,
final Padder<ChannelName, DataEvent> eventPadder) {
// online matching of the stream's content
final StreamMatcher<ChannelName, ChannelConfiguration, Map<ChannelName, ChannelConfiguration>> configStreamMatcher =
new StreamMatcher<>(
CONFIG_KEY_PROVIDER,
CONFIG_MATCHER_PROVIDER,
new MapCreator<>(),
new MapFiller<>(),
null,
new KeepPreviousPadder<>(),
configStreams.keySet(),
configStreams.values());
final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter = configStreamMatcher.iterator();
final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> eventStreamMatcher =
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
new MapCreator<>(),
new MapFiller<>(),
null,
eventPadder,
eventStreams.keySet(),
eventStreams.values());
final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter = eventStreamMatcher.iterator();
try {
write(os, configStreamsMatchIter, eventStreamsMatchIter);
} finally {
eventStreamMatcher.close();
configStreamMatcher.close();
}
}
protected void write(
final OutputStream os,
final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter,
final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter) {
// TODO: implement whatever protocol gets defined...
throw new UnsupportedOperationException("Protocol not yet implemented.");
// Config/DataEvent iterator protocol sketch:
// -> hasNextConfigs() -> write config(s) whose time is not after the time of the next event
// -> hasNextEvents() -> write event(s), else stop
}
}
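
For reference, a possible shape of the write loop sketched in the TODO above: before each matched event row, flush every configuration row whose timestamp does not lie after that event. This is a sketch only; writeConfigs and writeEvents are hypothetical serializers for the not-yet-defined wire format, java.io.IOException would need to be imported, and each matched map is assumed non-empty:

protected void write(
        final OutputStream os,
        final Iterator<Map<ChannelName, ChannelConfiguration>> configIter,
        final Iterator<Map<ChannelName, DataEvent>> eventIter) throws IOException {
    // next configuration row waiting to be written (null once exhausted)
    Map<ChannelName, ChannelConfiguration> pendingConfigs =
            configIter.hasNext() ? configIter.next() : null;
    while (eventIter.hasNext()) {
        final Map<ChannelName, DataEvent> events = eventIter.next();
        // matched maps share a time bucket; any entry yields the row's time
        final long eventMillis = events.values().iterator().next().getGlobalMillis();
        // write config(s) whose time is not after the next event
        while (pendingConfigs != null
                && pendingConfigs.values().iterator().next().getGlobalMillis() <= eventMillis) {
            writeConfigs(os, pendingConfigs); // hypothetical serializer
            pendingConfigs = configIter.hasNext() ? configIter.next() : null;
        }
        writeEvents(os, events); // hypothetical serializer
    }
}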