ATEST-594

This commit is contained in:
Fabian Märki
2016-11-04 17:17:52 +01:00
parent 3d920ef048
commit 6dbd79bbd0
17 changed files with 2580 additions and 166 deletions

View File

@ -33,8 +33,8 @@ import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.query.operation.aggregation.extrema.AbstractExtremaMeta;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.query.analyzer.BackendQueryAnalyzerImpl;
import ch.psi.daq.query.config.QueryConfig;
import ch.psi.daq.queryrest.controller.validator.QueryValidator;
@ -44,6 +44,7 @@ import ch.psi.daq.queryrest.query.QueryManagerImpl;
import ch.psi.daq.queryrest.response.PolymorphicResponseMixIn;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONResponseStreamWriter;
import ch.psi.daq.queryrest.response.json.JSONTableResponseStreamWriter;
@Configuration
@Import(value = QueryRestConfigCORS.class)
@ -128,6 +129,11 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
public JSONResponseStreamWriter jsonResponseStreamWriter() {
return new JSONResponseStreamWriter();
}
@Bean
public JSONTableResponseStreamWriter jsonTableResponseStreamWriter() {
return new JSONTableResponseStreamWriter();
}
@Bean
public CSVResponseStreamWriter csvResponseStreamWriter() {

View File

@ -39,8 +39,8 @@ import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.domain.request.validate.RequestProviderValidator;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;

View File

@ -10,8 +10,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonEncoding;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.operation.ResponseImpl;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.domain.query.response.ResponseImpl;
public abstract class AbstractHTTPResponse extends ResponseImpl {

View File

@ -12,6 +12,7 @@ import org.apache.commons.lang3.tuple.Triple;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.response.Response;
public interface ResponseStreamWriter {
@ -21,7 +22,8 @@ public interface ResponseStreamWriter {
*
* @param results The results results
* @param out The OutputStream
* @param response The Response
* @throws Exception thrown if writing to the output stream fails
*/
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, OutputStream out) throws Exception;
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, OutputStream out, Response response) throws Exception;
}

View File

@ -19,11 +19,10 @@ import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
@ -43,7 +42,8 @@ public class CSVHTTPResponse extends AbstractHTTPResponse {
}
@Override
public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse httpResponse) throws Exception {
public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse httpResponse)
throws Exception {
OutputStream out = handleCompressionAndResponseHeaders(httpResponse, CONTENT_TYPE);
// do csv specific validations
@ -59,7 +59,7 @@ public class CSVHTTPResponse extends AbstractHTTPResponse {
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
queryManager.getEvents(queries);
// write the response back to the client using java 8 streams
streamWriter.respond(result, out);
streamWriter.respond(result, out, this);
} catch (Exception e) {
LOGGER.error("Failed to execute query '{}'.", queries, e);
throw e;
@ -68,7 +68,8 @@ public class CSVHTTPResponse extends AbstractHTTPResponse {
protected void validateQueries(DAQQueries queries) {
for (DAQQueryElement query : queries) {
if (!(query.getAggregation() == null || AggregationType.value.equals(query.getAggregation().getAggregationType()))) {
if (!(query.getAggregation() == null || AggregationType.value.equals(query.getAggregation()
.getAggregationType()))) {
// We allow only no aggregation or value aggregation as
// extrema: nested structure and not clear how to map it to one line
// index: value is an array of Statistics whose size is not clear at initialization time

View File

@ -28,18 +28,24 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import ch.psi.daq.common.stream.StreamIterable;
import ch.psi.daq.common.stream.StreamMatcher;
import ch.psi.daq.common.stream.match.MapCreator;
import ch.psi.daq.common.stream.match.MapFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.StreamMatcher;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.BackendQueryImpl;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.mapping.IncompleteStrategy;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/**
@ -49,41 +55,52 @@ import ch.psi.daq.queryrest.response.ResponseStreamWriter;
public class CSVResponseStreamWriter implements ResponseStreamWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(CSVResponseStreamWriter.class);
public static final Mapping DEFAULT_MAPPING = new Mapping(IncompleteStrategy.FILL_NULL);
public static final char DELIMITER_CVS = ';';
public static final String DELIMITER_ARRAY = ",";
public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
public static final String EMPTY_VALUE = "";
public static final String FIELDNAME_EXTREMA = "extrema";
private static final Function<Pair<ChannelName, DataEvent>, ChannelName> KEY_PROVIDER = (pair) -> pair.getKey();
private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(), event.getBackend());
// try to match sync data (bsread) with non sync data (epics) based on the time usin 10 millis
// buckets.
private static final ToLongFunction<Pair<ChannelName, DataEvent>> MATCHER_PROVIDER = (pair) -> pair.getValue()
.getGlobalMillis() / 10L;
private static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis() / 10L;
@Resource
private ApplicationContext context;
@Resource
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@Override
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out) throws Exception {
final OutputStream out, final Response response) throws Exception {
if(results.size() > 1){
throw new IllegalStateException("CSV format does not allow for multiple queries.");
}
AtomicReference<Exception> exception = new AtomicReference<>();
final Map<ChannelName, Stream<Pair<ChannelName, DataEvent>>> streams = new LinkedHashMap<>(results.size());
final Map<ChannelName, Stream<DataEvent>> streams = new LinkedHashMap<>(results.size());
final List<String> header = new ArrayList<>();
final Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors = new ArrayList<>();
final AtomicReference<DAQQueryElement> daqQueryRef = new AtomicReference<>();
final AtomicReference<BackendQuery> backendQueryRef = new AtomicReference<>();
// prepare the streams
/* get DataEvent stream of all sub-queries for later match */
results.forEach(entry -> {
final DAQQueryElement query = entry.getKey();
daqQueryRef.compareAndSet(null, query);
entry.getValue()
.sequential()
.forEach(triple -> {
backendQueryRef.compareAndSet(null, triple.getLeft());
if (triple.getRight() instanceof Stream) {
setupChannelColumns(query, triple.getLeft(), triple.getMiddle(), header, accessors);
final Stream<Pair<ChannelName, DataEvent>> eventStream = ((Stream<DataEvent>) triple.getRight())
.map(dataEvent -> Pair.of(triple.getMiddle(), dataEvent));
final Stream<DataEvent> eventStream = ((Stream<DataEvent>) triple.getRight());
streams.put(triple.getMiddle(), eventStream);
} else {
final String message = String.format("Expect a DataEvent Stream for '%s'.", triple.getMiddle());
@ -92,10 +109,20 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
});
});
Mapping mapping = daqQueryRef.get().getMappingOrDefault(DEFAULT_MAPPING);
Padder<ChannelName, DataEvent> padder = mapping.getIncomplete().getPadder(context, backendQueryRef.get());
// online matching of the stream's content
StreamMatcher<ChannelName, Pair<ChannelName, DataEvent>> streamMatcher =
new StreamMatcher<>(KEY_PROVIDER, MATCHER_PROVIDER, streams.values());
Iterator<Map<ChannelName, Pair<ChannelName, DataEvent>>> streamsMatchIter = streamMatcher.iterator();
StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> streamMatcher =
new StreamMatcher<>(
KEY_PROVIDER,
MATCHER_PROVIDER,
new MapCreator<>(),
new MapFiller<>(),
null,
padder,
streams.values());
Iterator<Map<ChannelName, DataEvent>> streamsMatchIter = streamMatcher.iterator();
// prepare csv output
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(DELIMITER_CVS);
@ -111,14 +138,14 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
csvFilePrinter.printRecord(header);
while (streamsMatchIter.hasNext()) {
final Map<ChannelName, Pair<ChannelName, DataEvent>> match = streamsMatchIter.next();
final Map<ChannelName, DataEvent> match = streamsMatchIter.next();
// ensure correct order
Stream<String> rowStream = accessors.stream().sequential()
.map(accessorPair -> {
Pair<ChannelName, DataEvent> eventPair = match.get(accessorPair.getKey());
if (eventPair != null) {
return accessorPair.getValue().apply(eventPair.getValue());
DataEvent event = match.get(accessorPair.getKey());
if (event != null) {
return accessorPair.getValue().apply(event);
} else {
return EMPTY_VALUE;
}

View File

@ -1,6 +1,7 @@
package ch.psi.daq.queryrest.response.json;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;
@ -18,9 +19,10 @@ import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.ResponseFormat;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.queryrest.query.QueryManager;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
public class JSONHTTPResponse extends AbstractHTTPResponse {
private static final Logger LOGGER = LoggerFactory.getLogger(JSONHTTPResponse.class);
@ -31,7 +33,7 @@ public class JSONHTTPResponse extends AbstractHTTPResponse {
public JSONHTTPResponse() {
super(ResponseFormat.JSON);
}
public JSONHTTPResponse(Compression compression) {
this();
setCompression(compression);
@ -41,16 +43,31 @@ public class JSONHTTPResponse extends AbstractHTTPResponse {
public void respond(ApplicationContext context, DAQQueries queries, HttpServletResponse response) throws Exception {
OutputStream out = handleCompressionAndResponseHeaders(response, CONTENT_TYPE);
boolean hasMapping = false;
Iterator<DAQQueryElement> iter = queries.getQueries().iterator();
while (!hasMapping && iter.hasNext()) {
DAQQueryElement daqQueryElement = iter.next();
if (daqQueryElement.getMapping() != null) {
hasMapping = true;
}
}
try {
LOGGER.debug("Executing query '{}'", queries);
QueryManager queryManager = context.getBean(QueryManager.class);
JSONResponseStreamWriter streamWriter = context.getBean(JSONResponseStreamWriter.class);
ResponseStreamWriter streamWriter;
if (hasMapping) {
streamWriter = context.getBean(JSONTableResponseStreamWriter.class);
} else {
streamWriter = context.getBean(JSONResponseStreamWriter.class);
}
// execute query
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result = queryManager.getEvents(queries);
List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> result =
queryManager.getEvents(queries);
// write the response back to the client using java 8 streams
streamWriter.respond(result, out);
streamWriter.respond(result, out, this);
} catch (Exception e) {
LOGGER.error("Failed to execute query '{}'.", queries, e);
throw e;

View File

@ -29,6 +29,7 @@ import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/**
@ -48,8 +49,8 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
private ObjectMapper mapper;
@Override
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
OutputStream out) throws Exception {
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out, final Response response) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
@ -61,8 +62,8 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
results
.forEach(entryy -> {
DAQQueryElement daqQuery = entryy.getKey();
Set<String> includedFields = getFields(daqQuery);
ObjectWriter writer = configureWriter(includedFields);
Set<String> includedFields = getFields(daqQuery, true);
ObjectWriter writer = configureWriter(includedFields, mapper);
try {
generator.writeStartArray();
@ -106,7 +107,7 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
}
}
protected Set<String> getFields(DAQQueryElement query) {
public static Set<String> getFields(DAQQueryElement query, boolean removeIdentifiers) {
Set<QueryField> queryFields = query.getFields();
List<Aggregation> aggregations = query.getAggregation() != null ? query.getAggregation().getAggregations() : null;
List<Extrema> extrema = query.getAggregation() != null ? query.getAggregation().getExtrema() : null;
@ -129,8 +130,11 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
includedFields.add("extrema");
}
// do not write channel since it is already provided as key in mapping
includedFields.remove(QueryField.channel.name());
if (removeIdentifiers) {
// do not write channel/backend since it is already provided as key in mapping
includedFields.remove(QueryField.channel.name());
includedFields.remove(QueryField.backend.name());
}
return includedFields;
}
@ -141,9 +145,10 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
*
* @param includedFields set of strings which correspond to the getter method names of the
* classes registered as a mixed-in
* @param mapper The ObjectMapper
* @return the configured writer that includes the specified fields
*/
private ObjectWriter configureWriter(Set<String> includedFields) {
public static ObjectWriter configureWriter(Set<String> includedFields, ObjectMapper mapper) {
SimpleFilterProvider propertyFilter = new SimpleFilterProvider();
propertyFilter.addFilter("namedPropertyFilter", SimpleBeanPropertyFilter.filterOutAllExcept(includedFields));
// only write the properties not excluded in the filter

View File

@ -0,0 +1,195 @@
package ch.psi.daq.queryrest.response.json;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.servlet.ServletResponse;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import ch.psi.daq.common.stream.match.ListCreator;
import ch.psi.daq.common.stream.match.ListFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.StreamMatcher;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.bin.BinningStrategy;
import ch.psi.daq.domain.query.bin.strategy.BinningStrategyPerBinPulse;
import ch.psi.daq.domain.query.bin.strategy.BinningStrategyPerBinTime;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.mapping.IncompleteStrategy;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.request.range.RequestRange;
import ch.psi.daq.query.bin.aggregate.BinnedValueCombiner;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
/**
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
* of the current request.
*/
public class JSONTableResponseStreamWriter implements ResponseStreamWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(JSONTableResponseStreamWriter.class);
public static final Mapping DEFAULT_MAPPING = new Mapping(IncompleteStrategy.KEEP_AS_IS);
private static final long MILLIS_PER_PULSE = TimeUtils.MILLIS_PER_PULSE;
private static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
event.getBackend());
// try to match sync data (bsread) with non sync data (epics) based on the time usin 10 millis
// buckets.
private static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis()
/ MILLIS_PER_PULSE;
@Resource
private ApplicationContext context;
@Resource
private JsonFactory jsonFactory;
@Resource
private ObjectMapper mapper;
@Resource(name = QueryRestConfig.BEAN_NAME_DEFAULT_RESPONSE_AGGREGATIONS)
private Set<Aggregation> defaultResponseAggregations;
private Set<String> defaultResponseAggregationsStr;
@PostConstruct
public void afterPropertiesSet() {
defaultResponseAggregationsStr =
defaultResponseAggregations.stream().map(Aggregation::name)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
@Override
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out, final Response response) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
try {
if (results.size() > 1) {
generator.writeStartArray();
}
results
.forEach(entryy -> {
DAQQueryElement daqQuery = entryy.getKey();
Set<String> includedFields = JSONResponseStreamWriter.getFields(daqQuery, false);
/* make sure identifiers are available */
includedFields.add(QueryField.channel.name());
includedFields.add(QueryField.backend.name());
includedFields.addAll(defaultResponseAggregationsStr);
ObjectWriter writer = JSONResponseStreamWriter.configureWriter(includedFields, mapper);
/* get DataEvent stream of sub-queries for later match */
final Map<ChannelName, Stream<DataEvent>> streams =
new LinkedHashMap<>(results.size());
final AtomicReference<BackendQuery> backendQueryRef = new AtomicReference<>();
entryy.getValue()
.sequential()
.forEach(
triple -> {
backendQueryRef.compareAndSet(null, triple.getLeft());
if (triple.getRight() instanceof Stream) {
streams.put(triple.getMiddle(), ((Stream<DataEvent>) triple.getRight()));
} else {
final String message =
String.format("Expect a DataEvent Stream for '%s' but got '%s'.",
triple.getMiddle(), triple.getRight().getClass().getSimpleName());
LOGGER.warn(message);
streams.put(triple.getMiddle(), Stream.empty());
}
});
BackendQuery backendQuery = backendQueryRef.get();
RequestRange requestRange = backendQuery.getRequest().getRequestRange();
BinningStrategy binningStrategy = backendQuery.getBinningStrategy();
Mapping mapping = daqQuery.getMappingOrDefault(DEFAULT_MAPPING);
Padder<ChannelName, DataEvent> padder = mapping.getIncomplete().getPadder(context, backendQuery);
ToLongFunction<DataEvent> matchProvider = binningStrategy;
if (binningStrategy == null) {
matchProvider = MATCHER_PROVIDER;
if (requestRange.isPulseIdRangeDefined()) {
binningStrategy = new BinningStrategyPerBinPulse(1);
} else if (requestRange.isTimeRangeDefined()) {
binningStrategy = new BinningStrategyPerBinTime(MILLIS_PER_PULSE);
} else {
String message = "Either time or pulseId range must be defined by the query!";
LOGGER.error(message);
throw new IllegalStateException(message);
}
}
binningStrategy.setRequestRange(requestRange);
/* online matching of the stream's content */
StreamMatcher<ChannelName, DataEvent, List<DataEvent>> streamMatcher =
new StreamMatcher<>(
KEY_PROVIDER,
matchProvider,
new ListCreator<ChannelName, DataEvent>(),
new ListFiller<ChannelName, DataEvent>(),
new BinnedValueCombiner(binningStrategy),
padder,
streams.values());
Iterator<List<DataEvent>> streamsMatchIter = streamMatcher.iterator();
try {
writer.writeValue(generator, streamsMatchIter);
} catch (Exception e) {
LOGGER.error("Exception while writing json for '{}'", daqQuery.getChannels(), e);
exception.compareAndSet(null, e);
}
});
} catch (IOException e) {
LOGGER.error("Could not write JSON.", e);
exception.compareAndSet(null, e);
} finally {
if (results.size() > 1) {
generator.writeEndArray();
}
generator.flush();
generator.close();
}
if (exception.get() != null) {
throw exception.get();
}
}
}

View File

@ -1,6 +1,6 @@
# defines the fields that are included in the response
# if no fields have been specified by the user
queryrest.default.response.fields=channel,pulseId,globalSeconds,iocSeconds,shape,eventCount,value
queryrest.default.response.fields=channel,backend,pulseId,globalSeconds,iocSeconds,shape,eventCount,value
# aggregation which are included in the response by default if aggregation is enabled for a given query
queryrest.default.response.aggregations=min,mean,max

View File

@ -26,9 +26,7 @@ import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
import ch.psi.daq.domain.query.operation.AggregationType;
@ -323,98 +321,103 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
}
}
@Test
public void testPulseRangeQueries() throws Exception {
List<String> channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02);
String testChannel3 = "testChannel3";
DAQQueries request = new DAQQueries(
new DAQQueryElement(
new RequestRangePulseId(
0,
1),
channels),
new DAQQueryElement(
new RequestRangePulseId(
0,
1),
testChannel3));
request.setResponse(new CSVHTTPResponse());
channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02, testChannel3);
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
queryFields.add(QueryField.pulseId);
queryFields.add(QueryField.iocSeconds);
queryFields.add(QueryField.iocMillis);
queryFields.add(QueryField.globalSeconds);
queryFields.add(QueryField.globalMillis);
queryFields.add(QueryField.shape);
queryFields.add(QueryField.eventCount);
queryFields.add(QueryField.value);
for (DAQQueryElement element : request) {
element.setFields(queryFields);
}
String content = mapper.writeValueAsString(request);
System.out.println(content);
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERIES)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
StringReader reader = new StringReader(response);
CSVParser csvParser = new CSVParser(reader, csvFormat);
try {
long pulse = 0;
int totalRows = 2;
List<CSVRecord> records = csvParser.getRecords();
assertEquals(totalRows + 1, records.size());
// remove header
CSVRecord record = records.remove(0);
assertEquals(queryFields.size() * channels.size(), record.size());
int column = 0;
for (String channel : channels) {
for (QueryField queryField : queryFields) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(),
record.get(column++));
}
}
for (int row = 0; row < totalRows; ++row) {
record = records.get(row);
assertEquals(queryFields.size() * channels.size(), record.size());
column = 0;
for (String channel : channels) {
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse, record.get(column++));
}
++pulse;
}
} finally {
reader.close();
csvParser.close();
}
}
// @Test
// public void testPulseRangeQueries() throws Exception {
// List<String> channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02);
// String testChannel3 = "testChannel3";
// DAQQueries request = new DAQQueries(
// new DAQQueryElement(
// new RequestRangePulseId(
// 0,
// 1),
// channels),
// new DAQQueryElement(
// new RequestRangePulseId(
// 0,
// 1),
// testChannel3));
// request.setResponse(new CSVHTTPResponse());
// channels = Arrays.asList(TEST_CHANNEL_01, TEST_CHANNEL_02, testChannel3);
//
// LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
// queryFields.add(QueryField.channel);
// queryFields.add(QueryField.pulseId);
// queryFields.add(QueryField.iocSeconds);
// queryFields.add(QueryField.iocMillis);
// queryFields.add(QueryField.globalSeconds);
// queryFields.add(QueryField.globalMillis);
// queryFields.add(QueryField.shape);
// queryFields.add(QueryField.eventCount);
// queryFields.add(QueryField.value);
// for (DAQQueryElement element : request) {
// element.setFields(queryFields);
// }
//
// String content = mapper.writeValueAsString(request);
// System.out.println(content);
//
// MvcResult result = this.mockMvc
// .perform(MockMvcRequestBuilders
// .post(DomainConfig.PATH_QUERIES)
// .contentType(MediaType.APPLICATION_JSON)
// .content(content))
// .andDo(MockMvcResultHandlers.print())
// .andExpect(MockMvcResultMatchers.status().isOk())
// .andReturn();
//
// String response = result.getResponse().getContentAsString();
// System.out.println("Response: " + response);
//
// CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
// StringReader reader = new StringReader(response);
// CSVParser csvParser = new CSVParser(reader, csvFormat);
//
// try {
// long pulse = 0;
// int totalRows = 2;
//
// List<CSVRecord> records = csvParser.getRecords();
// assertEquals(totalRows + 1, records.size());
// // remove header
// CSVRecord record = records.remove(0);
// assertEquals(queryFields.size() * channels.size(), record.size());
// int column = 0;
// for (String channel : channels) {
// for (QueryField queryField : queryFields) {
// assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME +
// queryField.name(),
// record.get(column++));
// }
// }
//
// for (int row = 0; row < totalRows; ++row) {
// record = records.get(row);
//
// assertEquals(queryFields.size() * channels.size(), record.size());
//
// column = 0;
// for (String channel : channels) {
// assertEquals(channel, record.get(column++));
// assertEquals("" + pulse, record.get(column++));
// assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)),
// record.get(column++));
// assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)),
// record.get(column++));
// assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)),
// record.get(column++));
// assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)),
// record.get(column++));
// assertEquals("[1]", record.get(column++));
// assertEquals("1", record.get(column++));
// assertEquals("" + pulse, record.get(column++));
// }
// ++pulse;
// }
// } finally {
// reader.close();
// csvParser.close();
// }
// }
@Test
public void testPulseRangeQueryWaveform() throws Exception {

View File

@ -373,15 +373,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 10000000)))
// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").isMap())
// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11))
// .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110))
;
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").isMap())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 10000000)));
}
@Test

View File

@ -84,8 +84,8 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
"Float64Waveform",
"StringScalar");
}
protected void init(Backend backend){
protected void init(Backend backend) {
this.backend = backend;
this.dataGen = testBackendAccess.getTestDataGen(backend);
this.testChannelName = backend.getKey() + "_TestChannel_";
@ -120,7 +120,8 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
@Override
public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
return getDummyEventStream(query.getChannel(), getBackend(), query.getStartMillis() / 10,
query.getEndMillis() / 10)
.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
}
@ -136,39 +137,50 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
return result;
}
public static Stream<ChannelEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex,
public static Stream<ChannelEvent> getDummyEventStream(String channelParam, Backend backend, long startIndex,
long endIndex,
String... columns) {
String channelLower = channelParam.toLowerCase();
String channel =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam
: null;
Stream<Long> rangeStream;
LongStream millisRangeStream = null;
if (channelParam.contains("[") && channelParam.contains("]")) {
rangeStream =
millisRangeStream =
Arrays.stream(
channelParam.substring(
channelParam.indexOf("[") + 1,
channelParam.indexOf("]"))
.split(",")
)
.map(str -> str.trim())
.map(Long::parseLong);
.mapToLong(str -> Long.parseLong(str.trim()) * 10);
} else if (channelParam.contains("{") && channelParam.contains("}")) {
millisRangeStream =
Arrays.stream(
channelParam.substring(
channelParam.indexOf("{") + 1,
channelParam.indexOf("}"))
.split(",")
)
.mapToLong(str -> Long.parseLong(str));
} else {
rangeStream = LongStream.rangeClosed(startIndex, endIndex).boxed();
millisRangeStream = LongStream.rangeClosed(startIndex * 10, endIndex * 10)
.filter(val -> val % 10 == 0);
}
Stream<ChannelEvent> eventStream =
rangeStream.map(
i -> {
millisRangeStream.mapToObj(
millis -> {
long i = millis / 10;
BigDecimal iocTime =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(millis, 0)
: PropertiesUtils.DEFAULT_VALUE_DECIMAL;
BigDecimal globalTime =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(millis, 0)
: PropertiesUtils.DEFAULT_VALUE_DECIMAL;
long pulseId =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
@ -180,6 +192,7 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
value[1] = i;
return new ChannelEventImpl(
channel,
backend,
iocTime,
pulseId,
globalTime,
@ -196,6 +209,7 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
value[1] = i;
return new ChannelEventImpl(
channel,
backend,
iocTime,
pulseId,
globalTime,
@ -206,6 +220,7 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
} else {
return new ChannelEventImpl(
channel,
backend,
iocTime,
pulseId,
globalTime,
@ -219,7 +234,7 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
}
private List<? extends DataEvent> getDummyEvents(String channel, long startIndex, long endIndex, String... columns) {
return getDummyEventStream(channel, startIndex, endIndex, columns).collect(Collectors.toList());
return getDummyEventStream(channel, getBackend(), startIndex, endIndex, columns).collect(Collectors.toList());
}
/**
@ -293,6 +308,7 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
i -> i * 10,
i -> 0,
i -> i,
getBackend(),
query.getChannel()).stream();
}
@ -390,16 +406,18 @@ public abstract class AbstractStreamEventReader implements StreamEventReader<Cha
@Override
public void truncateCache() {}
@Override
public CompletableFuture<MetaPulseId> getStartMetaPulseIdAsync(PulseIdRangeQuery query) {
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), query.getStartPulseId(),
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), getBackend(), query
.getStartPulseId(),
TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0)));
}
@Override
public CompletableFuture<MetaPulseId> getEndMetaPulseIdAsync(PulseIdRangeQuery query) {
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), query.getEndPulseId(),
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), getBackend(), query
.getEndPulseId(),
TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0)));
}
}

View File

@ -53,7 +53,7 @@ public class DummyArchiverApplianceReader implements DataReader {
@Override
public Stream<? extends StreamEvent> getEventStream(TimeRangeQuery query) {
return DummyCassandraReader.getDummyEventStream(query.getChannel(), query.getStartMillis() / 10,
return DummyCassandraReader.getDummyEventStream(query.getChannel(), getBackend(), query.getStartMillis() / 10,
query.getEndMillis() / 10)
.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
}

View File

@ -64,13 +64,13 @@ public class DummyCassandraReader extends AbstractStreamEventReader implements C
@Override
public CompletableFuture<MetaPulseId> getStartMetaPulseIdAsync(PulseIdRangeQuery query) {
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), query.getStartPulseId(),
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), getBackend(), query.getStartPulseId(),
TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0)));
}
@Override
public CompletableFuture<MetaPulseId> getEndMetaPulseIdAsync(PulseIdRangeQuery query) {
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), query.getEndPulseId(),
return CompletableFuture.completedFuture(new MetaPulseIdImpl(query.getChannel(), getBackend(), query.getEndPulseId(),
TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0)));
}
}

View File

@ -17,7 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.request.range.RequestRangePulseId;
import ch.psi.daq.queryrest.response.csv.CSVHTTPResponse;
import ch.psi.daq.queryrest.response.json.JSONHTTPResponse;