MsgPack and BSON response writers.

This commit is contained in:
Fabian Märki
2016-11-23 11:13:22 +01:00
parent ebac00d678
commit b9755b6ad4
23 changed files with 864 additions and 70 deletions

View File

@ -1,5 +1,5 @@
#
#Thu Aug 04 12:28:57 CEST 2016
#Tue Nov 22 17:23:47 CET 2016
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve

View File

@ -33,6 +33,8 @@ dependencies {
compile libraries.commons_io
compile libraries.commons_csv
compile libraries.netty_all
compile libraries.bson_jackson
compile libraries.msgpack_jackson
testCompile libraries.spring_boot_starter_test
testCompile libraries.jsonassert

View File

@ -10,6 +10,7 @@ import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
@ -42,9 +43,16 @@ import ch.psi.daq.queryrest.model.PropertyFilterMixin;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.query.QueryManagerImpl;
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
import ch.psi.daq.queryrest.response.bson.BSONResponseStreamWriter;
import ch.psi.daq.queryrest.response.bson.BSONTableResponseStreamWriter;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONTableResponseStreamWriter;
import ch.psi.daq.queryrest.response.msgpack.MsgPackResponseStreamWriter;
import ch.psi.daq.queryrest.response.msgpack.MsgPackTableResponseStreamWriter;
import de.undercouch.bson4jackson.BsonFactory;
import de.undercouch.bson4jackson.BsonGenerator;
@Configuration
@Import(value = QueryRestConfigCORS.class)
@ -119,6 +127,21 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
public JsonFactory jsonFactory() {
return new JsonFactory();
}
/**
 * Provides the shared {@link BsonFactory} used by the BSON response writers.
 * Streaming is enabled so documents can be emitted without pre-computing their size.
 */
@Bean
public BsonFactory bsonFactory() {
    final BsonFactory bsonFactory = new BsonFactory();
    bsonFactory.enable(BsonGenerator.Feature.ENABLE_STREAMING);
    return bsonFactory;
}
/**
 * Provides the shared {@link MessagePackFactory} used by the MessagePack response writers.
 */
@Bean
public MessagePackFactory messagePackFactory() {
    // Renamed from 'nessagePackFactory' (typo). The writers inject this bean via @Resource
    // fields named 'msgPackFactory', which resolve by type, so the bean-name change is safe.
    return new MessagePackFactory();
}
@Bean
public Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory() {
@ -134,6 +157,26 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
public JSONTableResponseStreamWriter jsonTableResponseStreamWriter() {
return new JSONTableResponseStreamWriter();
}
@Bean
public BSONResponseStreamWriter bsonResponseStreamWriter() {
return new BSONResponseStreamWriter();
}
@Bean
public BSONTableResponseStreamWriter bsonTableResponseStreamWriter() {
return new BSONTableResponseStreamWriter();
}
@Bean
public MsgPackResponseStreamWriter msgPackResponseStreamWriter() {
return new MsgPackResponseStreamWriter();
}
@Bean
public MsgPackTableResponseStreamWriter msgPackTableResponseStreamWriter() {
return new MsgPackTableResponseStreamWriter();
}
@Bean
public CSVResponseStreamWriter csvResponseStreamWriter() {

View File

@ -4,8 +4,10 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import ch.psi.daq.queryrest.response.bson.BSONHTTPResponse;
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
import ch.psi.daq.queryrest.response.msgpack.MsgPackHTTPResponse;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
@ -13,7 +15,9 @@ import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
property = "format")
@JsonSubTypes({
@Type(value = JSONHTTPResponse.class, name = JSONHTTPResponse.FORMAT),
@Type(value = CSVHTTPResponse.class, name = CSVHTTPResponse.FORMAT)
@Type(value = BSONHTTPResponse.class, name = BSONHTTPResponse.FORMAT),
@Type(value = MsgPackHTTPResponse.class, name = MsgPackHTTPResponse.FORMAT),
@Type(value = CSVHTTPResponse.class, name = CSVHTTPResponse.FORMAT)
})
// see: http://stackoverflow.com/questions/24631923/alternative-to-jackson-jsonsubtypes
public abstract class PolymorphicResponseMixIn {

View File

@ -0,0 +1,69 @@
package ch.psi.daq.queryrest.response.bson;
import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
/**
 * HTTP response that serializes query results as BSON. Delegates the actual
 * serialization to {@link BSONResponseStreamWriter} (plain event lists) or
 * {@link BSONTableResponseStreamWriter} (table mapping).
 */
public class BSONHTTPResponse extends AbstractHTTPResponse {
    private static final Logger LOGGER = LoggerFactory.getLogger(BSONHTTPResponse.class);

    public static final String FORMAT = "bson";
    public static final String CONTENT_TYPE = MediaType.APPLICATION_OCTET_STREAM_VALUE;

    public BSONHTTPResponse() {
        super(ResponseFormat.BSON);
    }

    public BSONHTTPResponse(Compression compression) {
        this();
        setCompression(compression);
    }

    @Override
    public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse response) throws Exception {
        final OutputStream outputStream = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
        // A mapping (table layout) requires the table writer; plain event queries use the stream writer.
        final boolean tableLayout = JSONHTTPResponse.validateQueries(queries);

        try {
            LOGGER.debug("Executing query '{}'", queries);

            final QueryManager queryManager = context.getBean(QueryManager.class);
            final ResponseStreamWriter streamWriter = tableLayout
                    ? context.getBean(BSONTableResponseStreamWriter.class)
                    : context.getBean(BSONResponseStreamWriter.class);

            // execute the query; one entry per query element, each with a stream of result triples
            final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
                    queryManager.getEvents(queries);

            // write the response back to the client using java 8 streams
            streamWriter.respond(result, outputStream, this);
        } catch (Exception e) {
            LOGGER.error("Failed to execute query '{}'.", queries, e);
            throw e;
        }
    }
}

View File

@ -0,0 +1,41 @@
package ch.psi.daq.queryrest.response.bson;
import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
import de.undercouch.bson4jackson.BsonFactory;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
/**
 * Writes query results as BSON to the output stream provided by the
 * {@link ServletResponse} of the current request. The serialization logic is
 * shared with the JSON writer; only the Jackson factory differs.
 */
public class BSONResponseStreamWriter implements ResponseStreamWriter {

    @Resource
    private BsonFactory bsonFactory;

    @Resource
    private ObjectMapper mapper;

    @Override
    public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> queryResults,
            final OutputStream outputStream, final Response response) throws Exception {
        // Reuse the generic JSON streaming code with a BSON-producing factory.
        JSONResponseStreamWriter.respond(bsonFactory, mapper, queryResults, outputStream, response);
    }
}

View File

@ -0,0 +1,64 @@
package ch.psi.daq.queryrest.response.bson;
import java.io.OutputStream;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import org.springframework.context.ApplicationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONTableResponseStreamWriter;
import de.undercouch.bson4jackson.BsonFactory;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
/**
 * Writes table-mapped query results as BSON to the output stream provided by the
 * {@link ServletResponse} of the current request. The serialization logic is
 * shared with the JSON table writer; only the Jackson factory differs.
 */
public class BSONTableResponseStreamWriter implements ResponseStreamWriter {

    @Resource
    private ApplicationContext context;

    @Resource
    private BsonFactory bsonFactory;

    @Resource
    private ObjectMapper mapper;

    @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
    private Set<Aggregation> defaultResponseAggregations;

    // String names of the default aggregations, derived once after injection.
    private Set<String> defaultResponseAggregationsStr;

    @PostConstruct
    public void afterPropertiesSet() {
        // LinkedHashSet keeps the configured aggregation order.
        final Set<String> aggregationNames = new LinkedHashSet<>();
        for (final Aggregation aggregation : defaultResponseAggregations) {
            aggregationNames.add(aggregation.name());
        }
        defaultResponseAggregationsStr = aggregationNames;
    }

    @Override
    public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> queryResults,
            final OutputStream outputStream, final Response response) throws Exception {
        // Reuse the generic JSON table streaming code with a BSON-producing factory.
        JSONTableResponseStreamWriter.respond(context, bsonFactory, mapper, defaultResponseAggregationsStr,
                queryResults, outputStream, response);
    }
}

View File

@ -61,24 +61,25 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
public static final String EMPTY_VALUE = "";
public static final String FIELDNAME_EXTREMA = "extrema";
private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(), event.getBackend());
private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
event.getBackend());
// try to match sync data (bsread) with non sync data (epics) based on the time using 10 millis
// buckets.
private static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis() / 10L;
@Resource
private ApplicationContext context;
@Resource
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@Override
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out, final Response response) throws Exception {
if(results.size() > 1){
if (results.size() > 1) {
throw new IllegalStateException("CSV format does not allow for multiple queries.");
}
AtomicReference<Exception> exception = new AtomicReference<>();
final Map<ChannelName, Stream<DataEvent>> streams = new LinkedHashMap<>(results.size());
@ -96,7 +97,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
.sequential()
.forEach(triple -> {
backendQueryRef.compareAndSet(null, triple.getLeft());
if (triple.getRight() instanceof Stream) {
setupChannelColumns(query, triple.getLeft(), triple.getMiddle(), header, accessors);
@ -115,7 +116,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
// online matching of the stream's content
StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> streamMatcher =
new StreamMatcher<>(
KEY_PROVIDER,
KEY_PROVIDER,
MATCHER_PROVIDER,
new MapCreator<>(),
new MapFiller<>(),
@ -163,6 +164,14 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
writer.close();
}
if (streamMatcher != null) {
try {
streamMatcher.close();
} catch (Throwable t) {
LOGGER.error("Something went wrong while closing stream matcher for csv response writer.", t);
}
}
if (csvFilePrinter != null) {
csvFilePrinter.close();
}

View File

@ -46,7 +46,7 @@ public class JSONHTTPResponse extends AbstractHTTPResponse {
public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse response) throws Exception {
OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
boolean hasMapping = validateQueries(queries);
boolean hasMapping = JSONHTTPResponse.validateQueries(queries);
try {
LOGGER.debug("Executing query '{}'", queries);
@ -71,7 +71,7 @@ public class JSONHTTPResponse extends AbstractHTTPResponse {
}
protected boolean validateQueries(DAQQueries queries) {
public static boolean validateQueries(DAQQueries queries) {
boolean hasMapping = false;
for (DAQQueryElement query : queries) {

View File

@ -51,60 +51,7 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
@Override
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out, final Response response) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
try {
if (results.size() > 1) {
generator.writeStartArray();
}
results
.forEach(entryy -> {
DAQQueryElement daqQuery = entryy.getKey();
Set<String> includedFields = getFields(daqQuery, true);
ObjectWriter writer = configureWriter(includedFields, mapper);
try {
generator.writeStartArray();
entryy.getValue()
/* ensure elements are sequentially written */
.sequential()
.forEach(
triple -> {
try {
generator.writeStartObject();
generator.writeFieldName(QueryField.channel.name());
writer.writeValue(generator, triple.getMiddle());
generator.writeFieldName(DATA_RESP_FIELD);
writer.writeValue(generator, triple.getRight());
generator.writeEndObject();
} catch (Exception e) {
LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(),
e);
exception.compareAndSet(null, e);
}
});
generator.writeEndArray();
} catch (Exception e) {
LOGGER.error("Exception while writing json for '{}'", daqQuery.getChannels(), e);
exception.compareAndSet(null, e);
}
});
} finally {
if (results.size() > 1) {
generator.writeEndArray();
}
generator.flush();
generator.close();
}
if (exception.get() != null) {
throw exception.get();
}
respond(jsonFactory, mapper, results, out, response);
}
public static Set<String> getFields(DAQQueryElement query, boolean removeIdentifiers) {
@ -138,6 +85,68 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
return includedFields;
}
/**
 * Streams the per-query results as arrays of {channel, data} objects using the given
 * Jackson factory. Shared by the JSON, BSON and MessagePack writers - the factory
 * determines the wire format.
 *
 * Exceptions raised inside the stream lambdas are captured in an AtomicReference
 * (checked exceptions cannot escape a lambda) and the first one is re-thrown after
 * the generator has been flushed and closed.
 *
 * @param factory Jackson factory producing the generator (JSON, BSON or MessagePack)
 * @param mapper mapper used to configure a per-query ObjectWriter with the included fields
 * @param results one entry per query element, each with a stream of (backend query, channel, data) triples
 * @param out destination output stream
 * @param response response descriptor (not used by this body; part of the writer contract)
 * @throws Exception the first exception encountered while writing
 */
public static void respond(final JsonFactory factory, final ObjectMapper mapper, final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out, final Response response) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
JsonGenerator generator = factory.createGenerator(out, JsonEncoding.UTF8);
try {
// multiple queries are wrapped in one enclosing array
if (results.size() > 1) {
generator.writeStartArray();
}
results
.forEach(entryy -> {
DAQQueryElement daqQuery = entryy.getKey();
Set<String> includedFields = getFields(daqQuery, true);
ObjectWriter writer = configureWriter(includedFields, mapper);
try {
// one array per query, one object per channel
generator.writeStartArray();
entryy.getValue()
/* ensure elements are sequentially written */
.sequential()
.forEach(
triple -> {
try {
generator.writeStartObject();
generator.writeFieldName(QueryField.channel.name());
writer.writeValue(generator, triple.getMiddle());
generator.writeFieldName(DATA_RESP_FIELD);
writer.writeValue(generator, triple.getRight());
generator.writeEndObject();
} catch (Exception e) {
LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(),
e);
exception.compareAndSet(null, e);
// always close nested event streams so their backing resources are released
} finally{
if(triple.getRight() instanceof Stream){
((Stream<?>)(triple.getRight())).close();
}
}
});
generator.writeEndArray();
} catch (Exception e) {
LOGGER.error("Exception while writing json for '{}'", daqQuery.getChannels(), e);
exception.compareAndSet(null, e);
}
});
} finally {
if (results.size() > 1) {
generator.writeEndArray();
}
generator.flush();
generator.close();
}
// surface the first captured exception to the caller
if (exception.get() != null) {
throw exception.get();
}
}
/**
* Configures the writer dynamically by including the fields which should be included in the

View File

@ -92,11 +92,18 @@ public class JSONTableResponseStreamWriter implements ResponseStreamWriter {
@Override
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out, final Response response) throws Exception {
respond(context, jsonFactory, mapper, defaultResponseAggregationsStr, results, out, response);
}
public static void respond(final ApplicationContext context, final JsonFactory factory,
final ObjectMapper mapper, final Set<String> defaultResponseAggregationsStr,
final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out, final Response response) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
JsonGenerator generator = factory.createGenerator(out, JsonEncoding.UTF8);
try {
if (results.size() > 1) {
generator.writeStartArray();
@ -177,6 +184,15 @@ public class JSONTableResponseStreamWriter implements ResponseStreamWriter {
} catch (Exception e) {
LOGGER.error("Exception while writing json for '{}'", daqQuery.getChannels(), e);
exception.compareAndSet(null, e);
} finally {
if (streamMatcher != null) {
try {
streamMatcher.close();
} catch (Throwable t) {
LOGGER.error(
"Something went wrong while closing stream matcher for JSON table response writer.", t);
}
}
}
});
} catch (IOException e) {

View File

@ -0,0 +1,69 @@
package ch.psi.daq.queryrest.response.msgpack;
import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
/**
 * HTTP response that serializes query results as MessagePack. Delegates the actual
 * serialization to {@link MsgPackResponseStreamWriter} (plain event lists) or
 * {@link MsgPackTableResponseStreamWriter} (table mapping).
 */
public class MsgPackHTTPResponse extends AbstractHTTPResponse {
    private static final Logger LOGGER = LoggerFactory.getLogger(MsgPackHTTPResponse.class);

    public static final String FORMAT = "msgp";
    public static final String CONTENT_TYPE = MediaType.APPLICATION_OCTET_STREAM_VALUE;

    public MsgPackHTTPResponse() {
        super(ResponseFormat.MSGP);
    }

    public MsgPackHTTPResponse(Compression compression) {
        this();
        setCompression(compression);
    }

    @Override
    public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse response) throws Exception {
        final OutputStream outputStream = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
        // A mapping (table layout) requires the table writer; plain event queries use the stream writer.
        final boolean tableLayout = JSONHTTPResponse.validateQueries(queries);

        try {
            LOGGER.debug("Executing query '{}'", queries);

            final QueryManager queryManager = context.getBean(QueryManager.class);
            final ResponseStreamWriter streamWriter = tableLayout
                    ? context.getBean(MsgPackTableResponseStreamWriter.class)
                    : context.getBean(MsgPackResponseStreamWriter.class);

            // execute the query; one entry per query element, each with a stream of result triples
            final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
                    queryManager.getEvents(queries);

            // write the response back to the client using java 8 streams
            streamWriter.respond(result, outputStream, this);
        } catch (Exception e) {
            LOGGER.error("Failed to execute query '{}'.", queries, e);
            throw e;
        }
    }
}

View File

@ -0,0 +1,40 @@
package ch.psi.daq.queryrest.response.msgpack;
import java.io.OutputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
/**
 * Writes query results as MessagePack to the output stream provided by the
 * {@link ServletResponse} of the current request. The serialization logic is
 * shared with the JSON writer; only the Jackson factory differs.
 */
public class MsgPackResponseStreamWriter implements ResponseStreamWriter {

    @Resource
    private MessagePackFactory msgPackFactory;

    @Resource
    private ObjectMapper mapper;

    @Override
    public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> queryResults,
            final OutputStream outputStream, final Response response) throws Exception {
        // Reuse the generic JSON streaming code with a MessagePack-producing factory.
        JSONResponseStreamWriter.respond(msgPackFactory, mapper, queryResults, outputStream, response);
    }
}

View File

@ -0,0 +1,62 @@
package ch.psi.daq.queryrest.response.msgpack;
import java.io.OutputStream;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.springframework.context.ApplicationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONTableResponseStreamWriter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
/**
 * Writes table-mapped query results as MessagePack to the output stream provided by
 * the {@link ServletResponse} of the current request. The serialization logic is
 * shared with the JSON table writer; only the Jackson factory differs.
 */
public class MsgPackTableResponseStreamWriter implements ResponseStreamWriter {

    @Resource
    private ApplicationContext context;

    @Resource
    private MessagePackFactory msgPackFactory;

    @Resource
    private ObjectMapper mapper;

    @Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
    private Set<Aggregation> defaultResponseAggregations;

    // String names of the default aggregations, derived once after injection.
    private Set<String> defaultResponseAggregationsStr;

    @PostConstruct
    public void afterPropertiesSet() {
        // LinkedHashSet keeps the configured aggregation order.
        final Set<String> aggregationNames = new LinkedHashSet<>();
        for (final Aggregation aggregation : defaultResponseAggregations) {
            aggregationNames.add(aggregation.name());
        }
        defaultResponseAggregationsStr = aggregationNames;
    }

    @Override
    public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> queryResults,
            final OutputStream outputStream, final Response response) throws Exception {
        // Reuse the generic JSON table streaming code with a MessagePack-producing factory.
        JSONTableResponseStreamWriter.respond(context, msgPackFactory, mapper, defaultResponseAggregationsStr,
                queryResults, outputStream, response);
    }
}

View File

@ -0,0 +1,120 @@
package ch.psi.daq.test.queryrest.controller;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.junit.After;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelEventTableImpl;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
/**
* Tests the {@link DaqController} implementation.
*/
/**
 * Base test for the query REST controller's table-format responses. Subclasses supply
 * the response format under test ({@link #getResponse()}) and a mapper able to decode
 * the raw response bytes of that format ({@link #getResponseMapper()}).
 */
public abstract class AbstractQueryRestControllerTableTest extends AbstractDaqRestTest {
    public static final String TEST_CHANNEL_01 = "testChannel1";
    public static final String TEST_CHANNEL_02 = "testChannel2";
    public static final String TEST_CHANNEL_WAVEFORM_01 = "testChannelWaveform1";
    public static final String TEST_CHANNEL_WAVEFORM_02 = "testChannelWaveform2";
    public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02};

    @Resource(name = DomainConfig.BEAN_NAME_BACKEND_DEFAULT)
    private Backend backend;

    @After
    public void tearDown() throws Exception {}

    /** @return mapper able to decode the byte payload produced by {@link #getResponse()} */
    protected abstract ObjectMapper getResponseMapper();

    /** @return the response format under test (e.g. BSON, MessagePack) */
    protected abstract Response getResponse();

    @Test
    public void testPulseRangeQuery() throws Exception {
        DAQQuery request = new DAQQuery(
                new RequestRangePulseId(
                        100,
                        101),
                TEST_CHANNEL_NAMES);
        // table layout: one row per pulse, one event per channel in each row
        request.setMapping(new Mapping());
        request.addField(QueryField.pulseId);
        request.addField(QueryField.globalSeconds);
        request.addField(QueryField.globalMillis);
        request.addField(QueryField.iocSeconds);
        request.addField(QueryField.iocMillis);
        request.addField(QueryField.value);
        request.setResponse(getResponse());

        String content = mapper.writeValueAsString(request);
        System.out.println(content);

        MvcResult result = this.mockMvc
                .perform(MockMvcRequestBuilders
                        .post(DomainConfig.PATH_QUERY)
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(content))
                .andDo(MockMvcResultHandlers.print())
                .andExpect(MockMvcResultMatchers.status().isOk())
                .andReturn();

        byte[] responseBytes = result.getResponse().getContentAsByteArray();
        System.out.println("ResponseSize: " + responseBytes.length);

        ChannelEventTableImpl table = getResponseMapper().readValue(responseBytes, ChannelEventTableImpl.class);
        assertEquals(2, table.size());

        // row 0 -> pulse 100 (both channels)
        List<DataEvent> events = table.getEvents(0).collect(Collectors.toList());
        assertEquals(2, events.size());
        assertEvent(events.get(0), TEST_CHANNEL_01, 100, TimeUtils.getTime(1, 0));
        assertEvent(events.get(1), TEST_CHANNEL_02, 100, TimeUtils.getTime(1, 0));

        // row 1 -> pulse 101 (both channels)
        events = table.getEvents(1).collect(Collectors.toList());
        assertEquals(2, events.size());
        assertEvent(events.get(0), TEST_CHANNEL_01, 101, TimeUtils.getTime(1, 10000000));
        assertEvent(events.get(1), TEST_CHANNEL_02, 101, TimeUtils.getTime(1, 10000000));
    }

    /**
     * Asserts channel, backend, pulse-id, times and value of a single event. The test data
     * uses the pulse-id as the event value, so one expected number covers both.
     */
    private static void assertEvent(DataEvent event, String expectedChannel, long expectedPulseId, Object expectedTime) {
        assertEquals(expectedChannel, event.getChannel());
        assertEquals(Backend.SF_DATABUFFER, event.getBackend());
        assertEquals(expectedPulseId, event.getPulseId());
        assertEquals(expectedTime, event.getGlobalTime());
        assertEquals(expectedTime, event.getIocTime());
        assertEquals(expectedPulseId, event.getValue(Number.class).longValue());
    }
}

View File

@ -0,0 +1,124 @@
package ch.psi.daq.test.queryrest.controller;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.junit.After;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.json.ChannelEventsImpl;
import ch.psi.daq.domain.json.ChannelEventsList;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
/**
* Tests the {@link DaqController} implementation.
*/
/**
 * Base test for the query REST controller's event-list responses. Subclasses supply
 * the response format under test ({@link #getResponse()}) and a mapper able to decode
 * the raw response bytes of that format ({@link #getResponseMapper()}).
 */
public abstract class AbstractQueryRestControllerTest extends AbstractDaqRestTest {
    public static final String TEST_CHANNEL_01 = "testChannel1";
    public static final String TEST_CHANNEL_02 = "testChannel2";
    public static final String TEST_CHANNEL_WAVEFORM_01 = "testChannelWaveform1";
    public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02};

    @Resource(name = DomainConfig.BEAN_NAME_BACKEND_DEFAULT)
    private Backend backend;

    @After
    public void tearDown() throws Exception {}

    /** @return mapper able to decode the byte payload produced by {@link #getResponse()} */
    protected abstract ObjectMapper getResponseMapper();

    /** @return the response format under test (e.g. BSON, MessagePack) */
    protected abstract Response getResponse();

    @Test
    public void testPulseRangeQuery() throws Exception {
        DAQQuery request = new DAQQuery(
                new RequestRangePulseId(
                        100,
                        101),
                TEST_CHANNEL_NAMES);
        request.addField(QueryField.pulseId);
        request.addField(QueryField.globalSeconds);
        request.addField(QueryField.globalMillis);
        request.addField(QueryField.iocSeconds);
        request.addField(QueryField.iocMillis);
        request.addField(QueryField.value);
        request.setResponse(getResponse());

        String content = mapper.writeValueAsString(request);
        System.out.println(content);

        MvcResult result = this.mockMvc
                .perform(MockMvcRequestBuilders
                        .post(DomainConfig.PATH_QUERY)
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(content))
                .andDo(MockMvcResultHandlers.print())
                .andExpect(MockMvcResultMatchers.status().isOk())
                .andReturn();

        byte[] responseBytes = result.getResponse().getContentAsByteArray();
        System.out.println("ResponseSize: " + responseBytes.length);

        ChannelEventsList eventsList = getResponseMapper().readValue(responseBytes, ChannelEventsList.class);
        assertEquals(2, eventsList.size());

        // one entry per queried channel, each holding the events for pulses 100 and 101
        assertChannelEvents(eventsList.get(0), TEST_CHANNEL_01);
        assertChannelEvents(eventsList.get(1), TEST_CHANNEL_02);
    }

    /** Asserts the channel header and both events (pulses 100 and 101) of one channel entry. */
    private static void assertChannelEvents(ChannelEventsImpl chEvents, String expectedChannel) {
        assertEquals(expectedChannel, chEvents.getChannel().getName());
        assertEquals(Backend.SF_DATABUFFER, chEvents.getChannel().getBackend());

        List<DataEvent> events = chEvents.getEvents().collect(Collectors.toList());
        assertEquals(2, events.size());
        assertEvent(events.get(0), expectedChannel, 100, TimeUtils.getTime(1, 0));
        assertEvent(events.get(1), expectedChannel, 101, TimeUtils.getTime(1, 10000000));
    }

    /**
     * Asserts channel, backend, pulse-id, times and value of a single event. The test data
     * uses the pulse-id as the event value, so one expected number covers both.
     */
    private static void assertEvent(DataEvent event, String expectedChannel, long expectedPulseId, Object expectedTime) {
        assertEquals(expectedChannel, event.getChannel());
        assertEquals(Backend.SF_DATABUFFER, event.getBackend());
        assertEquals(expectedPulseId, event.getPulseId());
        assertEquals(expectedTime, event.getGlobalTime());
        assertEquals(expectedTime, event.getIocTime());
        assertEquals(expectedPulseId, event.getValue(Number.class).longValue());
    }
}

View File

@ -0,0 +1,31 @@
package ch.psi.daq.test.queryrest.controller;
import org.junit.After;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.bson.BSONHTTPResponse;
import de.undercouch.bson4jackson.BsonFactory;
/**
 * Tests the {@link DaqController} implementation for table-formatted query
 * responses serialized as BSON.
 */
public class BsonQueryRestControllerTableTest extends AbstractQueryRestControllerTableTest {
    // Mapper used by the base-class assertions to decode BSON response bodies;
    // assigned once, never replaced -> final.
    private final ObjectMapper responseMapper = new ObjectMapper(new BsonFactory());

    @After
    public void tearDown() throws Exception {}

    /**
     * @return the mapper the base test uses to deserialize BSON responses
     */
    @Override
    protected ObjectMapper getResponseMapper() {
        return responseMapper;
    }

    /**
     * @return the response definition under test, selecting the BSON writer
     */
    @Override
    protected Response getResponse() {
        return new BSONHTTPResponse();
    }
}

View File

@ -0,0 +1,31 @@
package ch.psi.daq.test.queryrest.controller;
import org.junit.After;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.bson.BSONHTTPResponse;
import de.undercouch.bson4jackson.BsonFactory;
/**
 * Tests the {@link DaqController} implementation for event-formatted query
 * responses serialized as BSON.
 */
public class BsonQueryRestControllerTest extends AbstractQueryRestControllerTest {
    // Mapper used by the base-class assertions to decode BSON response bodies;
    // assigned once, never replaced -> final.
    private final ObjectMapper responseMapper = new ObjectMapper(new BsonFactory());

    @After
    public void tearDown() throws Exception {}

    /**
     * @return the mapper the base test uses to deserialize BSON responses
     */
    @Override
    protected ObjectMapper getResponseMapper() {
        return responseMapper;
    }

    /**
     * @return the response definition under test, selecting the BSON writer
     */
    @Override
    protected Response getResponse() {
        return new BSONHTTPResponse();
    }
}

View File

@ -44,7 +44,7 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
/**
* Tests the {@link DaqController} implementation.
*/
public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
public class CSVQueryRestControllerTest extends AbstractDaqRestTest {
public static final String TEST_CHANNEL = "testChannel";
public static final String TEST_CHANNEL_01 = TEST_CHANNEL + "1";

View File

@ -41,7 +41,7 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
/**
* Tests the {@link DaqController} implementation.
*/
public class QueryRestControllerJsonTableTest extends AbstractDaqRestTest {
public class JsonQueryRestControllerTableTest extends AbstractDaqRestTest {
public static final String TEST_CHANNEL_01 = "testChannel1";
public static final String TEST_CHANNEL_02 = "testChannel2";

View File

@ -38,7 +38,7 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
/**
* Tests the {@link DaqController} implementation.
*/
public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
public class JsonQueryRestControllerTest extends AbstractDaqRestTest {
public static final String TEST_CHANNEL_01 = "testChannel1";
public static final String TEST_CHANNEL_02 = "testChannel2";

View File

@ -0,0 +1,30 @@
package ch.psi.daq.test.queryrest.controller;
import org.junit.After;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.msgpack.MsgPackHTTPResponse;
/**
 * Tests the {@link DaqController} implementation for table-formatted query
 * responses serialized as MessagePack.
 */
public class MsgPackQueryRestControllerTableTest extends AbstractQueryRestControllerTableTest {
    // Mapper used by the base-class assertions to decode MessagePack response
    // bodies; assigned once, never replaced -> final.
    private final ObjectMapper responseMapper = new ObjectMapper(new MessagePackFactory());

    @After
    public void tearDown() throws Exception {}

    /**
     * @return the mapper the base test uses to deserialize MessagePack responses
     */
    @Override
    protected ObjectMapper getResponseMapper() {
        return responseMapper;
    }

    /**
     * @return the response definition under test, selecting the MessagePack writer
     */
    @Override
    protected Response getResponse() {
        return new MsgPackHTTPResponse();
    }
}

View File

@ -0,0 +1,30 @@
package ch.psi.daq.test.queryrest.controller;
import org.junit.After;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.msgpack.MsgPackHTTPResponse;
/**
 * Tests the {@link DaqController} implementation for event-formatted query
 * responses serialized as MessagePack.
 */
public class MsgPackQueryRestControllerTest extends AbstractQueryRestControllerTest {
    // Mapper used by the base-class assertions to decode MessagePack response
    // bodies; assigned once, never replaced -> final.
    private final ObjectMapper responseMapper = new ObjectMapper(new MessagePackFactory());

    @After
    public void tearDown() throws Exception {}

    /**
     * @return the mapper the base test uses to deserialize MessagePack responses
     */
    @Override
    protected ObjectMapper getResponseMapper() {
        return responseMapper;
    }

    /**
     * @return the response definition under test, selecting the MessagePack writer
     */
    @Override
    protected Response getResponse() {
        return new MsgPackHTTPResponse();
    }
}