Raw data format.

This commit is contained in:
Fabian Märki
2018-02-07 13:59:43 +01:00
parent 09851a7207
commit 8d0acaccb4
41 changed files with 4659 additions and 107 deletions

View File

@@ -13,12 +13,14 @@ import javax.annotation.Resource;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;
import org.springframework.util.StringUtils;
import org.springframework.validation.Validator;
@@ -57,6 +59,8 @@ import ch.psi.daq.queryrest.response.formatter.DAQConfigQueryResponseFormatter;
import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
import ch.psi.daq.queryrest.response.msgpack.MsgPackResponseStreamWriter;
import ch.psi.daq.queryrest.response.raw.event.RawEventResponseFormatter;
import ch.psi.daq.queryrest.response.raw.event.RawEventResponseStreamWriter;
import ch.psi.daq.queryrest.response.smile.SmileResponseStreamWriter;
@Configuration
@@ -107,6 +111,7 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
public static final String BEAN_NAME_FORMATTER_ANY = "formatterAny";
public static final String BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION =
"formatterHistoricChannelConfiguration";
public static final String BEAN_NAME_FORMATTER_RAW_EVENT = "formatterRawEvent";
@Resource
private ApplicationContext context;
@@ -211,12 +216,18 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
public SmileResponseStreamWriter smileResponseStreamWriter() {
return new SmileResponseStreamWriter();
}
/**
 * Lazily instantiated writer that streams query results back to the client in CSV format.
 *
 * @return a new {@link CSVResponseStreamWriter}
 */
@Bean
@Lazy
public CSVResponseStreamWriter csvResponseStreamWriter() {
return new CSVResponseStreamWriter();
}
/**
 * Lazily instantiated writer that streams query results back to the client in the raw
 * event (binary) format.
 *
 * @return a new {@link RawEventResponseStreamWriter}
 */
@Bean
@Lazy
public RawEventResponseStreamWriter rawEventResponseStreamWriter() {
return new RawEventResponseStreamWriter();
}
@Bean(name = BEAN_NAME_QUERY_MANAGER)
@Lazy
@@ -352,4 +363,11 @@ public class QueryRestConfig { // extends WebMvcConfigurerAdapter {
BEAN_NAME_DEFAULT_EVENT_RESPONSE_FIELDS,
BEAN_NAME_CONFIG_RESPONSE_FIELDS_HISTORIC);
}
/**
 * Prototype-scoped formatter for the raw event response format. A fresh instance is
 * created per lookup (SCOPE_PROTOTYPE), so the formatter may keep per-request state.
 *
 * NOTE(review): the method name "idRawResponseFormatter" does not match the explicit bean
 * name BEAN_NAME_FORMATTER_RAW_EVENT ("formatterRawEvent") — looks like a copy/paste
 * leftover; consider renaming to rawEventResponseFormatter (the explicit @Bean name keeps
 * lookups working either way).
 *
 * @return a new {@link RawEventResponseFormatter}
 */
@Bean(name = BEAN_NAME_FORMATTER_RAW_EVENT)
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
@Lazy
public RawEventResponseFormatter idRawResponseFormatter(){
return new RawEventResponseFormatter();
}
}

View File

@@ -65,9 +65,6 @@ import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
import ch.psi.daq.queryrest.response.formatter.AnyResponseFormatter;
import ch.psi.daq.queryrest.response.formatter.DAQConfigQueryResponseFormatter;
import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
@RestController
@@ -83,10 +80,6 @@ public class QueryRestController implements ApplicationContextAware {
private Validator configQueryValidator;
private Validator requestProviderValidator;
private Response defaultResponse = new JSONHTTPResponse();
private AnyResponseFormatter anyFormatter;
private AnyResponseFormatter historicConfigFormatter;
private DAQConfigQueryResponseFormatter configQueryFormatter;
private DAQQueriesResponseFormatter daqQueriesFormatter;
@SuppressWarnings("unchecked")
@Override
@@ -101,14 +94,6 @@ public class QueryRestController implements ApplicationContextAware {
eventQueryValidator = context.getBean(QueryRestConfig.BEAN_NAME_EVENT_QUERY_VALIDATOR, Validator.class);
configQueryValidator = context.getBean(QueryRestConfig.BEAN_NAME_CONFIG_QUERY_VALIDATOR, Validator.class);
requestProviderValidator = context.getBean(QueryRestConfig.BEAN_NAME_REQUEST_PROVIDER_VALIDATOR, Validator.class);
anyFormatter = context.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_ANY, AnyResponseFormatter.class);
historicConfigFormatter = context.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION,
AnyResponseFormatter.class);
configQueryFormatter = context.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_DAQ_CONFIG_QUERY,
DAQConfigQueryResponseFormatter.class);
daqQueriesFormatter =
context.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_DAQ_QUERIES, DAQQueriesResponseFormatter.class);
}
@InitBinder
@@ -141,12 +126,16 @@ public class QueryRestController implements ApplicationContextAware {
public void getChannels(@RequestBody(required = false) ChannelsRequest request,
final HttpServletResponse res)
throws Throwable {
// in case not specified use defaults (e.g. GET)
if (request == null) {
request = new ChannelsRequest();
}
((AbstractHTTPResponse) defaultResponse).respond(
context,
res,
null,
queryManager.getChannels(request),
anyFormatter);
request,
queryManager.getChannels(request));
}
@RequestMapping(
@@ -173,12 +162,16 @@ public class QueryRestController implements ApplicationContextAware {
produces = {MediaType.APPLICATION_JSON_VALUE})
public void getChannelConfigurations(@RequestBody(required = false) ChannelConfigurationsRequest request,
final HttpServletResponse res) throws Throwable {
// in case not specified use defaults (e.g. GET)
if (request == null) {
request = new ChannelConfigurationsRequest();
}
((AbstractHTTPResponse) defaultResponse).respond(
context,
res,
null,
queryManager.getChannelConfigurations(request),
historicConfigFormatter);
request,
queryManager.getChannelConfigurations(request));
}
@RequestMapping(
@@ -204,9 +197,8 @@ public class QueryRestController implements ApplicationContextAware {
((AbstractHTTPResponse) defaultResponse).respond(
context,
res,
null,
config,
historicConfigFormatter);
channelName,
config);
} else {
res.setStatus(HttpStatus.NOT_FOUND.value());
}
@@ -249,8 +241,7 @@ public class QueryRestController implements ApplicationContextAware {
context,
res,
query,
result,
configQueryFormatter);
result);
} else {
final String message =
String.format(
@@ -395,8 +386,7 @@ public class QueryRestController implements ApplicationContextAware {
context,
res,
queries,
result,
daqQueriesFormatter);
result);
} else {
final String message =
String.format(
@@ -435,7 +425,9 @@ public class QueryRestController implements ApplicationContextAware {
method = {RequestMethod.GET},
produces = {MediaType.APPLICATION_JSON_VALUE})
public @ResponseBody List<ResponseFormat> getResponseFormatValues() {
return Lists.newArrayList(ResponseFormat.values());
return Arrays.stream(ResponseFormat.values())
.filter(format -> format.isPublish())
.collect(Collectors.toList());
}
/**
@@ -452,7 +444,7 @@ public class QueryRestController implements ApplicationContextAware {
.filter(queryField -> queryField.isPublish())
.collect(Collectors.toList());
}
/**
* Returns the current list of {@link EventField}s available.
*
@@ -467,7 +459,7 @@ public class QueryRestController implements ApplicationContextAware {
.filter(queryField -> queryField.isPublish())
.collect(Collectors.toList());
}
/**
* Returns the current list of {@link EventField}s available.
*

View File

@@ -64,11 +64,6 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
/**
 * Returns the channels matching the given request, served from the channels cache.
 *
 * @param request the channels request; may be {@code null} (e.g. plain GET), in which
 *        case a default {@link ChannelsRequest} is used
 * @return a stream of matching {@link ChannelsResponse} elements
 */
@Override
public Stream<ChannelsResponse> getChannels(ChannelsRequest request) {
// in case not specified use defaults (e.g. GET)
if (request == null) {
request = new ChannelsRequest();
}
return channelsCache.getChannels(request);
}
@@ -79,11 +74,6 @@ public class QueryManagerImpl implements QueryManager, ApplicationContextAware {
/**
 * Returns the channel configurations matching the given request, served from the
 * channels cache.
 *
 * @param request the configurations request; may be {@code null} (e.g. plain GET), in
 *        which case a default {@link ChannelConfigurationsRequest} is used
 * @return a stream of matching {@link ChannelConfigurationsResponse} elements
 */
@Override
public Stream<ChannelConfigurationsResponse> getChannelConfigurations(ChannelConfigurationsRequest request) {
// in case not specified use defaults (e.g. GET)
if (request == null) {
request = new ChannelConfigurationsRequest();
}
return channelsCache.getChannelConfigurations(request);
}

View File

@@ -44,8 +44,8 @@ public abstract class AbstractHTTPResponse extends ResponseImpl {
public abstract <R> void respond(
final ApplicationContext context,
final HttpServletResponse httpResponse,
final Object query, final R result,
final ResponseFormatter<R> formatter)
final Object query,
final R result)
throws Exception;
/**

View File

@@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
import ch.psi.daq.queryrest.response.msgpack.MsgPackHTTPResponse;
import ch.psi.daq.queryrest.response.raw.event.RawEventHTTPResponse;
import ch.psi.daq.queryrest.response.smile.SmileHTTPResponse;
@JsonTypeInfo(
@@ -17,7 +18,8 @@ import ch.psi.daq.queryrest.response.smile.SmileHTTPResponse;
@Type(value = JSONHTTPResponse.class, name = JSONHTTPResponse.FORMAT),
@Type(value = MsgPackHTTPResponse.class, name = MsgPackHTTPResponse.FORMAT),
@Type(value = SmileHTTPResponse.class, name = SmileHTTPResponse.FORMAT),
@Type(value = CSVHTTPResponse.class, name = CSVHTTPResponse.FORMAT)
@Type(value = CSVHTTPResponse.class, name = CSVHTTPResponse.FORMAT),
@Type(value = RawEventHTTPResponse.class, name = RawEventHTTPResponse.FORMAT)
})
// see: http://stackoverflow.com/questions/24631923/alternative-to-jackson-jsonsubtypes
public abstract class PolymorphicResponseMixIn {

View File

@@ -15,13 +15,11 @@ public interface ResponseStreamWriter {
* @param result The result
* @param out The OutputStream
* @param response The Response
* @param formatter The ResponseFormatter
* @throws Exception thrown if writing to the output stream fails
*/
public <R> void respond(
final Object query,
final R result,
final OutputStream out,
final AbstractHTTPResponse response,
final ResponseFormatter<R> formatter) throws Exception;
final AbstractHTTPResponse response) throws Exception;
}

View File

@@ -18,7 +18,6 @@ import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
public class CSVHTTPResponse extends AbstractHTTPResponse {
private static final Logger LOGGER = LoggerFactory.getLogger(CSVHTTPResponse.class);
@@ -65,13 +64,12 @@ public class CSVHTTPResponse extends AbstractHTTPResponse {
final ApplicationContext context,
final HttpServletResponse response,
final Object query,
final R result,
final ResponseFormatter<R> formatter) throws Exception {
final R result) throws Exception {
final OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
final CSVResponseStreamWriter streamWriter = context.getBean(CSVResponseStreamWriter.class);
// write the response back to the client using java 8 streams
streamWriter.respond(query, result, out, this, formatter);
streamWriter.respond(query, result, out, this);
}
}

View File

@@ -31,7 +31,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import ch.psi.daq.common.stream.StreamIterable;
import ch.psi.daq.common.stream.match.MapCreator;
import ch.psi.daq.common.stream.match.ReusingMapCreator;
import ch.psi.daq.common.stream.match.MapFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.StreamMatcher;
@@ -54,7 +54,6 @@ import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
@@ -89,8 +88,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
final Object query,
final R result,
final OutputStream out,
final AbstractHTTPResponse response,
final ResponseFormatter<R> formatter) throws Exception {
final AbstractHTTPResponse response) throws Exception {
if (query instanceof DAQQueries) {
respond((List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>>) result,
out, response);
@@ -162,7 +160,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
new MapCreator<>(),
new ReusingMapCreator<>(),
new MapFiller<>(),
null,
padder,

View File

@@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import ch.psi.daq.common.stream.match.ListCreator;
import ch.psi.daq.common.stream.match.ReuseingListCreator;
import ch.psi.daq.common.stream.match.ListFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.StreamMatcher;
@@ -247,7 +247,7 @@ public class DAQQueriesResponseFormatter
new StreamMatcher<>(
KEY_PROVIDER,
matchProvider,
new ListCreator<ChannelName, DataEvent>(),
new ReuseingListCreator<ChannelName, DataEvent>(),
new ListFiller<ChannelName, DataEvent>(),
new BinnedValueCombiner(binningStrategy),
padder,

View File

@@ -1,48 +1,107 @@
package ch.psi.daq.queryrest.response.json;
import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.servlet.ServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQConfigQuery;
import ch.psi.daq.domain.query.DAQConfigQueryElement;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.channels.ChannelConfigurationsRequest;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.formatter.AnyResponseFormatter;
import ch.psi.daq.queryrest.response.formatter.DAQConfigQueryResponseFormatter;
import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
public abstract class AbstractResponseStreamWriter implements ResponseStreamWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractResponseStreamWriter.class);
protected void init(final Backend backend) {}
private Backend backend;
protected void init(final Backend backend) {
this.backend = backend;
}
@SuppressWarnings("unchecked")
@Override
public <R> void respond(
final Object query,
final R result,
final OutputStream out,
final AbstractHTTPResponse response,
final ResponseFormatter<R> formatter) throws Exception {
formatter.format(getJsonFactory(), getObjectMapper(), result, out, response);
final AbstractHTTPResponse response) throws Exception {
if (query instanceof DAQQueries) {
backend.getApplicationContext()
.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_DAQ_QUERIES, DAQQueriesResponseFormatter.class)
.format(
getJsonFactory(),
getObjectMapper(),
(List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>>) result,
out,
response);
} else if (query instanceof DAQConfigQuery) {
backend.getApplicationContext()
.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_DAQ_CONFIG_QUERY, DAQConfigQueryResponseFormatter.class)
.format(
getJsonFactory(),
getObjectMapper(),
(Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>) result,
out,
response);
} else if (query instanceof ChannelConfigurationsRequest) {
backend.getApplicationContext()
.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION, AnyResponseFormatter.class)
.format(
getJsonFactory(),
getObjectMapper(),
result,
out,
response);
} else if (query instanceof ChannelName) {
backend.getApplicationContext()
.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_HISTORIC_CHANNEL_CONFIGURATION, AnyResponseFormatter.class)
.format(
getJsonFactory(),
getObjectMapper(),
result,
out,
response);
// if (query instanceof DAQQueries) {
// DAQQueriesResponseFormatter.respond(getJsonFactory(), getObjectMapper(),
// defaultEventResponseAggregations,
// (List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>>) result,
// out, response);
// } else if (query instanceof DAQConfigQuery) {
// DAQChannelConfigurationQueryResponseWriter.respond(getJsonFactory(), getObjectMapper(),
// (Entry<DAQConfigQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>) result,
// out, response);
// } else {
// AnyResponseWriter.respond(getJsonFactory(), getObjectMapper(), result, out, response,
// defaultEventFields,
// defaultConfigFields);
// }
} else if (query instanceof ChannelsRequest) {
backend.getApplicationContext()
.getBean(QueryRestConfig.BEAN_NAME_FORMATTER_ANY, AnyResponseFormatter.class)
.format(
getJsonFactory(),
getObjectMapper(),
result,
out,
response);
} else {
final String message = String.format("'%s' has no response type for '%s'.", this.getClass(), query);
LOGGER.error(message);
throw new IllegalStateException(message);
}
}
protected abstract JsonFactory getJsonFactory();

View File

@@ -16,7 +16,6 @@ import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
public class JSONHTTPResponse extends AbstractHTTPResponse {
@@ -60,14 +59,13 @@ public class JSONHTTPResponse extends AbstractHTTPResponse {
final ApplicationContext context,
final HttpServletResponse response,
final Object query,
final R result,
final ResponseFormatter<R> formatter) throws Exception {
final R result) throws Exception {
final OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
final ResponseStreamWriter streamWriter = context.getBean(JSONResponseStreamWriter.class);
// write the response back to the client using java 8 streams
streamWriter.respond(query, result, out, this, formatter);
streamWriter.respond(query, result, out, this);
}
public static void defaultQueryValidation(final Object queryObj) {

View File

@@ -10,7 +10,6 @@ import org.springframework.http.MediaType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
@@ -37,13 +36,12 @@ public class MsgPackHTTPResponse extends AbstractHTTPResponse {
final ApplicationContext context,
final HttpServletResponse response,
final Object query,
final R result,
final ResponseFormatter<R> formatter) throws Exception {
final R result) throws Exception {
final OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
final ResponseStreamWriter streamWriter = context.getBean(MsgPackResponseStreamWriter.class);
// write the response back to the client using java 8 streams
streamWriter.respond(query, result, out, this, formatter);
streamWriter.respond(query, result, out, this);
}
}

View File

@@ -1,8 +1,10 @@
package ch.psi.daq.queryrest.response.raq.anyread;
package ch.psi.daq.queryrest.response.raw;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -14,32 +16,38 @@ import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import ch.psi.bsread.compression.Compression;
import ch.psi.daq.common.stream.match.KeepAsIsPadder;
import ch.psi.daq.common.stream.match.KeepPreviousPadder;
import ch.psi.daq.common.stream.match.MapCreator;
import ch.psi.daq.common.stream.match.MapFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.RecreateMapCreator;
import ch.psi.daq.common.stream.match.StreamMatcher;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.StreamEvent;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.formatter.DAQQueriesResponseFormatter;
public class AnyReadResponseFormatter {
private static final Logger LOGGER = LoggerFactory.getLogger(AnyReadResponseFormatter.class);
public abstract class AbstractRawResponseFormatter implements ResponseFormatter<List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>>> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRawResponseFormatter.class);
public static final Function<ChannelConfiguration, ChannelName> CONFIG_KEY_PROVIDER =
(config) -> new ChannelName(config.getChannel(),
@@ -49,9 +57,10 @@ public class AnyReadResponseFormatter {
public static final ToLongFunction<ChannelConfiguration> CONFIG_MATCHER_PROVIDER =
(config) -> config.getGlobalMillis()
/ TimeUtils.MILLIS_PER_PULSE;
@SuppressWarnings("unchecked")
public void format(
final JsonFactory factory,
final ObjectMapper mapper,
final List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>> results,
final OutputStream out,
@@ -81,7 +90,7 @@ public class AnyReadResponseFormatter {
});
if (exception.get() != null) {
if (exception.get() == null) {
try {
if (response.useTableFormat(daqQuery)) {
final Mapping mapping =
@@ -142,7 +151,7 @@ public class AnyReadResponseFormatter {
new StreamMatcher<>(
CONFIG_KEY_PROVIDER,
CONFIG_MATCHER_PROVIDER,
new MapCreator<>(),
new RecreateMapCreator<>(),
new MapFiller<>(),
null,
configPadder,
@@ -154,7 +163,7 @@ public class AnyReadResponseFormatter {
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
new MapCreator<>(),
new RecreateMapCreator<>(),
new MapFiller<>(),
null,
eventPadder,
@@ -182,7 +191,7 @@ public class AnyReadResponseFormatter {
new StreamMatcher<>(
CONFIG_KEY_PROVIDER,
CONFIG_MATCHER_PROVIDER,
new MapCreator<>(),
new RecreateMapCreator<>(),
new MapFiller<>(),
null,
new KeepPreviousPadder<>(),
@@ -194,7 +203,7 @@ public class AnyReadResponseFormatter {
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
new MapCreator<>(),
new RecreateMapCreator<>(),
new MapFiller<>(),
null,
eventPadder,
@@ -215,10 +224,150 @@ public class AnyReadResponseFormatter {
final Iterator<Map<ChannelName, ChannelConfiguration>> configStreamsMatchIter,
final Iterator<Map<ChannelName, DataEvent>> eventStreamsMatchIter) {
// TODO: implement whatefer protocol...
throw new UnsupportedOperationException("Proteocol not yet implemented.");
//Channel/DataEvent iterator
// -> hasNextConfigs() -> write config(s) - based on time of next event is after latest next-config
// -> hasNextEvents() -> write event(s) else stop
final Iterator<Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>>> iter =
new ConfigEventIterator(configStreamsMatchIter, eventStreamsMatchIter);
final Map<ChannelName, DataEvent> latestEvents = new HashMap<>();
final ConfigChangeDetector coonfigChangeDetector = new ConfigChangeDetector();
while (iter.hasNext()) {
final Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> currentPair = iter.next();
latestEvents.putAll(currentPair.getRight());
// check if changed
if (coonfigChangeDetector.hasConfigChanged(currentPair)) {
writeConfigs(os, latestEvents, currentPair);
}
writeEvents(os, currentPair);
}
}
protected abstract void writeConfigs(final OutputStream os, final Map<ChannelName, DataEvent> latestEvents,
final Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> pair);
protected abstract void writeEvents(final OutputStream os,
final Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> pair);
protected Compression getCompression(final DataEvent event) {
if (event instanceof StreamEvent) {
return ((StreamEvent) event).getCompression().getCompression();
} else {
if (event != null && event.isArray()) {
return Compression.bitshuffle_lz4;
} else {
return Compression.none;
}
}
}
protected ByteOrder getEncoding(final DataEvent event) {
if (event instanceof StreamEvent) {
return ((StreamEvent) event).getValueByteOrder();
} else {
// use native as it is most likely fastest
return ByteOrder.nativeOrder();
}
}
/**
 * Zips a stream of per-pulse channel-configuration maps with a stream of per-pulse event
 * maps into pairs of (active configs, events). The configuration side only advances when
 * an event's global time reaches the next configuration's global time, so each emitted
 * pair carries the configuration map that was in effect for its events.
 *
 * NOTE(review): iteration ends as soon as the event iterator is exhausted; trailing
 * configurations without events are dropped — presumably intentional, verify against the
 * raw protocol spec.
 */
public class ConfigEventIterator
implements Iterator<Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>>> {
// source of configuration maps, consumed one step ahead via nextNextConfigMap
private final Iterator<Map<ChannelName, ChannelConfiguration>> configIter;
// look-ahead: the configuration map that will become active next
private Map<ChannelName, ChannelConfiguration> nextNextConfigMap;
// the single latest (by ChannelConfiguration.COMPARATOR) entry of nextNextConfigMap,
// used to decide when the look-ahead configs become active
private Entry<ChannelName, ChannelConfiguration> nextNextConfig;
// source of event maps; drives the overall iteration
private final Iterator<Map<ChannelName, DataEvent>> eventIter;
// the next pair to emit; null once exhausted
private MutablePair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> next;
/**
 * @param configIter iterator over matched configuration maps (time-ordered)
 * @param eventIter iterator over matched event maps (time-ordered)
 */
public ConfigEventIterator(
final Iterator<Map<ChannelName, ChannelConfiguration>> configIter,
final Iterator<Map<ChannelName, DataEvent>> eventIter) {
this.configIter = configIter;
this.eventIter = eventIter;
// both sides must have at least one element, otherwise the iterator is empty
if (configIter.hasNext() && eventIter.hasNext()) {
next = MutablePair.of(configIter.next(), eventIter.next());
}
extractNextNextConfig();
}
@Override
public boolean hasNext() {
return next != null;
}
@Override
public Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> next() {
final Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> next = this.next;
extractNext();
return next;
}
// Advances to the next event map, switching to the look-ahead configuration map when the
// corresponding event's time has reached the look-ahead configuration's time.
private void extractNext() {
if (eventIter.hasNext()) {
next = MutablePair.of(next.getKey(), eventIter.next());
if (nextNextConfig != null) {
final DataEvent event = next.getValue().get(nextNextConfig.getKey());
if (event != null
&& TimeUtils.isAfterEquals(event.getGlobalTime(), nextNextConfig.getValue().getGlobalTime())) {
// the pending configuration is now active for this and subsequent events
next.setLeft(nextNextConfigMap);
extractNextNextConfig();
}
}
} else {
// event stream exhausted -> iteration ends
next = null;
}
}
// Pulls the next configuration map and caches its latest entry (reverse COMPARATOR
// order, first element) as the activation marker; clears both when exhausted.
private void extractNextNextConfig() {
if (configIter.hasNext()) {
nextNextConfigMap = configIter.next();
ChannelConfiguration tmp = nextNextConfigMap.values()
.stream()
.sorted(ChannelConfiguration.COMPARATOR.reversed())
.findFirst()
.orElse(null);
if (tmp != null) {
nextNextConfig = Pair.of(new ChannelName(tmp.getChannel(), tmp.getBackend()), tmp);
} else {
// should not happen
nextNextConfigMap = null;
nextNextConfig = null;
}
} else {
nextNextConfigMap = null;
nextNextConfig = null;
}
}
}
/**
 * Stateful detector that reports whether the effective configuration changed between
 * successive (configs, events) pairs. A change is either a new configuration map instance
 * (identity comparison) or a change in any channel's compression or byte-order as derived
 * from its latest event.
 *
 * The first invocation always reports a change (no previous state; the Map.put calls
 * return null, which never equals the freshly derived values).
 */
public class ConfigChangeDetector {
// last seen configuration map; compared by identity, not equals
private Map<ChannelName, ChannelConfiguration> currentConfigs;
// per-channel compression observed on the previous invocation
private final Map<ChannelName, Compression> latestCompressions = new HashMap<>();
// per-channel byte order observed on the previous invocation
private final Map<ChannelName, ByteOrder> latestEncoding = new HashMap<>();
/**
 * @param pair the current (configuration map, event map) pair
 * @return true if configs, any channel's compression, or any channel's encoding changed
 */
public boolean hasConfigChanged(
final Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> pair) {
boolean hasChanged = currentConfigs == null || currentConfigs != pair.getKey();
Compression compression;
ByteOrder encoding;
for (final Entry<ChannelName, DataEvent> entry : pair.getValue().entrySet()) {
// Map.put returns the previous value (null on first sighting), so this both
// updates the tracked state and detects a difference in one step
compression = getCompression(entry.getValue());
if (!compression.equals(latestCompressions.put(entry.getKey(), compression))) {
hasChanged = true;
}
encoding = getEncoding(entry.getValue());
if (!encoding.equals(latestEncoding.put(entry.getKey(), encoding))) {
hasChanged = true;
}
}
currentConfigs = pair.getKey();
return hasChanged;
}
}
}

View File

@@ -0,0 +1,6 @@
package ch.psi.daq.queryrest.response.raw;
/**
 * Supplies a numeric identifier — presumably the id tagging serialized blocks in the raw
 * binary protocol (see {@code Ids} and {@code RawDeserializerProvider}); confirm against
 * the protocol documentation.
 */
public interface IdProvider {
/** @return the identifier of this element */
short getId();
}

View File

@@ -0,0 +1,8 @@
package ch.psi.daq.queryrest.response.raw;
/**
 * Well-known identifiers used in the raw binary response format.
 *
 * NOTE(review): constant-interface anti-pattern — a final class with a private
 * constructor would be preferable if no type implements this interface.
 */
public interface Ids {
// top-level block ids
public static final short DATA_EVENTS = 0;
public static final short DATA_EVENTS_HEADER = 1;
// blob-level id; intentionally shares the value 0 with DATA_EVENTS? verify
public static final short BLOB_DATA_EVENT_STANDARD = 0;
}

View File

@@ -0,0 +1,47 @@
package ch.psi.daq.queryrest.response.raw;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import ch.psi.daq.queryrest.response.raw.event.serialize.ConfigValuesHolder;
import ch.psi.daq.queryrest.response.raw.serialize.RawDeserializer;
/**
 * Registry that maps protocol ids to {@link RawDeserializer} instances and propagates
 * configuration to all registered deserializers.
 *
 * @param <C> the configuration type handled by the deserializers
 * @param <V> the value type produced by the deserializers
 */
public class RawDeserializerProvider<C, V> {
    private Map<Short, RawDeserializer<C, V>> deserializers = new HashMap<>();

    /**
     * @param id the protocol id to look up
     * @return true if a deserializer is registered under the given id
     */
    public boolean hasDeserializer(short id) {
        return deserializers.containsKey(id);
    }

    /**
     * @param ids the protocol ids to check
     * @return true if every given id has a registered deserializer
     */
    public boolean hasAllDeserializer(final Collection<Short> ids) {
        return deserializers.keySet().containsAll(ids);
    }

    /** @return the ids of all registered deserializers (live view of the registry) */
    public Set<Short> getDeserializerIds() {
        return deserializers.keySet();
    }

    /**
     * @param id the protocol id
     * @return the deserializer registered under the id, or null if none
     */
    public RawDeserializer<C, V> getDeserializer(short id) {
        return deserializers.get(id);
    }

    /**
     * Initializes every registered deserializer with the given configuration holder.
     *
     * @param config the configuration to distribute
     */
    public void init(final ConfigValuesHolder<C, V> config) {
        deserializers.values().forEach(deserializer -> deserializer.init(config));
    }

    /** Replaces the whole registry (e.g. via dependency injection). */
    public void setDeserializers(Map<Short, RawDeserializer<C, V>> deserializers) {
        this.deserializers = deserializers;
    }

    /** @return the backing id-to-deserializer map */
    public Map<Short, RawDeserializer<C, V>> getDeserializers() {
        return deserializers;
    }

    /** Registers a single deserializer under its own id (see {@code RawDeserializer#getId()}). */
    public void add(RawDeserializer<C, V> deserializer) {
        final Short id = deserializer.getId();
        this.deserializers.put(id, deserializer);
    }
}

View File

@@ -0,0 +1,47 @@
package ch.psi.daq.queryrest.response.raw.event;
import java.io.OutputStream;
import javax.servlet.http.HttpServletResponse;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
/**
 * HTTP response implementation that serves query results in the raw event (binary)
 * format as application/octet-stream, delegating the actual serialization to
 * {@link RawEventResponseStreamWriter}.
 */
public class RawEventHTTPResponse extends AbstractHTTPResponse {
// format token used for polymorphic JSON (de)serialization of response types
public static final String FORMAT = "rawevent";
public static final String CONTENT_TYPE = MediaType.APPLICATION_OCTET_STREAM_VALUE;
public RawEventHTTPResponse() {
super(ResponseFormat.RAWEVENT);
}
/**
 * @param compression the compression to apply to the HTTP payload
 */
public RawEventHTTPResponse(final Compression compression) {
this();
setCompression(compression);
}
/** Applies the same default validation as the JSON response type. */
@Override
public void validateQuery(final Object query) {
JSONHTTPResponse.defaultQueryValidation(query);
}
/**
 * Writes the query result to the servlet response: sets headers/compression, then hands
 * the (query, result) pair to the raw event stream writer bean.
 *
 * @throws Exception if writing to the output stream fails
 */
@Override
public <R> void respond(
final ApplicationContext context,
final HttpServletResponse response,
final Object query,
final R result) throws Exception {
final OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
final ResponseStreamWriter streamWriter = context.getBean(RawEventResponseStreamWriter.class);
// write the response back to the client using java 8 streams
streamWriter.respond(query, result, out, this);
}
}

View File

@@ -0,0 +1,84 @@
package ch.psi.daq.queryrest.response.raw.event;
import java.io.OutputStream;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.bsread.common.allocator.ByteBufferAllocator;
import ch.psi.bsread.message.Type;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.queryrest.response.raw.AbstractRawResponseFormatter;
import ch.psi.daq.queryrest.response.raw.event.serialize.EventConfig;
import ch.psi.daq.queryrest.response.raw.event.serialize.EventDeSerializer;
import ch.psi.daq.queryrest.response.raw.event.serialize.EventHeader;
import ch.psi.daq.queryrest.response.raw.event.serialize.EventHeaderDeSerializer;
/**
 * Formats query results into the binary raw-event representation: first an
 * {@link EventHeader} describing all channels, then the per-event blobs.
 *
 * NOTE(review): stateful and not thread-safe — writeConfigs must run before
 * writeEvents since it creates the EventDeSerializer used there.
 */
public class RawEventResponseFormatter extends AbstractRawResponseFormatter implements ApplicationContextAware {
    private ObjectMapper mapper;
    // created in writeConfigs and reused by writeEvents
    private EventDeSerializer eventsDeSerializer;

    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        mapper = context.getBean(DomainConfig.BEAN_NAME_OBJECT_MAPPER, ObjectMapper.class);
    }

    /**
     * Builds the EventHeader from the channel configurations, configures the event
     * serializer with it and writes the header to the stream.
     *
     * @param os target stream
     * @param latestEvents latest event per channel (used to derive encoding/compression)
     * @param pair channel configurations (key) and events (value)
     */
    @Override
    protected void writeConfigs(final OutputStream os, final Map<ChannelName, DataEvent> latestEvents,
            final Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> pair) {
        final EventHeaderDeSerializer eventHeaderDeSerializer =
                new EventHeaderDeSerializer(ByteBufferAllocator.DEFAULT_ALLOCATOR, mapper);
        eventHeaderDeSerializer.configure(null, pair.getKey());
        eventsDeSerializer = new EventDeSerializer(ByteBufferAllocator.DEFAULT_ALLOCATOR);
        final EventHeader eventHeader = new EventHeader();
        ChannelConfiguration config;
        DataEvent latestEvent;
        for (final Entry<ChannelName, ChannelConfiguration> entry : pair.getKey().entrySet()) {
            config = entry.getValue();
            // may be null if there is no event for the channel — TODO confirm
            // getEncoding/getCompression handle a null event
            latestEvent = latestEvents.get(entry.getKey());
            final EventConfig eventConfig = new EventConfig(
                    EventConfig.BLOB_ID_UNDEFINED,
                    config.getChannel(),
                    config.getBackend().getName(),
                    Type.newInstance(config.getType()),
                    config.getShape(),
                    config.getModulo(),
                    config.getOffset(),
                    EventConfig.getEncoding(getEncoding(latestEvent)),
                    getCompression(latestEvent),
                    config.getPulseId(),
                    TimeUtils.getUnscaledTime(config.getGlobalTime()),
                    config.getSource(),
                    config.getPrecision(),
                    config.getUnit(),
                    config.getDescription());
            eventHeader.addChannel(eventConfig);
        }
        // eventHeader.addDataConverterId(eventHeaderDeSerializer.getId());
        eventsDeSerializer.configure(eventHeader, pair.getKey());
        eventHeaderDeSerializer.serialize(os, eventHeader);
    }

    /**
     * Writes the events of all channels using the serializer configured in writeConfigs.
     */
    @Override
    protected void writeEvents(final OutputStream os,
            final Pair<Map<ChannelName, ChannelConfiguration>, Map<ChannelName, DataEvent>> pair) {
        eventsDeSerializer.serialize(os, pair.getRight());
    }
}

View File

@@ -0,0 +1,64 @@
package ch.psi.daq.queryrest.response.raw.event;
import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.servlet.ServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
/**
 * Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
 * of the current request, using the binary raw-event format.
 */
public class RawEventResponseStreamWriter implements ResponseStreamWriter, ApplicationContextAware {
    private static final Logger LOGGER = LoggerFactory.getLogger(RawEventResponseStreamWriter.class);
    private ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        // the formatter beans live in the application context of the default backend
        final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class);
        this.context = backend.getApplicationContext();
    }

    /**
     * Streams the query result to {@code out}.
     *
     * @param query expected to be a {@link DAQQueries}
     * @param result the query result to format
     * @param out the target stream
     * @param response the HTTP response driving this writer
     * @throws IllegalStateException if the query type is unsupported
     */
    @SuppressWarnings("unchecked")
    @Override
    public <R> void respond(
            final Object query,
            final R result,
            final OutputStream out,
            final AbstractHTTPResponse response) throws Exception {
        if (query instanceof DAQQueries) {
            context
                    .getBean(QueryRestConfig.BEAN_NAME_FORMATTER_RAW_EVENT, RawEventResponseFormatter.class)
                    .format(
                            null,
                            null,
                            (List<Entry<DAQQueryElement, Stream<Quadruple<BackendQuery, ChannelName, ?, ?>>>>) result,
                            out,
                            response);
        } else {
            // bug fix: the format string has two '%s' placeholders but only one argument was
            // supplied, so String.format threw MissingFormatArgumentException instead of
            // producing the intended error message.
            final String message =
                    String.format("'%s' has no response type for '%s'.", getClass().getSimpleName(), query);
            LOGGER.error(message);
            throw new IllegalStateException(message);
        }
    }
}

View File

@@ -0,0 +1,8 @@
package ch.psi.daq.queryrest.response.raw.event.serialize;
/**
 * Holder for a (de)serialization config of type {@code C}.
 *
 * @param <C> the config type
 */
public interface ConfigHolder<C> {
    /** @return the currently held config (may be {@code null} if none was set yet) */
    C getConfig();

    /** @param config the config to hold */
    void setConfig(C config);
}

View File

@@ -0,0 +1,12 @@
package ch.psi.daq.queryrest.response.raw.event.serialize;
import java.util.Map;
import ch.psi.daq.domain.json.BackendChannel;
/**
 * A {@link ConfigHolder} that additionally collects per-channel values of type {@code V}.
 *
 * @param <C> the config type
 * @param <V> the value type
 */
public interface ConfigValuesHolder<C, V> extends ConfigHolder<C> {
    /** @return the collected values, keyed by channel */
    Map<BackendChannel, V> getValues();

    /** Adds (or overwrites) the value for the given channel. */
    void add(BackendChannel channel, V value);
}

View File

@@ -0,0 +1,236 @@
package ch.psi.daq.queryrest.response.raw.event.serialize;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import ch.psi.bsread.compression.Compression;
import ch.psi.bsread.message.Type;
/**
 * Per-channel configuration entry of an {@link EventHeader}, extending the bsread
 * {@code ChannelConfig} with DAQ-specific attributes (backend, blob id, timing, metadata).
 *
 * Serialized to JSON inside the raw-event header; {@code NON_DEFAULT} inclusion keeps the
 * header compact by omitting fields that hold their default value.
 */
@JsonInclude(Include.NON_DEFAULT)
public class EventConfig extends ch.psi.bsread.message.ChannelConfig {
    private static final long serialVersionUID = -8184893719772535221L;
    // marker meaning "no blob serializer assigned yet" — set by the serializer's configure()
    public static final short BLOB_ID_UNDEFINED = Short.MIN_VALUE;

    // id of the blob (de)serializer responsible for this channel's events
    private short blobId = BLOB_ID_UNDEFINED;
    private String backend;
    private long pulseId;
    // unscaled global time (see TimeUtils.getUnscaledTime at the call site)
    private long globalTime;
    private String source;
    private short precision;
    private String unit;
    private String description;
    // optional mapping of raw values to display values (e.g. enum semantics)
    private Map<Object, Object> valueMapping;

    /** Default constructor for JSON deserialization. */
    public EventConfig() {}

    public EventConfig(short blobId, String name, String backend, Type type) {
        super(name, type);
        this.blobId = blobId;
        this.backend = backend;
    }

    public EventConfig(short blobId, String name, String backend, Type type, int modulo, int offset) {
        this(blobId, name, backend, type);
        this.setModulo(modulo);
        this.setOffset(offset);
    }

    public EventConfig(short blobId, String name, String backend, Type type, int[] shape, int modulo, int offset) {
        this(blobId, name, backend, type, modulo, offset);
        this.setShape(shape);
    }

    public EventConfig(short blobId, String name, String backend, Type type, int[] shape, int modulo, int offset,
            String encoding) {
        this(blobId, name, backend, type, shape, modulo, offset);
        this.setEncoding(encoding);
    }

    public EventConfig(short blobId, String name, String backend, Type type, int[] shape, int modulo, int offset, String encoding,
            Compression compression) {
        this(blobId, name, backend, type, shape, modulo, offset, encoding);
        this.setCompression(compression);
    }

    /** Full constructor — the telescoping chain above delegates upwards step by step. */
    public EventConfig(short blobId, String name, String backend, Type type, int[] shape, int modulo, int offset, String encoding,
            Compression compression, long pulseId, long globalTime, String source, short precision, String unit,
            String description) {
        this(blobId, name, backend, type, shape, modulo, offset, encoding, compression);
        setPulseId(pulseId);
        setGlobalTime(globalTime);
        setSource(source);
        setPrecision(precision);
        setUnit(unit);
        setDescription(description);
    }

    /** Copy constructor. NOTE(review): valueMapping is shared by reference, not copied. */
    public EventConfig(EventConfig other) {
        super(other);
        this.blobId = other.blobId;
        this.backend = other.backend;
        this.pulseId = other.pulseId;
        this.globalTime = other.globalTime;
        this.source = other.source;
        this.precision = other.precision;
        this.unit = other.unit;
        this.description = other.description;
        this.valueMapping = other.valueMapping;
    }

    /** Copies all fields (incl. superclass fields) from {@code other} into this instance. */
    public void copy(EventConfig other) {
        super.copy(other);
        this.blobId = other.blobId;
        this.backend = other.backend;
        this.pulseId = other.pulseId;
        this.globalTime = other.globalTime;
        this.source = other.source;
        this.precision = other.precision;
        this.unit = other.unit;
        this.description = other.description;
        this.valueMapping = other.valueMapping;
    }

    public short getBlobId() {
        return blobId;
    }

    public void setBlobId(short blobId) {
        this.blobId = blobId;
    }

    public String getBackend() {
        return backend;
    }

    public void setBackend(String backend) {
        this.backend = backend;
    }

    public long getPulseId() {
        return pulseId;
    }

    public void setPulseId(long pulseId) {
        this.pulseId = pulseId;
    }

    public long getGlobalTime() {
        return globalTime;
    }

    public void setGlobalTime(long globalTime) {
        this.globalTime = globalTime;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public short getPrecision() {
        return precision;
    }

    public void setPrecision(short precision) {
        this.precision = precision;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public Map<Object, Object> getValueMapping() {
        return valueMapping;
    }

    public void setValueMapping(Map<Object, Object> valueMapping) {
        this.valueMapping = valueMapping;
    }

    // hashCode/equals extend the superclass contract with all fields added by this class
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        result = prime * result + ((backend == null) ? 0 : backend.hashCode());
        result = prime * result + blobId;
        result = prime * result + ((description == null) ? 0 : description.hashCode());
        result = prime * result + precision;
        result = prime * result + (int) (pulseId ^ (pulseId >>> 32));
        result = prime * result + ((source == null) ? 0 : source.hashCode());
        result = prime * result + (int) (globalTime ^ (globalTime >>> 32));
        result = prime * result + ((unit == null) ? 0 : unit.hashCode());
        result = prime * result + ((valueMapping == null) ? 0 : valueMapping.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        // superclass equality first (also rejects null), then exact-class check
        if (!super.equals(obj))
            return false;
        if (getClass() != obj.getClass())
            return false;
        EventConfig other = (EventConfig) obj;
        if (backend == null) {
            if (other.backend != null)
                return false;
        } else if (!backend.equals(other.backend))
            return false;
        if (blobId != other.blobId)
            return false;
        if (description == null) {
            if (other.description != null)
                return false;
        } else if (!description.equals(other.description))
            return false;
        if (precision != other.precision)
            return false;
        if (pulseId != other.pulseId)
            return false;
        if (source == null) {
            if (other.source != null)
                return false;
        } else if (!source.equals(other.source))
            return false;
        if (globalTime != other.globalTime)
            return false;
        if (unit == null) {
            if (other.unit != null)
                return false;
        } else if (!unit.equals(other.unit))
            return false;
        if (valueMapping == null) {
            if (other.valueMapping != null)
                return false;
        } else if (!valueMapping.equals(other.valueMapping))
            return false;
        return true;
    }
}

View File

@@ -0,0 +1,181 @@
package ch.psi.daq.queryrest.response.raw.event.serialize;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.function.IntFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.psi.bsread.common.allocator.ByteBufferAllocator;
import ch.psi.bsread.common.helper.ByteBufferHelper;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.BackendChannel;
import ch.psi.daq.queryrest.response.raw.Ids;
import ch.psi.daq.queryrest.response.raw.serialize.RawBlobDeserializer;
import ch.psi.daq.queryrest.response.raw.serialize.RawBlobSerializer;
import ch.psi.daq.queryrest.response.raw.serialize.RawDeserializer;
import ch.psi.daq.queryrest.response.raw.serialize.RawSerializer;
/**
 * (De)serializes the event section of a raw message. Per message the wire format is
 * [size:long][id:short][one blob per channel, in header order]. On the deserializing side
 * the decoded events are fed into the {@link ConfigValuesHolder}.
 *
 * Stateful and not thread-safe: {@code configure} (sender) or {@code init} (receiver) must
 * be called before {@code serialize}/{@code deserialize}.
 */
public class EventDeSerializer implements RawSerializer<EventHeader, Map<? extends BackendChannel, DataEvent>>,
        RawDeserializer<EventHeader, DataEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDeSerializer.class);
    private static final short ID = Ids.DATA_EVENTS;
    private static final ByteBuffer ID_BYTES;
    static {
        ID_BYTES = ByteBufferAllocator.DEFAULT_ALLOCATOR.apply(Short.BYTES)
                .order(RawSerializer.DEFAULT_ENCODING)
                .putShort(0, ID);
    }

    private final IntFunction<ByteBuffer> allocator;
    private EventHeader header;
    // reverse lookup: per-channel config -> channel key
    private Map<EventConfig, BackendChannel> cache = new HashMap<>();
    private Map<EventConfig, RawBlobSerializer<EventConfig, DataEvent>> blobSerializers = new HashMap<>();
    private ConfigValuesHolder<EventHeader, DataEvent> valueHolder;
    // NOTE(review): null when the serialize-only constructor is used — init() would then NPE;
    // confirm the receiving side always uses the two-argument constructor.
    private Function<EventConfig, RawBlobDeserializer<EventConfig, DataEvent>> blobDeserializerCreator;
    private Map<EventConfig, RawBlobDeserializer<EventConfig, DataEvent>> blobDeserializers = new HashMap<>();

    /** Serializing-side constructor. */
    public EventDeSerializer(final IntFunction<ByteBuffer> allocator) {
        this.allocator = allocator;
    }

    /** Deserializing-side constructor. */
    public EventDeSerializer(
            final IntFunction<ByteBuffer> allocator,
            final Function<EventConfig, RawBlobDeserializer<EventConfig, DataEvent>> blobDeserializerCreator) {
        this.allocator = allocator;
        this.blobDeserializerCreator = blobDeserializerCreator;
    }

    @Override
    public short getId() {
        return ID;
    }

    @Override
    public ByteBuffer getIdBytes() {
        // duplicate so callers can move position/limit without affecting the shared buffer
        return ID_BYTES.duplicate().order(ID_BYTES.order());
    }

    /** Prepares one blob serializer per channel of the given header (sender side). */
    @Override
    public void configure(final EventHeader header,
            final Map<? extends BackendChannel, ChannelConfiguration> channelConfigs) {
        this.header = header;
        cache.clear();
        header.addEventSerializerId(getId());
        final Map<BackendChannel, EventConfig> eventConfigs = header.getChannelsMapping();
        ChannelConfiguration channelConfig;
        EventConfig eventConfig;
        RawBlobSerializer<EventConfig, DataEvent> serializer;
        for (final Entry<BackendChannel, EventConfig> entry : eventConfigs.entrySet()) {
            eventConfig = entry.getValue();
            channelConfig = channelConfigs.get(entry.getKey());
            serializer = new StandardEventDeSerializer(allocator);
            serializer.configure(eventConfig, channelConfig);
            // add at end since configure can modify eventConfig
            cache.put(eventConfig, entry.getKey());
            blobSerializers.put(eventConfig, serializer);
        }
    }

    /** Writes one message: [size:long][id:short][per-channel blobs in header order]. */
    @Override
    public void serialize(OutputStream os, Map<? extends BackendChannel, DataEvent> events) {
        try {
            final Collection<EventConfig> channels = header.getChannels();
            final List<ByteBuffer> buffers = new ArrayList<>();
            final ByteBuffer idBuf = getIdBytes();
            buffers.add(idBuf);
            BackendChannel channelName;
            DataEvent event;
            for (final EventConfig channel : channels) {
                channelName = cache.get(channel);
                // may be null for a channel with no event in this message — the blob
                // serializer writes a "no event" marker in that case
                event = events.get(channelName);
                final RawBlobSerializer<EventConfig, DataEvent> serializer = blobSerializers.get(channel);
                serializer.serialize(event, buffers);
            }
            long size = 0;
            for (final ByteBuffer buffer : buffers) {
                size += buffer.remaining();
            }
            final ByteBuffer sizeBuf = allocator
                    .apply(Long.BYTES)
                    .order(RawSerializer.DEFAULT_ENCODING)
                    .putLong(0, size);
            // size (long)
            ByteBufferHelper.writeByteBuffer(sizeBuf, os);
            for (final ByteBuffer buffer : buffers) {
                ByteBufferHelper.writeByteBuffer(buffer, os);
            }
        } catch (Exception e) {
            final String message = String.format("Could not write Events '%s'.", events.keySet());
            LOGGER.warn(message, e);
            throw new IllegalStateException(message, e);
        }
    }

    /** Prepares one blob deserializer per channel of the received header (receiver side). */
    @Override
    public void init(final ConfigValuesHolder<EventHeader, DataEvent> valueHolder) {
        this.header = valueHolder.getConfig();
        this.valueHolder = valueHolder;
        cache.clear();
        final Map<BackendChannel, EventConfig> eventConfigs = header.getChannelsMapping();
        EventConfig eventConfig;
        for (final Entry<BackendChannel, EventConfig> entry : eventConfigs.entrySet()) {
            eventConfig = entry.getValue();
            cache.put(eventConfig, entry.getKey());
            final RawBlobDeserializer<EventConfig, DataEvent> blobDeserializer =
                    blobDeserializerCreator.apply(eventConfig);
            blobDeserializer.init(eventConfig);
            blobDeserializers.put(eventConfig, blobDeserializer);
        }
    }

    /** Reads the per-channel blobs of one message and adds decoded events to the holder. */
    @Override
    public void deserialize(ByteBuffer buffer) {
        try {
            // skip id
            buffer.position(buffer.position() + Short.BYTES);
            final Collection<EventConfig> channels = header.getChannels();
            BackendChannel channelName;
            RawBlobDeserializer<EventConfig, DataEvent> blobDeserializer;
            for (final EventConfig channel : channels) {
                blobDeserializer = blobDeserializers.get(channel);
                final DataEvent event = blobDeserializer.deserialize(buffer);
                if (event != null) {
                    channelName = cache.get(channel);
                    valueHolder.add(channelName, event);
                }
            }
        } catch (Exception e) {
            // bug fix: message said "Could not read DataHeader." (copy-paste) although
            // this method reads the event blobs.
            final String message = "Could not read Events.";
            LOGGER.warn(message, e);
            throw new IllegalStateException(message, e);
        }
    }
}

View File

@@ -0,0 +1,71 @@
package ch.psi.daq.queryrest.response.raw.event.serialize;
import java.io.Serializable;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.json.BackendChannel;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.queryrest.response.raw.IdProvider;
import ch.psi.daq.queryrest.response.raw.Ids;
/**
 * Header of a raw event stream: lists the serializer ids needed to decode the event blobs
 * and one {@link EventConfig} per channel (in insertion order).
 */
@JsonInclude(Include.NON_DEFAULT)
public class EventHeader implements IdProvider, Serializable {
    private static final long serialVersionUID = -5342463617678945523L;
    public static final short ID = Ids.DATA_EVENTS_HEADER;

    // sorted for a stable wire representation
    private Set<Short> eventSerializerIds = new TreeSet<>();
    // channel key -> per-channel config, preserving insertion order
    private Map<BackendChannel, EventConfig> channelsMapping = new LinkedHashMap<>();

    public EventHeader() {}

    @JsonIgnore
    public short getId() {
        return ID;
    }

    public Set<Short> getEventSerializerIds() {
        return eventSerializerIds;
    }

    public void setEventSerializerIds(Set<Short> eventSerializerIds) {
        this.eventSerializerIds = eventSerializerIds;
    }

    public void addEventSerializerId(short eventSerializerId) {
        eventSerializerIds.add(eventSerializerId);
    }

    public Collection<EventConfig> getChannels() {
        return channelsMapping.values();
    }

    /** Replaces all channels (JSON deserialization entry point). */
    public void setChannels(Collection<EventConfig> channels) {
        channelsMapping.clear();
        channels.forEach(this::addChannel);
    }

    public void addChannel(EventConfig channel) {
        channelsMapping.put(keyOf(channel), channel);
    }

    // Derives the map key (channel name + backend) from a channel config.
    private static BackendChannel keyOf(EventConfig channel) {
        return new ChannelName(channel.getName(), Backend.byName(channel.getBackend()));
    }

    @JsonIgnore
    public Map<BackendChannel, EventConfig> getChannelsMapping() {
        return channelsMapping;
    }
}

View File

@@ -0,0 +1,130 @@
package ch.psi.daq.queryrest.response.raw.event.serialize;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.IntFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import ch.psi.bsread.common.allocator.ByteBufferAllocator;
import ch.psi.bsread.common.helper.ByteBufferHelper;
import ch.psi.bsread.compression.Compression;
import ch.psi.daq.common.io.ByteBufferInputStream;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.BackendChannel;
import ch.psi.daq.queryrest.response.raw.receive.ReceiverState;
import ch.psi.daq.queryrest.response.raw.serialize.RawDeserializer;
import ch.psi.daq.queryrest.response.raw.serialize.RawSerializer;
/**
 * (De)serializes the {@link EventHeader} as
 * [size:long][id:short][hash:long][compression:byte][bitshuffle_lz4-compressed JSON].
 * The murmur3 hash lets the receiver skip re-parsing an unchanged header.
 */
public class EventHeaderDeSerializer implements RawSerializer<Void, EventHeader>, RawDeserializer<Void, EventHeader> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHeaderDeSerializer.class);
    private static final short ID = EventHeader.ID;
    private static final ByteBuffer ID_BYTES;
    static {
        ID_BYTES = ByteBufferAllocator.DEFAULT_ALLOCATOR.apply(Short.BYTES)
                .order(RawSerializer.DEFAULT_ENCODING)
                .putShort(0, ID);
    }

    private final IntFunction<ByteBuffer> allocator;
    private final ObjectMapper mapper;
    // NOTE(review): null when the serialize-only constructor is used; deserialize() requires
    // a non-null state — confirm the receiving side always passes one.
    private final ReceiverState<EventHeader, ?> state;

    /** Serializing-side constructor (no receiver state needed). */
    public EventHeaderDeSerializer(final IntFunction<ByteBuffer> allocator, final ObjectMapper mapper) {
        this(allocator, mapper, null);
    }

    public EventHeaderDeSerializer(final IntFunction<ByteBuffer> allocator, final ObjectMapper mapper,
            final ReceiverState<EventHeader, ?> state) {
        this.allocator = allocator;
        this.mapper = mapper;
        this.state = state;
    }

    @Override
    public short getId() {
        return ID;
    }

    @Override
    public ByteBuffer getIdBytes() {
        // duplicate so callers can move position/limit without affecting the shared buffer
        return ID_BYTES.duplicate().order(ID_BYTES.order());
    }

    @Override
    public void configure(Void config, Map<? extends BackendChannel, ChannelConfiguration> channelConfigs) {
        // noop
    }

    /** Writes the header; see class javadoc for the wire layout. */
    @Override
    public void serialize(OutputStream os, EventHeader value) {
        try {
            final Compression compression = Compression.bitshuffle_lz4;
            final byte[] eventHeaderByteArray = mapper.writeValueAsBytes(value);
            final ByteBuffer eventHeaderBytes = compression.getCompressor()
                    .compressDataHeader(
                            ByteBuffer.wrap(eventHeaderByteArray),
                            allocator);
            final Hasher eventHeaderHash = Hashing.murmur3_128().newHasher(eventHeaderBytes.remaining());
            // hash is computed over the *uncompressed* json bytes
            // eventHeaderHash.putBytes(eventHeaderBytes.duplicate());
            eventHeaderHash.putBytes(eventHeaderByteArray);
            final ByteBuffer idBuf = getIdBytes();
            final ByteBuffer hashCompressBuf = allocator
                    .apply(Long.BYTES + Byte.BYTES)
                    .order(RawSerializer.DEFAULT_ENCODING)
                    .putLong(0, eventHeaderHash.hash().asLong())
                    .put(Long.BYTES, compression.getId());
            final ByteBuffer sizeBuf = allocator
                    .apply(Long.BYTES)
                    .order(RawSerializer.DEFAULT_ENCODING)
                    .putLong(0, idBuf.remaining() + hashCompressBuf.remaining() + eventHeaderBytes.remaining());
            // size (long)
            ByteBufferHelper.writeByteBuffer(sizeBuf, os);
            // id (short)
            ByteBufferHelper.writeByteBuffer(idBuf, os);
            // hash (long) + compression id (byte)
            ByteBufferHelper.writeByteBuffer(hashCompressBuf, os);
            // EventHeader (bitshuffle_lz4 compressed json string)
            ByteBufferHelper.writeByteBuffer(eventHeaderBytes, os);
        } catch (Exception e) {
            // bug fix: message said 'DataHeader' (copy-paste) although an EventHeader is written
            final String message = String.format("Could not write EventHeader '%s'.", value);
            LOGGER.warn(message, e);
            throw new IllegalStateException(message, e);
        }
    }

    @Override
    public void init(final ConfigValuesHolder<Void, EventHeader> config) {}

    /**
     * Reads a header written by {@link #serialize}. Decompresses and parses it only when the
     * transmitted hash differs from the one cached in the receiver state; otherwise the
     * remaining payload of this blob is simply left unread.
     */
    @Override
    public void deserialize(ByteBuffer buffer) {
        try {
            // skip id
            buffer.position(buffer.position() + Short.BYTES);
            final long hash = buffer.getLong();
            if (state.getHash() != hash) {
                final Compression compression = Compression.byId(buffer.get());
                final ByteBuffer eventHeaderBytes = compression.getCompressor()
                        .decompressDataHeader(buffer, allocator);
                final EventHeader eventHeader =
                        mapper.readValue(new ByteBufferInputStream(eventHeaderBytes), EventHeader.class);
                state.setHash(hash);
                state.setConfig(eventHeader);
            }
        } catch (Exception e) {
            final String message = "Could not read EventHeader.";
            LOGGER.warn(message, e);
            throw new IllegalStateException(message, e);
        }
    }
}

View File

@@ -0,0 +1,165 @@
package ch.psi.daq.queryrest.response.raw.event.serialize;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.function.IntFunction;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.ByteValue;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.DataHelper;
import ch.psi.daq.domain.StreamEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.compression.CompressionAlgo;
import ch.psi.daq.domain.compression.CompressionRequest;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.events.ChannelEvent;
import ch.psi.daq.domain.events.Severity;
import ch.psi.daq.domain.events.Status;
import ch.psi.daq.domain.events.impl.ProcessingEvent;
import ch.psi.daq.queryrest.response.raw.Ids;
import ch.psi.daq.queryrest.response.raw.serialize.RawBlobDeserializer;
import ch.psi.daq.queryrest.response.raw.serialize.RawBlobSerializer;
public class StandardEventDeSerializer
implements RawBlobSerializer<EventConfig, DataEvent>, RawBlobDeserializer<EventConfig, DataEvent> {
public static final int ID = Ids.BLOB_DATA_EVENT_STANDARD;
private static final int META_VALUE_BLOB_SIZE_LENGTH = Integer.BYTES;
private static final int POS_META_BLOB_SIZE = 0;
private static final int POS_META_IOC_TIME = POS_META_BLOB_SIZE + Integer.BYTES;
private static final int POS_META_PULSE_ID = POS_META_IOC_TIME + Long.BYTES;
private static final int POS_META_GLOBAL_TIME = POS_META_PULSE_ID + Long.BYTES;
private static final int POS_META_STATUS = POS_META_GLOBAL_TIME + Long.BYTES;
private static final int POS_META_SEVERITY = POS_META_STATUS + Byte.BYTES;
private static final int POS_VALUE_BLOB = POS_META_SEVERITY + Byte.BYTES;
private static final int META_BLOB_LENGTH = POS_VALUE_BLOB - POS_META_IOC_TIME;
private static final int META_SIZE = META_VALUE_BLOB_SIZE_LENGTH + META_BLOB_LENGTH;
private final IntFunction<ByteBuffer> allocator;
private EventConfig config;
private ByteBuffer noEventBytes;
public StandardEventDeSerializer(final IntFunction<ByteBuffer> allocator) {
this.allocator = allocator;
}
@Override
public short getId() {
return ID;
}
@Override
public void configure(EventConfig config, ChannelConfiguration channelConfig) {
this.config = config;
config.setBlobId(getId());
noEventBytes = allocator.apply(Integer.BYTES)
.order(config.getByteOrder())
.putInt(0, 0);
}
@Override
public void serialize(final DataEvent event, final Collection<ByteBuffer> buffers) {
if (event == null) {
buffers.add(noEventBytes.duplicate().order(noEventBytes.order()));
} else {
Status status = Status.NO_ALARM;
Severity severity = Severity.NO_ALARM;
final ByteBuffer valueBuffer;
if (event instanceof StreamEvent) {
valueBuffer = ((StreamEvent) event).getValueBytes(CompressionRequest.any);
} else if (event instanceof ChannelEvent) {
valueBuffer = ((ChannelEvent) event).getValueBytes(CompressionRequest.any);
status = ((ChannelEvent) event).getStatus();
severity = ((ChannelEvent) event).getSeverity();
} else {
final ByteBuffer dbBytes = DataHelper.convertToDBBytes(
// allocator?
event.getValue(),
config.getType().getKey(),
config.getByteOrder(),
config.getShape(),
CompressionAlgo.getCompressionAlgo(config.getCompression().name()));
valueBuffer = DataHelper.extractValueBytesAsIs(dbBytes);
}
final ByteBuffer metaBuffer = allocator
.apply(META_SIZE)
.order(config.getByteOrder());
// meta value blob size
metaBuffer.putInt(POS_META_BLOB_SIZE,
metaBuffer.remaining() - META_VALUE_BLOB_SIZE_LENGTH
+ valueBuffer.remaining());
metaBuffer.putLong(POS_META_IOC_TIME, TimeUtils.getUnscaledTime(event.getIocTime()));
metaBuffer.putLong(POS_META_PULSE_ID, event.getPulseId());
metaBuffer.putLong(POS_META_GLOBAL_TIME, TimeUtils.getUnscaledTime(event.getGlobalTime()));
metaBuffer.put(POS_META_STATUS, status.getDBKey());
metaBuffer.put(POS_META_SEVERITY, severity.getDBKey());
buffers.add(metaBuffer);
buffers.add(valueBuffer);
}
}
@Override
public void init(final EventConfig config) {
this.config = config;
}
@Override
public DataEvent deserialize(ByteBuffer buffer) {
int metaValueBlobSize;
long iocTime;
long pulseId;
long globalTime;
Status status;
Severity severity;
ByteBuffer byteValue;
buffer.order(config.getByteOrder());
// size
metaValueBlobSize = buffer.getInt();
if (metaValueBlobSize > 0) {
iocTime = buffer.getLong();
pulseId = buffer.getLong();
globalTime = buffer.getLong();
status = Status.newInstance(buffer.get());
severity = Severity.newInstance(buffer.get());
byteValue = buffer.duplicate().order(buffer.order());
final int endPos = byteValue.position() + metaValueBlobSize - META_BLOB_LENGTH;
byteValue.limit(endPos);
ByteBuffer valueBytes =
config
.getCompression()
.getCompressor()
.decompressData(byteValue, byteValue.position(), DataHelper.ALLOCATOR_DEFAULT,
config.getType().getBytes());
buffer.position(endPos);
ProcessingEvent event =
new ProcessingEvent(
config.getName(),
Backend.byName(config.getBackend()),
TimeUtils.getTimeFromUnscaled(iocTime),
pulseId,
TimeUtils.getTimeFromUnscaled(globalTime),
config.getType().getKey(),
config.getShape(),
status,
severity,
new ByteValue(
config.getType().getKey(),
config.getShape(),
valueBytes));
return event;
} else {
return null;
}
}
}

View File

@@ -0,0 +1,43 @@
package ch.psi.daq.queryrest.response.raw.receive;
import java.io.Closeable;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import ch.psi.daq.domain.json.BackendChannel;
/**
 * Receiver of raw messages consisting of a config part of type {@code C} and per-channel
 * values of type {@code V}.
 *
 * @param <C> the config type
 * @param <V> the value type
 */
public interface IReceiver<C, V> extends Closeable {
    /**
     * Stop receiving and close resources.
     */
    void close();

    /**
     * Establishes the connection.
     */
    void connect();

    /**
     * Receive the next message (blocks for the next).
     *
     * @return Message The next message or {@code null} for termination.
     * @throws RuntimeException Might throw a RuntimeException
     */
    Message<C, V> receive() throws RuntimeException;

    /**
     * Provides access to the value change handlers.
     *
     * @return Collection The handlers
     */
    Collection<Consumer<Map<BackendChannel, V>>> getValueHandlers();

    /**
     * Provides access to the config change handlers.
     *
     * @return Collection The handlers
     */
    Collection<Consumer<C>> getConfigHandlers();
}

View File

@@ -0,0 +1,94 @@
package ch.psi.daq.queryrest.response.raw.receive;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.psi.daq.queryrest.response.raw.RawDeserializerProvider;
import ch.psi.daq.queryrest.response.raw.event.serialize.EventHeader;
import ch.psi.daq.queryrest.response.raw.serialize.RawDeserializer;
/**
 * Reads size-prefixed blobs from an {@link InputStream} and dispatches each blob to the
 * deserializer registered for its id: the config deserializer for {@link EventHeader} blobs,
 * otherwise the matching value deserializer from the provider.
 *
 * @param <V> the value type produced by the value deserializers
 */
public class InputStreamMessageExtractor<V> implements MessageReader<EventHeader, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(InputStreamMessageExtractor.class);

    private final InputStream is;
    private final Function<ReceiverState<EventHeader, V>, RawDeserializer<Void, EventHeader>> configDeserializerCreator;
    private final RawDeserializerProvider<EventHeader, V> deserializerProvider;
    private ReceiverState<EventHeader, V> state;
    private RawDeserializer<Void, EventHeader> configDeserializer;

    public InputStreamMessageExtractor(final InputStream is,
            final Function<ReceiverState<EventHeader, V>, RawDeserializer<Void, EventHeader>> configDeserializerCreator,
            final RawDeserializerProvider<EventHeader, V> deserializerProvider) {
        this.is = is;
        this.configDeserializerCreator = configDeserializerCreator;
        this.deserializerProvider = deserializerProvider;
    }

    @Override
    public void connect(final ReceiverState<EventHeader, V> state) {
        this.state = state;
        configDeserializer = configDeserializerCreator.apply(state);
        configDeserializer.init(null);
    }

    /**
     * Reads blobs until the state holds a complete set of values, the stream ends, or the
     * state stops running.
     *
     * @throws IllegalStateException if a blob's serializer id cannot be handled
     */
    @Override
    public void readNext() {
        state.init();
        try {
            while (!state.hasValues() && state.isRunning()) {
                final ByteBuffer buffer = RawDeserializer.readBytes(is);
                if (buffer.remaining() > 0) {
                    final short id = RawDeserializer.getDataId(buffer);
                    if (id == configDeserializer.getId()) {
                        configDeserializer.deserialize(buffer);
                        final EventHeader config = state.getConfig();
                        // sanity check: every serializer id announced in the header must be decodable
                        if (deserializerProvider.hasAllDeserializer(config.getEventSerializerIds())) {
                            deserializerProvider.init(state);
                        } else {
                            // bug fix: typo 'extected' -> 'expected' in the error message
                            final String message = String.format("Supported Deserializers '%s' but expected are '%s'.",
                                    deserializerProvider.getDeserializerIds(), config.getEventSerializerIds());
                            LOGGER.error(message);
                            throw new IllegalStateException(message);
                        }
                    } else {
                        final RawDeserializer<EventHeader, V> deserializer = deserializerProvider.getDeserializer(id);
                        if (deserializer != null) {
                            deserializer.deserialize(buffer);
                        } else {
                            final String message =
                                    String.format("There is no Deserializer for '%d'. Available ids are '%s'.", id,
                                            deserializerProvider.getDeserializerIds());
                            LOGGER.warn(message);
                            throw new IllegalStateException(message);
                        }
                    }
                } else {
                    // an empty buffer marks the end of the stream -> stop
                    state.stop();
                }
            }
        } catch (IOException e) {
            // assume end of stream reached
            LOGGER.debug("Reached end of stream.", e);
        }
    }

    @Override
    public void close() {
        try {
            is.close();
        } catch (Exception e) {
            LOGGER.warn("Could not close InputStream.", e);
        }
    }
}

View File

@@ -0,0 +1,46 @@
package ch.psi.daq.queryrest.response.raw.receive;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

import ch.psi.daq.domain.json.BackendChannel;

/**
 * Container for one received message: an optional config together with a flag telling whether it
 * changed since the previous message, and the channel values extracted from the stream
 * (insertion-ordered).
 */
public class Message<C, V> implements Serializable {
    private static final long serialVersionUID = 5975345475367745542L;

    private boolean configChanged = false;
    private C config = null;
    private Map<BackendChannel, V> values = new LinkedHashMap<>();

    /** Creates an empty message (no config, no values). */
    public Message() {}

    /**
     * Creates a fully initialized message.
     *
     * @param configChanged Whether the config changed since the previous message
     * @param config The config (may be null)
     * @param values The channel values
     */
    public Message(final boolean configChanged, final C config, final Map<BackendChannel, V> values) {
        this.configChanged = configChanged;
        this.config = config;
        this.values = values;
    }

    /** Records a single channel value. */
    public void add(final BackendChannel channel, final V value) {
        values.put(channel, value);
    }

    public Map<BackendChannel, V> getValues() {
        return values;
    }

    public void setValues(final Map<BackendChannel, V> newValues) {
        values = newValues;
    }

    public C getConfig() {
        return config;
    }

    public boolean isConfigChanged() {
        return configChanged;
    }

    public void setConfigChanged(final boolean changed) {
        configChanged = changed;
    }
}

View File

@@ -0,0 +1,21 @@
package ch.psi.daq.queryrest.response.raw.receive;
/**
 * Reads messages of a raw data stream into a {@link ReceiverState}.
 *
 * @param <C> The config type
 * @param <V> The value type
 */
public interface MessageReader<C, V> {
/**
 * Establishes the connection and binds this reader to the given receiver state.
 *
 * @param state The ReceiverState this reader writes configs and channel values into
 */
void connect(final ReceiverState<C, V> state);
/**
 * Reads the value bytes of a channel.
 * Implementations read from the underlying source until the bound ReceiverState holds at
 * least one channel value, the stream ends, or the state is stopped.
 */
void readNext();
/**
 * Stop receiving and close resources.
 */
void close();
}

View File

@@ -0,0 +1,133 @@
package ch.psi.daq.queryrest.response.raw.receive;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ch.psi.daq.domain.json.BackendChannel;
import ch.psi.daq.queryrest.response.raw.event.serialize.EventHeader;

/**
 * Drives a {@link MessageReader} and dispatches received configs and channel values to registered
 * handlers (sequentially or in parallel, per {@link ReceiverConfig#isParallelHandlerProcessing()}).
 */
public class Receiver<C, V> implements IReceiver<C, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    // shared with the ReceiverState so reader and receiver agree on the running flag
    private final AtomicBoolean isRunning = new AtomicBoolean();
    private final List<Consumer<C>> configHandlers = new ArrayList<>();
    private final List<Consumer<Map<BackendChannel, V>>> valueHandlers = new ArrayList<>();
    private final ReceiverConfig<C, V> receiverConfig;
    private ReceiverState<C, V> receiverState;

    public Receiver(ReceiverConfig<C, V> receiverConfig) {
        this.receiverConfig = receiverConfig;
    }

    /** Connects the underlying MessageReader (no-op if already running). */
    public void connect() {
        if (isRunning.compareAndSet(false, true)) {
            // ensures new state after reconnect
            receiverState = new ReceiverStateImpl<>(isRunning);
            receiverConfig.getMessageReader().connect(receiverState);
        }
    }

    @Override
    public void close() {
        if (isRunning.compareAndSet(true, false)) {
            cleanup();
        }
    }

    protected void cleanup() {
        receiverConfig.getMessageReader().close();
    }

    /**
     * Reads the next message and notifies config handlers (if the config changed) and value
     * handlers.
     *
     * @return Message The received message, or null when the stream stopped without values
     * @throws RuntimeException if reading or a handler fails; the receiver is stopped and cleaned
     *         up in that case
     */
    public Message<C, V> receive() throws RuntimeException {
        receiverState.init();
        try {
            while (receiverState.isRunning() && !receiverState.hasValues()) {
                receiverConfig.getMessageReader().readNext();
            }
            if (receiverState.hasValues()) {
                final Message<C, V> message = receiverState.getMessage();
                if (message.isConfigChanged()) {
                    dispatchConfig(message.getConfig());
                }
                dispatchValues(message.getValues());
                return message;
            } else {
                return null;
            }
        } catch (Exception e) {
            LOGGER.error(
                    "Receiver of '{}' stopped unexpectedly.", receiverConfig, e);
            isRunning.set(false);
            throw e;
        } finally {
            if (!isRunning.get()) {
                cleanup();
            }
        }
    }

    // notifies all config handlers, honoring the parallel-processing setting
    private void dispatchConfig(final C config) {
        if (receiverConfig.isParallelHandlerProcessing()) {
            configHandlers.parallelStream().forEach(handler -> handler.accept(config));
        } else {
            for (Consumer<C> configHandler : configHandlers) {
                configHandler.accept(config);
            }
        }
    }

    // notifies all value handlers, honoring the parallel-processing setting
    private void dispatchValues(final Map<BackendChannel, V> values) {
        if (receiverConfig.isParallelHandlerProcessing()) {
            valueHandlers.parallelStream().forEach(handler -> handler.accept(values));
        } else {
            for (Consumer<Map<BackendChannel, V>> valueHandler : valueHandlers) {
                valueHandler.accept(values);
            }
        }
    }

    public ReceiverConfig<C, V> getReceiverConfig() {
        return receiverConfig;
    }

    public ReceiverState<C, V> getReceiverState() {
        return receiverState;
    }

    @Override
    public Collection<Consumer<Map<BackendChannel, V>>> getValueHandlers() {
        return valueHandlers;
    }

    public void addValueHandler(Consumer<Map<BackendChannel, V>> handler) {
        valueHandlers.add(handler);
    }

    public void removeValueHandler(Consumer<Map<BackendChannel, V>> handler) {
        valueHandlers.remove(handler);
    }

    @Override
    public Collection<Consumer<C>> getConfigHandlers() {
        return configHandlers;
    }

    public void addConfigHandler(Consumer<C> handler) {
        configHandlers.add(handler);
    }

    /**
     * Removes a previously registered config handler.
     *
     * @param handler The handler to remove
     */
    public void removeConfigHandler(Consumer<C> handler) {
        configHandlers.remove(handler);
    }

    /**
     * @param handler The handler to remove
     * @deprecated The parameter type is inconsistent with {@link #addConfigHandler(Consumer)}
     *             (it only removes anything when {@code C} is {@code EventHeader}); use
     *             {@link #removeConfigHandler(Consumer)} instead.
     */
    @Deprecated
    public void removeEventHeaderHandler(Consumer<EventHeader> handler) {
        configHandlers.remove(handler);
    }
}

View File

@@ -0,0 +1,51 @@
package ch.psi.daq.queryrest.response.raw.receive;

import java.nio.ByteBuffer;
import java.util.function.IntFunction;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Configuration of a Receiver: the mandatory MessageReader providing the data, an optional
 * ByteBuffer allocator, an optional ObjectMapper, and whether handlers are invoked in parallel.
 */
public class ReceiverConfig<C, V> {
    private final MessageReader<C, V> messageReader;
    private boolean parallelHandlerProcessing;
    private IntFunction<ByteBuffer> allocator;
    private ObjectMapper objectMapper;

    public ReceiverConfig(final MessageReader<C, V> reader) {
        messageReader = reader;
    }

    public MessageReader<C, V> getMessageReader() {
        return messageReader;
    }

    public boolean isParallelHandlerProcessing() {
        return parallelHandlerProcessing;
    }

    public void setParallelHandlerProcessing(final boolean parallel) {
        parallelHandlerProcessing = parallel;
    }

    public IntFunction<ByteBuffer> getAllocator() {
        return allocator;
    }

    public void setAllocator(final IntFunction<ByteBuffer> bufferAllocator) {
        allocator = bufferAllocator;
    }

    public ObjectMapper getObjectMapper() {
        return objectMapper;
    }

    public void setObjectMapper(final ObjectMapper mapper) {
        objectMapper = mapper;
        // keep mixin registration in one place so externally provided mappers are prepared identically
        addObjectMapperMixin(mapper);
    }

    /** Hook for registering jackson mixins on a mapper (currently none). */
    public static void addObjectMapperMixin(final ObjectMapper objectMapper) {}
}

View File

@@ -0,0 +1,23 @@
package ch.psi.daq.queryrest.response.raw.receive;
import ch.psi.daq.domain.json.BackendChannel;
import ch.psi.daq.queryrest.response.raw.event.serialize.ConfigValuesHolder;
/**
 * Mutable state shared between a Receiver and its MessageReader while a raw stream is consumed:
 * the running flag, the current config (via ConfigValuesHolder) and its hash, and the message
 * currently being assembled.
 */
public interface ReceiverState<C, V> extends ConfigValuesHolder<C, V> {
/**
 * Requests the receive loop to stop.
 *
 * @return boolean true if the state transitioned from running to stopped
 */
boolean stop();
/**
 * @return boolean true while the receive loop should keep running
 */
boolean isRunning();
/**
 * @return long The hash of the current config
 */
long getHash();
/**
 * Sets the config hash (used to detect config changes between messages).
 *
 * @param hash The new hash
 */
void setHash(long hash);
/**
 * Starts a new (empty) message.
 */
void init();
/**
 * @return boolean true if the current message holds at least one channel value
 */
boolean hasValues();
/**
 * Adds a channel value to the current message.
 *
 * @param channel The channel
 * @param value The value
 */
void add(BackendChannel channel, V value);
/**
 * @return Message The current message
 */
Message<C, V> getMessage();
}

View File

@@ -0,0 +1,81 @@
package ch.psi.daq.queryrest.response.raw.receive;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import ch.psi.daq.domain.json.BackendChannel;

/**
 * Default {@link ReceiverState} implementation. Tracks the shared running flag, the latest config
 * and its hash, and the {@link Message} currently being assembled.
 * <p>
 * NOTE(review): apart from the shared running flag the fields are unsynchronized — assumes a
 * single reader thread drives this state; confirm against the callers.
 */
public class ReceiverStateImpl<C, V> implements ReceiverState<C, V> {
    // shared with the owning Receiver so both see start/stop transitions
    private final AtomicBoolean isRunning;
    private long hash = Long.MIN_VALUE;
    // set by setHash(); consumed and reset by getMessage()
    private boolean hashChanged = false;
    private C config;
    private Message<C, V> message;

    public ReceiverStateImpl(AtomicBoolean isRunning) {
        this.isRunning = isRunning;
    }

    @Override
    public boolean stop() {
        return isRunning.compareAndSet(true, false);
    }

    @Override
    public boolean isRunning() {
        return isRunning.get();
    }

    @Override
    public long getHash() {
        return hash;
    }

    @Override
    public void setHash(long hash) {
        // a differing hash marks the next message as "config changed"
        hashChanged = hash != this.hash;
        this.hash = hash;
    }

    @Override
    public C getConfig() {
        return config;
    }

    @Override
    public void setConfig(C config) {
        this.config = config;
    }

    // NOTE(review): presumably overrides ConfigValuesHolder#getValues() — supertype not visible
    // here, so no @Override. Requires init() to have been called first.
    public Map<BackendChannel, V> getValues() {
        return message.getValues();
    }

    @Override
    public void init() {
        message = new Message<>();
    }

    @Override
    public boolean hasValues() {
        return message != null && !message.getValues().isEmpty();
    }

    @Override
    public void add(BackendChannel channel, V value) {
        this.message.add(channel, value);
    }

    @Override
    public Message<C, V> getMessage() {
        // transfer the pending config-changed flag onto the message and reset it
        message.setConfigChanged(hashChanged);
        hashChanged = false;
        return message;
    }
}

View File

@@ -0,0 +1,12 @@
package ch.psi.daq.queryrest.response.raw.serialize;
import java.nio.ByteBuffer;
import ch.psi.daq.queryrest.response.raw.IdProvider;
/**
 * Deserializes a single value from a raw blob, identified by its data id (via IdProvider).
 *
 * @param <C> The config type
 * @param <V> The value type
 */
public interface RawBlobDeserializer<C, V> extends IdProvider {
/**
 * Initializes the deserializer with the stream's config.
 *
 * @param config The config
 */
void init(final C config);
/**
 * Deserializes one value from the given blob.
 *
 * @param blob The ByteBuffer containing the serialized bytes
 * @return V The deserialized value
 */
V deserialize(final ByteBuffer blob);
}

View File

@@ -0,0 +1,14 @@
package ch.psi.daq.queryrest.response.raw.serialize;
import java.nio.ByteBuffer;
import java.util.Collection;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.queryrest.response.raw.IdProvider;
/**
 * Serializes single values into raw blobs, identified by a data id (via IdProvider).
 *
 * @param <C> The config type
 * @param <V> The value type
 */
public interface RawBlobSerializer<C, V> extends IdProvider {
/**
 * Configures the serializer for a channel.
 *
 * @param config The config
 * @param channelConfig The ChannelConfiguration of the channel whose values get serialized
 */
void configure(final C config, final ChannelConfiguration channelConfig);
/**
 * Serializes one value, appending the resulting buffers to the given collection.
 *
 * @param value The value to serialize
 * @param buffers The Collection the serialized ByteBuffers are added to
 */
void serialize(final V value, final Collection<ByteBuffer> buffers);
}

View File

@@ -0,0 +1,49 @@
package ch.psi.daq.queryrest.response.raw.serialize;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ch.psi.bsread.common.allocator.ByteBufferAllocator;
import ch.psi.bsread.common.helper.ByteBufferHelper;
import ch.psi.daq.queryrest.response.raw.IdProvider;
import ch.psi.daq.queryrest.response.raw.event.serialize.ConfigValuesHolder;

/**
 * Deserializes raw blobs read from a stream. The wire format is a long size prefix followed by
 * the blob, whose first short is the data id identifying the responsible deserializer.
 *
 * @param <C> The config type
 * @param <V> The value type
 */
public interface RawDeserializer<C, V> extends IdProvider {
    public static final Logger _LOGGER = LoggerFactory.getLogger(RawDeserializer.class);
    // must match the serializer side (see RawSerializer.DEFAULT_ENCODING)
    public static final ByteOrder DEFAULT_ENCODING = RawSerializer.DEFAULT_ENCODING;

    /**
     * Reads the next length-prefixed blob from the stream.
     *
     * @param is The InputStream to read from
     * @return ByteBuffer The blob (empty when the stream signals its end)
     * @throws IOException if reading fails or the size prefix is invalid
     */
    public static ByteBuffer readBytes(final InputStream is) throws IOException {
        final ByteBuffer sizeBuf = ByteBufferAllocator.DEFAULT_ALLOCATOR
                .apply(Long.BYTES)
                .order(RawDeserializer.DEFAULT_ENCODING);
        ByteBufferHelper.readByteBuffer(is, sizeBuf);
        final long size = sizeBuf.getLong(0);
        // guard against corrupt/truncated streams: the long size must fit the int-based allocator
        if (size < 0 || size > Integer.MAX_VALUE) {
            throw new IOException("Invalid blob size '" + size + "'.");
        }
        final ByteBuffer blob = ByteBufferAllocator.DEFAULT_ALLOCATOR
                .apply((int) size)
                .order(RawDeserializer.DEFAULT_ENCODING);
        ByteBufferHelper.readByteBuffer(is, blob);
        return blob;
    }

    /**
     * Reads the next data id directly from the stream.
     *
     * @param is The InputStream to read from
     * @return short The data id
     * @throws IOException if reading fails
     */
    public static short readDataId(final InputStream is) throws IOException {
        final ByteBuffer idBuf = ByteBufferAllocator.DEFAULT_ALLOCATOR
                .apply(Short.BYTES)
                .order(RawDeserializer.DEFAULT_ENCODING);
        ByteBufferHelper.readByteBuffer(is, idBuf);
        return getDataId(idBuf);
    }

    /**
     * Extracts the data id from a blob (absolute read; does not change the buffer's position).
     *
     * @param blob The blob
     * @return short The data id
     */
    public static short getDataId(final ByteBuffer blob) {
        return blob.getShort(0);
    }

    /**
     * Initializes the deserializer with the current config/values holder.
     *
     * @param config The ConfigValuesHolder
     */
    void init(final ConfigValuesHolder<C, V> config);

    /**
     * Deserializes the given blob.
     *
     * @param buffer The blob to deserialize
     * @throws RuntimeException if deserialization fails
     */
    void deserialize(final ByteBuffer buffer) throws RuntimeException;
}

View File

@@ -0,0 +1,24 @@
package ch.psi.daq.queryrest.response.raw.serialize;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.json.BackendChannel;
import ch.psi.daq.queryrest.response.raw.IdProvider;
/**
 * Serializes values into the raw wire format, identified by a data id (via IdProvider).
 *
 * @param <C> The config type
 * @param <V> The value type
 */
public interface RawSerializer<C, V> extends IdProvider {
public static final Logger _LOGGER = LoggerFactory.getLogger(RawSerializer.class);
// big-endian is the wire default; RawDeserializer.DEFAULT_ENCODING refers back to this constant
public static final ByteOrder DEFAULT_ENCODING = ByteOrder.BIG_ENDIAN;
/**
 * Provides the bytes identifying this serializer's blobs on the wire.
 *
 * @return ByteBuffer The id bytes
 */
ByteBuffer getIdBytes();
/**
 * Configures the serializer for the given channels.
 *
 * @param config The config
 * @param channelConfigs The ChannelConfiguration per BackendChannel
 */
void configure(final C config, final Map<? extends BackendChannel, ChannelConfiguration> channelConfigs);
/**
 * Serializes one value to the stream.
 *
 * @param os The OutputStream to write to
 * @param value The value to serialize
 * @throws RuntimeException if serialization fails
 */
void serialize(final OutputStream os, final V value) throws RuntimeException;
}

View File

@@ -10,7 +10,6 @@ import org.springframework.http.MediaType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
@@ -37,13 +36,12 @@ public class SmileHTTPResponse extends AbstractHTTPResponse {
final ApplicationContext context,
final HttpServletResponse response,
final Object query,
final R result,
final ResponseFormatter<R> formatter) throws Exception {
final R result) throws Exception {
final OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
final ResponseStreamWriter streamWriter = context.getBean(SmileResponseStreamWriter.class);
// write the response back to the client using java 8 streams
streamWriter.respond(query, result, out, this, formatter);
streamWriter.respond(query, result, out, this);
}
}

View File

@@ -703,7 +703,7 @@ public class JsonQueryRestControllerTableTest extends AbstractDaqRestTest implem
.andExpect(MockMvcResultMatchers.jsonPath("$.data[1][1].globalSeconds").value(
TestTimeUtils.getTimeStr(2, 10000000)));
}
@Test
public void testTimeRangeQuery_01_ConfigFields() throws Exception {
DAQQuery request = new DAQQuery(
@@ -799,7 +799,7 @@ public class JsonQueryRestControllerTableTest extends AbstractDaqRestTest implem
.andExpect(MockMvcResultMatchers.jsonPath("$.meta[1].configs[1].source").value("unknown"))
.andExpect(MockMvcResultMatchers.jsonPath("$.meta[1].configs[1].type").value(Type.Int32.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$.meta[1].configs[1].unit").doesNotExist())
.andExpect(MockMvcResultMatchers.jsonPath("$.data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$.data[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$.data[0]").isArray())