From 09851a72079233bf8c39322a92a9e1a28b147c64 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fabian=20M=C3=A4rki?=
Date: Thu, 18 Jan 2018 15:26:02 +0100
Subject: [PATCH] Intermediate AnyRead format/protocol.

---
 .../response/csv/CSVResponseStreamWriter.java |  12 +-
 .../DAQQueriesResponseFormatter.java          |   9 +-
 .../raq/anyread/AnyReadResponseFormatter.java | 224 ++++++++++++++++++
 3 files changed, 232 insertions(+), 13 deletions(-)
 create mode 100644 src/main/java/ch/psi/daq/queryrest/response/raq/anyread/AnyReadResponseFormatter.java

diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
index 3d74966..769176a 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
@@ -16,7 +16,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
-import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -57,6 +56,7 @@ import ch.psi.daq.queryrest.config.QueryRestConfig;
 import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
 import ch.psi.daq.queryrest.response.ResponseFormatter;
 import ch.psi.daq.queryrest.response.ResponseStreamWriter;
+import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
 
 /**
  * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
@@ -71,11 +71,6 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
     public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
     public static final String EMPTY_VALUE = "";
     public static final String FIELDNAME_EXTREMA = "extrema";
-    private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
-            event.getBackend());
-    // try to match sync data (bsread) with non sync data (epics) based on the time usin 10 millis
-    // buckets.
-    private static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis() / 10L;
 
     private Function queryAnalizerFactory;
 
@@ -133,6 +128,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
             entry.getValue()
                     .sequential()
                     .forEach(quadruple -> {
+                        // single BackendQuery instance for all
                         backendQueryRef.compareAndSet(null, quadruple.getFirst());
 
                         if (query.hasConfigFields() && quadruple.getThird() instanceof Stream) {
@@ -164,8 +160,8 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
 
         // online matching of the stream's content
         final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> streamMatcher = new StreamMatcher<>(
-                KEY_PROVIDER,
-                MATCHER_PROVIDER,
+                DAQQueriesResponseFormatter.KEY_PROVIDER,
+                DAQQueriesResponseFormatter.MATCHER_PROVIDER,
                 new MapCreator<>(),
                 new MapFiller<>(),
                 null,
diff --git a/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java b/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java
index 89cd25a..ac8ede7 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java
@@ -66,13 +66,12 @@ public class DAQQueriesResponseFormatter
     public static final String META_RESP_FIELD = "meta";
     public static final Mapping DEFAULT_MAPPING = new Mapping(IncompleteStrategy.PROVIDE_AS_IS);
 
-    private static final long MILLIS_PER_PULSE = TimeUtils.MILLIS_PER_PULSE;
-    private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
+    public static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
             event.getBackend());
     // try to match sync data (bsread) with non sync data (epics) based on the time usin 10 millis
     // buckets.
-    private static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis()
-            / MILLIS_PER_PULSE;
+    public static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis()
+            / TimeUtils.MILLIS_PER_PULSE;
 
     // In case ArchiverAppliance had several events within the 10ms mapping interval, return these
     // aggregations (only used for table format)
@@ -234,7 +233,7 @@ public class DAQQueriesResponseFormatter
         if (requestRange.isPulseIdRangeDefined()) {
             binningStrategy = new BinningStrategyPerBinPulse(1);
         } else if (requestRange.isTimeRangeDefined()) {
-            binningStrategy = new BinningStrategyPerBinTime(MILLIS_PER_PULSE);
+            binningStrategy = new BinningStrategyPerBinTime(TimeUtils.MILLIS_PER_PULSE);
         } else {
             final String message = "Either time or pulseId range must be defined by the query!";
             LOGGER.error(message);
diff --git a/src/main/java/ch/psi/daq/queryrest/response/raq/anyread/AnyReadResponseFormatter.java b/src/main/java/ch/psi/daq/queryrest/response/raq/anyread/AnyReadResponseFormatter.java
new file mode 100644
index 0000000..f58838e
--- /dev/null
+++ b/src/main/java/ch/psi/daq/queryrest/response/raq/anyread/AnyReadResponseFormatter.java
@@ -0,0 +1,224 @@
+package ch.psi.daq.queryrest.response.raq.anyread;
+
+import java.io.BufferedOutputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+
+import ch.psi.daq.common.stream.match.KeepAsIsPadder;
+import ch.psi.daq.common.stream.match.KeepPreviousPadder;
+import ch.psi.daq.common.stream.match.MapCreator;
+import ch.psi.daq.common.stream.match.MapFiller;
+import ch.psi.daq.common.stream.match.Padder;
+import ch.psi.daq.common.stream.match.StreamMatcher;
+import ch.psi.daq.common.time.TimeUtils;
+import ch.psi.daq.common.tuple.Quadruple;
+import ch.psi.daq.domain.DataEvent;
+import ch.psi.daq.domain.events.ChannelConfiguration;
+import ch.psi.daq.domain.json.ChannelName;
+import ch.psi.daq.domain.query.DAQQueryElement;
+import ch.psi.daq.domain.query.backend.BackendQuery;
+import ch.psi.daq.domain.query.mapping.Mapping;
+import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
+import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
+import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
+
+public class AnyReadResponseFormatter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AnyReadResponseFormatter.class);
+
+    public static final Function<ChannelConfiguration, ChannelName> CONFIG_KEY_PROVIDER =
+            (config) -> new ChannelName(config.getChannel(),
+                    config.getBackend());
+    // try to match sync data (bsread) with non sync data (epics) based on the time using 10 millis
+    // buckets.
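+    // e.g. a bsread event at globalMillis 1234 and an epics reading at 1238 both
+    // map to bucket 123 (1234 / 10 == 1238 / 10) and are treated as simultaneous.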
+    public static final ToLongFunction<ChannelConfiguration> CONFIG_MATCHER_PROVIDER =
+            (config) -> config.getGlobalMillis()
+                    / TimeUtils.MILLIS_PER_PULSE;
+
+    @SuppressWarnings("unchecked")
+    public void format(
+            final ObjectMapper mapper,
+            final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results,
+            final OutputStream out,
+            final AbstractHTTPResponse response) throws Exception {
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+
+        try (final BufferedOutputStream os = new BufferedOutputStream(out, 64 * 1024)) {
+
+            results
+                    .forEach(entry -> {
+                        final DAQQueryElement daqQuery = entry.getKey();
+                        final AtomicReference<BackendQuery> backendQueryRef = new AtomicReference<>();
+
+                        final Map<ChannelName, Stream<ChannelConfiguration>> configStreams =
+                                new LinkedHashMap<>(results.size());
+                        final Map<ChannelName, Stream<DataEvent>> eventStreams = new LinkedHashMap<>(results.size());
+
+                        entry.getValue()
+                                .forEach(quadruple -> {
+                                    // single BackendQuery instance for all
+                                    backendQueryRef.compareAndSet(null, quadruple.getFirst());
+
+                                    configStreams.put(quadruple.getSecond(),
+                                            (Stream<ChannelConfiguration>) quadruple.getThird());
+                                    eventStreams.put(quadruple.getSecond(),
+                                            (Stream<DataEvent>) quadruple.getFourth());
+                                });
+
+
+                        if (exception.get() == null) {
+                            try {
+                                if (response.useTableFormat(daqQuery)) {
+                                    final Mapping mapping =
+                                            daqQuery.getMappingOrDefault(CSVResponseStreamWriter.DEFAULT_MAPPING);
+                                    final Padder<ChannelName, DataEvent> padder =
+                                            mapping.getIncomplete().getPadder(backendQueryRef.get());
+
+                                    writeTableFormat(
+                                            os,
+                                            configStreams,
+                                            eventStreams,
+                                            padder);
+                                } else {
+                                    writeArrayFormat(
+                                            os,
+                                            configStreams,
+                                            eventStreams);
+                                }
+                            } catch (Exception e) {
+                                final String message = String.format("Could not write data for '%s'.", daqQuery);
+                                LOGGER.warn(message, e);
+                                exception.compareAndSet(null, new IllegalStateException(message, e));
+                            }
+                        }
+
+                        for (Stream<ChannelConfiguration> stream : configStreams.values()) {
+                            stream.close();
+                        }
+                        for (Stream<DataEvent> stream : eventStreams.values()) {
+                            stream.close();
+                        }
+                    });
+        }
+
+        if (exception.get() != null) {
+            throw exception.get();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void writeArrayFormat(
+            final OutputStream os,
+            final Map<ChannelName, Stream<ChannelConfiguration>> configStreams,
+            final Map<ChannelName, Stream<DataEvent>> eventStreams) throws Exception {
+
+        // padders do not matter as we are streaming channel by channel and do no inter-channel mapping
+        final Padder<ChannelName, ChannelConfiguration> configPadder = new KeepAsIsPadder<>();
+        final Padder<ChannelName, DataEvent> eventPadder = new KeepAsIsPadder<>();
+
+        for (final Entry<ChannelName, Stream<DataEvent>> entry : eventStreams.entrySet()) {
+            final Stream<ChannelConfiguration> configStream = configStreams.get(entry.getKey());
+            final Set<ChannelName> keySet = Sets.newHashSet(entry.getKey());
+            final Collection<Stream<ChannelConfiguration>> configValues = Sets.newHashSet(configStream);
+            final Collection<Stream<DataEvent>> eventValues = Sets.newHashSet(entry.getValue());
+
+            // online matching of the stream's content
+            final StreamMatcher<ChannelName, ChannelConfiguration, Map<ChannelName, ChannelConfiguration>> configStreamMatcher =
+                    new StreamMatcher<>(
+                            CONFIG_KEY_PROVIDER,
+                            CONFIG_MATCHER_PROVIDER,
+                            new MapCreator<>(),
+                            new MapFiller<>(),
+                            null,
+                            configPadder,
+                            keySet,
+                            configValues);
+            final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter = configStreamMatcher.iterator();
+
+            final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> eventStreamMatcher =
+                    new StreamMatcher<>(
+                            DAQQueriesResponseFormatter.KEY_PROVIDER,
+                            DAQQueriesResponseFormatter.MATCHER_PROVIDER,
+                            new MapCreator<>(),
+                            new MapFiller<>(),
+                            null,
+                            eventPadder,
+                            keySet,
+                            eventValues);
+            final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter = eventStreamMatcher.iterator();
+
+            try {
+                write(os, configStreamsMatchIter, eventStreamsMatchIter);
+            } finally {
+                eventStreamMatcher.close();
+                configStreamMatcher.close();
+            }
+        }
+    }
+
+    protected void writeTableFormat(
+            final OutputStream os,
+            final Map<ChannelName, Stream<ChannelConfiguration>> configStreams,
+            final Map<ChannelName, Stream<DataEvent>> eventStreams,
+            final Padder<ChannelName, DataEvent> eventPadder) {
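+        // table format: all channels are matched into one map per time bucket; the
+        // eventPadder fills channels that have no value within a given bucket.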
+
+        // online matching of the stream's content
+        final StreamMatcher<ChannelName, ChannelConfiguration, Map<ChannelName, ChannelConfiguration>> configStreamMatcher =
+                new StreamMatcher<>(
+                        CONFIG_KEY_PROVIDER,
+                        CONFIG_MATCHER_PROVIDER,
+                        new MapCreator<>(),
+                        new MapFiller<>(),
+                        null,
+                        new KeepPreviousPadder<>(),
+                        configStreams.keySet(),
+                        configStreams.values());
+        final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter = configStreamMatcher.iterator();
+
+        final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> eventStreamMatcher =
+                new StreamMatcher<>(
+                        DAQQueriesResponseFormatter.KEY_PROVIDER,
+                        DAQQueriesResponseFormatter.MATCHER_PROVIDER,
+                        new MapCreator<>(),
+                        new MapFiller<>(),
+                        null,
+                        eventPadder,
+                        eventStreams.keySet(),
+                        eventStreams.values());
+        final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter = eventStreamMatcher.iterator();
+
+        try {
+            write(os, configStreamsMatchIter, eventStreamsMatchIter);
+        } finally {
+            eventStreamMatcher.close();
+            configStreamMatcher.close();
+        }
+    }
+
+    protected void write(
+            final OutputStream os,
+            final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter,
+            final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter) {
+
+        // TODO: implement whatever protocol...
+        throw new UnsupportedOperationException("Protocol not yet implemented.");
+        // Channel/DataEvent iterator:
+        // -> hasNextConfigs() -> write config(s) while the time of the next event is after the next config
+        // -> hasNextEvents() -> write event(s), else stop
+    }
+}
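
A possible shape for the write() protocol left as TODO above, given here as a
minimal sketch only: it assumes the matcher iterators yield one map per 10 ms
time bucket in ascending time order, and serializeConfigs/serializeEvents are
hypothetical placeholders for the not-yet-defined AnyRead wire format.

    protected void write(
            final OutputStream os,
            final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter,
            final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter) {

        // the next config bucket that has not been written yet (null when exhausted)
        Map<ChannelName, ChannelConfiguration> nextConfigs =
                configStreamsMatchIter.hasNext() ? configStreamsMatchIter.next() : null;

        while (eventStreamsMatchIter.hasNext()) {
            final Map<ChannelName, DataEvent> events = eventStreamsMatchIter.next();
            final long eventMillis = events.values().stream()
                    .mapToLong(DataEvent::getGlobalMillis)
                    .min()
                    .orElse(Long.MAX_VALUE);

            // write config(s) that take effect before or together with the next event(s)
            while (nextConfigs != null && minGlobalMillis(nextConfigs) <= eventMillis) {
                serializeConfigs(os, nextConfigs); // hypothetical
                nextConfigs = configStreamsMatchIter.hasNext() ? configStreamsMatchIter.next() : null;
            }
            serializeEvents(os, events); // hypothetical
        }
    }

    private static long minGlobalMillis(final Map<ChannelName, ChannelConfiguration> configs) {
        return configs.values().stream()
                .mapToLong(ChannelConfiguration::getGlobalMillis)
                .min()
                .orElse(Long.MAX_VALUE);
    }

Emitting every configuration whose bucket is not after the next events' bucket
means a reader can always decode an event using the most recent configuration
written before it.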