New output format.
This commit is contained in:
@ -1,5 +1,5 @@
|
||||
#
|
||||
#Tue Nov 22 17:23:47 CET 2016
|
||||
#Wed Nov 23 17:35:46 CET 2016
|
||||
org.eclipse.jdt.core.compiler.debug.localVariable=generate
|
||||
org.eclipse.jdt.core.compiler.compliance=1.8
|
||||
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
|
||||
|
@ -33,8 +33,8 @@ dependencies {
|
||||
compile libraries.commons_io
|
||||
compile libraries.commons_csv
|
||||
compile libraries.netty_all
|
||||
compile libraries.bson_jackson
|
||||
compile libraries.msgpack_jackson
|
||||
compile libraries.smile_jackson
|
||||
|
||||
testCompile libraries.spring_boot_starter_test
|
||||
testCompile libraries.jsonassert
|
||||
|
@ -27,6 +27,7 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
|
||||
import ch.psi.daq.common.statistic.Statistics;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
@ -48,9 +49,8 @@ import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONTableResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.msgpack.MsgPackResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.msgpack.MsgPackTableResponseStreamWriter;
|
||||
|
||||
import de.undercouch.bson4jackson.BsonFactory;
|
||||
import de.undercouch.bson4jackson.BsonGenerator;
|
||||
import ch.psi.daq.queryrest.response.smile.SmileResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.smile.SmileTableResponseStreamWriter;
|
||||
|
||||
@Configuration
|
||||
@Import(value = QueryRestConfigCORS.class)
|
||||
@ -127,18 +127,13 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public BsonFactory bsonFactory() {
|
||||
BsonFactory factory = new BsonFactory();
|
||||
factory.enable(BsonGenerator.Feature.ENABLE_STREAMING);
|
||||
|
||||
return factory;
|
||||
public MessagePackFactory messagePackFactory() {
|
||||
return new MessagePackFactory();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessagePackFactory nessagePackFactory() {
|
||||
MessagePackFactory factory = new MessagePackFactory();
|
||||
|
||||
return factory;
|
||||
public SmileFactory smileFactory() {
|
||||
return new SmileFactory();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ -165,6 +160,16 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
|
||||
public MsgPackTableResponseStreamWriter msgPackTableResponseStreamWriter() {
|
||||
return new MsgPackTableResponseStreamWriter();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SmileResponseStreamWriter smileResponseStreamWriter() {
|
||||
return new SmileResponseStreamWriter();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SmileTableResponseStreamWriter smileTableResponseStreamWriter() {
|
||||
return new SmileTableResponseStreamWriter();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CSVResponseStreamWriter csvResponseStreamWriter() {
|
||||
|
@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.msgpack.MsgPackHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.smile.SmileHTTPResponse;
|
||||
|
||||
@JsonTypeInfo(
|
||||
use = JsonTypeInfo.Id.NAME,
|
||||
@ -15,6 +16,7 @@ import ch.psi.daq.queryrest.response.msgpack.MsgPackHTTPResponse;
|
||||
@JsonSubTypes({
|
||||
@Type(value = JSONHTTPResponse.class, name = JSONHTTPResponse.FORMAT),
|
||||
@Type(value = MsgPackHTTPResponse.class, name = MsgPackHTTPResponse.FORMAT),
|
||||
@Type(value = SmileHTTPResponse.class, name = SmileHTTPResponse.FORMAT),
|
||||
@Type(value = CSVHTTPResponse.class, name = CSVHTTPResponse.FORMAT)
|
||||
})
|
||||
// see: http://stackoverflow.com/questions/24631923/alternative-to-jackson-jsonsubtypes
|
||||
|
@ -0,0 +1,69 @@
|
||||
package ch.psi.daq.queryrest.response.smile;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueries;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.backend.BackendQuery;
|
||||
import ch.psi.daq.domain.query.operation.Compression;
|
||||
import ch.psi.daq.domain.query.response.ResponseFormat;
|
||||
import ch.psi.daq.queryrest.query.QueryManager;
|
||||
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
|
||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;
|
||||
|
||||
public class SmileHTTPResponse extends AbstractHTTPResponse {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SmileHTTPResponse.class);
|
||||
|
||||
public static final String FORMAT = "smile";
|
||||
public static final String CONTENT_TYPE = MediaType.APPLICATION_OCTET_STREAM_VALUE;
|
||||
|
||||
public SmileHTTPResponse() {
|
||||
super(ResponseFormat.SMILE);
|
||||
}
|
||||
|
||||
public SmileHTTPResponse(Compression compression) {
|
||||
this();
|
||||
setCompression(compression);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse response) throws Exception {
|
||||
OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
|
||||
|
||||
boolean hasMapping = JSONHTTPResponse.validateQueries(queries);
|
||||
|
||||
try {
|
||||
LOGGER.debug("Executing query '{}'", queries);
|
||||
|
||||
QueryManager queryManager = context.getBean(QueryManager.class);
|
||||
ResponseStreamWriter streamWriter;
|
||||
if (hasMapping) {
|
||||
streamWriter = context.getBean(SmileTableResponseStreamWriter.class);
|
||||
} else {
|
||||
streamWriter = context.getBean(SmileResponseStreamWriter.class);
|
||||
}
|
||||
|
||||
// execute query
|
||||
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
|
||||
queryManager.getEvents(queries);
|
||||
// write the response back to the client using java 8 streams
|
||||
streamWriter.respond(result, out, this);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Failed to execute query '{}'.", queries, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package ch.psi.daq.queryrest.response.smile;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.ServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.backend.BackendQuery;
|
||||
import ch.psi.daq.domain.query.response.Response;
|
||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
|
||||
|
||||
/**
|
||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||
* of the current request.
|
||||
*/
|
||||
public class SmileResponseStreamWriter implements ResponseStreamWriter {
|
||||
|
||||
@Resource
|
||||
private SmileFactory smileFactory;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@Override
|
||||
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
final OutputStream out, final Response response) throws Exception {
|
||||
JSONResponseStreamWriter.respond(smileFactory, mapper, results, out, response);
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package ch.psi.daq.queryrest.response.smile;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import javax.servlet.ServletResponse;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
|
||||
import ch.psi.daq.domain.json.ChannelName;
|
||||
import ch.psi.daq.domain.query.DAQQueryElement;
|
||||
import ch.psi.daq.domain.query.backend.BackendQuery;
|
||||
import ch.psi.daq.domain.query.operation.Aggregation;
|
||||
import ch.psi.daq.domain.query.response.Response;
|
||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||
import ch.psi.daq.queryrest.response.json.JSONTableResponseStreamWriter;
|
||||
|
||||
/**
|
||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||
* of the current request.
|
||||
*/
|
||||
public class SmileTableResponseStreamWriter implements ResponseStreamWriter {
|
||||
|
||||
@Resource
|
||||
private ApplicationContext context;
|
||||
|
||||
@Resource
|
||||
private SmileFactory smileFactory;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
|
||||
private Set<Aggregation> defaultResponseAggregations;
|
||||
|
||||
private Set<String> defaultResponseAggregationsStr;
|
||||
|
||||
@PostConstruct
|
||||
public void afterPropertiesSet() {
|
||||
defaultResponseAggregationsStr =
|
||||
defaultResponseAggregations.stream().map(Aggregation::name)
|
||||
.collect(Collectors.toCollection(LinkedHashSet::new));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
|
||||
final OutputStream out, final Response response) throws Exception {
|
||||
JSONTableResponseStreamWriter.respond(context, smileFactory, mapper, defaultResponseAggregationsStr, results, out, response);
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package ch.psi.daq.test.queryrest.controller;
|
||||
|
||||
import org.junit.After;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
|
||||
import ch.psi.daq.domain.query.response.Response;
|
||||
import ch.psi.daq.queryrest.response.smile.SmileHTTPResponse;
|
||||
|
||||
/**
|
||||
* Tests the {@link DaqController} implementation.
|
||||
*/
|
||||
public class SmileQueryRestControllerTableTest extends AbstractQueryRestControllerTableTest {
|
||||
|
||||
private ObjectMapper responseMapper = new ObjectMapper(new SmileFactory());
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {}
|
||||
|
||||
@Override
|
||||
protected ObjectMapper getResponseMapper(){
|
||||
return responseMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response getResponse() {
|
||||
return new SmileHTTPResponse();
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package ch.psi.daq.test.queryrest.controller;
|
||||
|
||||
import org.junit.After;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
|
||||
import ch.psi.daq.domain.query.response.Response;
|
||||
import ch.psi.daq.queryrest.response.smile.SmileHTTPResponse;
|
||||
|
||||
/**
|
||||
* Tests the {@link DaqController} implementation.
|
||||
*/
|
||||
public class SmileQueryRestControllerTest extends AbstractQueryRestControllerTest {
|
||||
|
||||
private ObjectMapper responseMapper = new ObjectMapper(new SmileFactory());
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {}
|
||||
|
||||
@Override
|
||||
protected ObjectMapper getResponseMapper(){
|
||||
return responseMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response getResponse() {
|
||||
return new SmileHTTPResponse();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user