ATEST-958

This commit is contained in:
Fabian Märki 2018-10-29 17:01:10 +01:00
parent 9905da7047
commit 206bc63879
7 changed files with 245 additions and 49 deletions

View File

@@ -1,25 +1,36 @@
package ch.psi.daq.queryrest.response.csv;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;
import ch.psi.daq.common.statistic.Statistics;
import ch.psi.daq.common.string.stringifier.Stringifier;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.query.operation.Aggregation;
public class AggregationStringifyer implements Function<DataEvent, String> {
private final ToDoubleFunction<DataEvent> accessor;
public class AggregationStringifyer implements Function<DataEvent, String>, Stringifier {
private final Aggregation aggregation;
private final String nonValue;
public AggregationStringifyer(final ToDoubleFunction<DataEvent> accessor, final String nonValue) {
this.accessor = accessor;
public AggregationStringifyer(final Aggregation aggregation, final String nonValue) {
this.aggregation = aggregation;
this.nonValue = nonValue;
}
@Override
public String apply(final DataEvent event) {
if (event == null) {
return toString(event);
}
@Override
public String toString(final Object obj) {
if (obj == null) {
return nonValue;
} else if (obj instanceof DataEvent) {
return Double.toString(aggregation.getAccessor().applyAsDouble((DataEvent) obj));
} else if (obj instanceof Statistics) {
return Double.toString(aggregation.extractValue((Statistics) obj));
} else {
return Double.toString(accessor.applyAsDouble(event));
return String.valueOf(obj);
}
}
}

View File

@@ -8,6 +8,7 @@ import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -32,11 +33,14 @@ import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import ch.psi.daq.common.statistic.StorelessStatistics;
import ch.psi.daq.common.stream.StreamIterable;
import ch.psi.daq.common.stream.match.MapFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.ReusingMapCreator;
import ch.psi.daq.common.stream.match.StreamMatcher;
import ch.psi.daq.common.string.stringifier.ObjectStringifier;
import ch.psi.daq.common.string.stringifier.Stringifier;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.backend.Backend;
@@ -77,6 +81,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
public static final String EMPTY_VALUE = "";
public static final String FIELDNAME_EXTREMA = "extrema";
private ApplicationContext context;
private Function<BackendQuery, BackendQueryAnalyzer> queryAnalizerFactory;
@SuppressWarnings("unchecked")
@@ -84,6 +89,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
public void setApplicationContext(ApplicationContext context) throws BeansException {
final Backend backend = context.getBean(DomainConfig.BEAN_NAME_BACKEND_DEFAULT, Backend.class);
context = backend.getApplicationContext();
this.context = context;
queryAnalizerFactory = context.getBean(QueryRestConfig.BEAN_NAME_QUERY_ANALIZER_FACTORY, Function.class);
}
@@ -166,12 +172,13 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
final Mapping mapping = daqQueryRef.get().getMappingOrDefault(DEFAULT_MAPPING);
final Padder<ChannelName, DataEvent> padder = mapping.getIncomplete().getPadder(backendQueryRef.get());
ToLongFunction<DataEvent> alignmentProvider = Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
ToLongFunction<DataEvent> alignmentProvider =
Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
if (binningStrategy == null) {
binningStrategy = Alignment.getBinningStrategy(daqQuery.getMapping().getAlignment(), requestRange);
}
}
BiFunction<DataEvent, DataEvent, DataEvent> valueCombiner = null;
if(binningStrategy != null){
if (binningStrategy != null) {
binningStrategy.setRequestRange(requestRange);
valueCombiner = new BinnedValueCombiner(binningStrategy);
}
@@ -270,7 +277,9 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
}
private void setupChannelConfigColumns(final DAQQueryElement daqQuery, final BackendQuery backendQuery,
private void setupChannelConfigColumns(
final DAQQueryElement daqQuery,
final BackendQuery backendQuery,
final ChannelName channelName,
final Collection<String> header,
Collection<Pair<ChannelName, Function<ChannelConfiguration, String>>> accessors) {
@@ -283,12 +292,19 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
.append(field.name());
header.add(buf.toString());
accessors.add(Pair.of(channelName, new ConfigFieldStringifyer(field.getAccessor(), EMPTY_VALUE,
DELIMITER_ARRAY)));
accessors.add(Pair.of(
channelName,
new ConfigFieldStringifyer(field.getAccessor(),
EMPTY_VALUE,
DELIMITER_ARRAY,
null)));
}
}
private void setupChannelEventColumns(final DAQQueryElement daqQuery, final BackendQuery backendQuery,
@SuppressWarnings("unchecked")
private void setupChannelEventColumns(
final DAQQueryElement daqQuery,
final BackendQuery backendQuery,
final ChannelName channelName,
final Collection<String> header, Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors) {
final Set<EventField> eventFields = daqQuery.getEventFields();
@@ -305,9 +321,40 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
.append(DELIMITER_CHANNELNAME_FIELDNAME)
.append(field.name());
// in case Mapping maps several events to the same bin -> values will be aggregated into
// BinnedValueCombinedDataEvent or BinnedIndexCombinedDataEvent
final Collection<Aggregation> locAggregations;
if (aggregations != null) {
locAggregations = aggregations;
} else {
locAggregations =
context.getBean(QueryRestConfig.BEAN_NAME_DEFAULT_EVENT_RESPONSE_AGGREGATIONS, Set.class);
}
final Stringifier statsStringifier;
if (locAggregations.size() == 1) {
// if only one aggregation is active, write only this value
// -> makes it compatible with scalar/array layout
statsStringifier = new AggregationStringifyer(locAggregations.iterator().next(), EMPTY_VALUE);
} else {
final Map<String, Stringifier> aggregationsStringifiers = new LinkedHashMap<>(locAggregations.size());
for (final Aggregation aggregation : locAggregations) {
aggregationsStringifiers.put(aggregation.name(),
new AggregationStringifyer(aggregation, EMPTY_VALUE));
}
statsStringifier = new ObjectStringifier(aggregationsStringifiers);
}
final Map<Class<?>, Stringifier> stringifiers = new HashMap<>(2);
stringifiers.put(StorelessStatistics.class, statsStringifier);
stringifiers.put(StorelessStatistics[].class, statsStringifier);
header.add(buf.toString());
accessors.add(Pair.of(channelName, new EventFieldStringifyer(field.getAccessor(), EMPTY_VALUE,
DELIMITER_ARRAY)));
accessors.add(Pair.of(
channelName,
new EventFieldStringifyer(
field.getAccessor(),
EMPTY_VALUE,
DELIMITER_ARRAY,
stringifiers)));
}
}
@@ -321,7 +368,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
.append(aggregation.name());
header.add(buf.toString());
accessors.add(Pair.of(channelName, new AggregationStringifyer(aggregation.getAccessor(), EMPTY_VALUE)));
accessors.add(Pair.of(channelName, new AggregationStringifyer(aggregation, EMPTY_VALUE)));
}
}
@@ -341,8 +388,13 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
header.add(buf.toString());
accessors
.add(Pair.of(channelName, new EventFieldStringifyer(accessor, EMPTY_VALUE,
DELIMITER_ARRAY)));
.add(Pair.of(
channelName,
new EventFieldStringifyer(
accessor,
EMPTY_VALUE,
DELIMITER_ARRAY,
null)));
}
}
}

View File

@@ -1,13 +1,19 @@
package ch.psi.daq.queryrest.response.csv;
import java.util.Map;
import java.util.function.Function;
import ch.psi.daq.common.string.stringifier.Stringifier;
import ch.psi.daq.domain.events.ChannelConfiguration;
public class ConfigFieldStringifyer extends QueryFieldStringifyer implements Function<ChannelConfiguration, String> {
public ConfigFieldStringifyer(Function<Object, Object> accessor, String nonValue, String arraySeparator) {
super(accessor, nonValue, arraySeparator);
public ConfigFieldStringifyer(
final Function<Object, Object> accessor,
final String nonValue,
final String arraySeparator,
final Map<Class<?>, Stringifier> stringifiers) {
super(accessor, nonValue, arraySeparator, stringifiers);
}
@Override

View File

@@ -1,13 +1,19 @@
package ch.psi.daq.queryrest.response.csv;
import java.util.Map;
import java.util.function.Function;
import ch.psi.daq.common.string.stringifier.Stringifier;
import ch.psi.daq.domain.DataEvent;
public class EventFieldStringifyer extends QueryFieldStringifyer implements Function<DataEvent, String> {
public EventFieldStringifyer(Function<Object, Object> accessor, String nonValue, String arraySeparator) {
super(accessor, nonValue, arraySeparator);
public EventFieldStringifyer(
final Function<Object, Object> accessor,
final String nonValue,
final String arraySeparator,
final Map<Class<?>, Stringifier> stringifiers) {
super(accessor, nonValue, arraySeparator, stringifiers);
}
@Override

View File

@@ -1,42 +1,61 @@
package ch.psi.daq.queryrest.response.csv;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import ch.psi.daq.common.string.stringifier.Stringifier;
import ch.psi.daq.common.util.Arrays;
import ch.psi.data.collection.PrimitiveList;
public class QueryFieldStringifyer {
public class QueryFieldStringifyer implements Stringifier {
public static final String OPEN_BRACKET = "[";
public static final String CLOSE_BRACKET = "]";
private Function<Object, Object> accessor;
private String nonValue;
private String arraySeparator;
private final Function<Object, Object> accessor;
private final String nonValue;
private final String arraySeparator;
private final Map<Class<?>, Stringifier> stringifiers;
public QueryFieldStringifyer(Function<Object, Object> accessor, String nonValue, String arraySeparator) {
public QueryFieldStringifyer(
final Function<Object, Object> accessor,
final String nonValue,
final String arraySeparator,
final Map<Class<?>, Stringifier> stringifiers) {
this.accessor = accessor;
this.nonValue = nonValue;
this.arraySeparator = arraySeparator;
if (stringifiers != null) {
this.stringifiers = stringifiers;
} else {
this.stringifiers = Collections.emptyMap();
}
}
protected Function<Object, Object> getAccessor() {
return accessor;
}
protected String getNonValue(){
protected String getNonValue() {
return nonValue;
}
@SuppressWarnings("rawtypes")
@Override
public String toString(Object value) {
if (value == null) {
return nonValue;
} else if (value instanceof PrimitiveList) {
return toString((PrimitiveList) value, arraySeparator);
} else if (value.getClass().isArray()) {
return Arrays.toString(value, arraySeparator);
return Arrays.toString(value, arraySeparator, stringifiers.get(value.getClass()));
} else {
return value.toString();
final Stringifier stringifier = stringifiers.get(value.getClass());
if (stringifier != null) {
return stringifier.toString(value);
} else {
return value.toString();
}
}
}

View File

@@ -76,7 +76,7 @@ public class DAQQueriesResponseFormatter
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
final Set<Aggregation> defaultEventResponseAggregations =
context.getBean(QueryRestConfig.BEAN_NAME_DEFAULT_EVENT_RESPONSE_AGGREGATIONS, Set.class);;
context.getBean(QueryRestConfig.BEAN_NAME_DEFAULT_EVENT_RESPONSE_AGGREGATIONS, Set.class);
this.defaultEventResponseAggregations =
defaultEventResponseAggregations.stream().map(Aggregation::name)
.collect(Collectors.toCollection(LinkedHashSet::new));

View File

@@ -6,6 +6,7 @@ import static org.junit.Assert.assertTrue;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -91,7 +92,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -181,7 +182,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.pulseId);
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -246,6 +247,107 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
}
}
@Test
public void testPulseRangeQuery_DuplicatedPulse() throws Exception {
Map<String, Collection<Long>> channelSeq = new HashMap<>();
List<Long> channel_01_Seq = Arrays.asList(0L, 1L, 1L, 2L);
String channel_01 = "TestChannel_Scalar_01_" + Arrays.toString(channel_01_Seq.toArray());
channelSeq.put(channel_01, channel_01_Seq);
List<Long> channel_02_Seq = Arrays.asList(0L, 1L, 1L, 2L);
String channel_02 = "TestChannel_Waveform_02_" + Arrays.toString(channel_01_Seq.toArray());
channelSeq.put(channel_02, channel_02_Seq);
List<String> channels = Arrays.asList(channel_01, channel_02);
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
0,
2),
channels);
request.setResponse(new CSVHTTPResponse());
LinkedHashSet<EventField> eventFields = new LinkedHashSet<>();
eventFields.add(EventField.channel);
eventFields.add(EventField.pulseId);
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.value);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
StringReader reader = new StringReader(response);
CSVParser csvParser = new CSVParser(reader, csvFormat);
try {
long pulse = 0;
int totalEventRows = 3;
List<CSVRecord> records = csvParser.getRecords();
assertEquals(totalEventRows + 1, records.size());
CSVRecord record = records.remove(0);
assertEquals(eventFields.size() * channels.size(), record.size());
int column = 0;
for (String channel : channels) {
for (EventField eventField : eventFields) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
record.get(column++));
}
}
for (int row = 0; row < totalEventRows; ++row) {
record = records.get(row);
assertEquals(eventFields.size() * channels.size(), record.size());
column = 0;
for (String channel : channels) {
if (channelSeq.get(channel).contains(pulse)) {
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
if (channel.toLowerCase().contains("waveform")) {
if (pulse != 1) {
assertEquals("[" + pulse + "," + pulse + ",2,3,4,5,6,7]", record.get(column++));
} else {
assertEquals(
"[{min:1.0 mean:1.0 max:1.0},{min:1.0 mean:1.0 max:1.0},{min:2.0 mean:2.0 max:2.0},{min:3.0 mean:3.0 max:3.0},{min:4.0 mean:4.0 max:4.0},{min:5.0 mean:5.0 max:5.0},{min:6.0 mean:6.0 max:6.0},{min:7.0 mean:7.0 max:7.0}]",
record.get(column++));
}
} else {
if (pulse != 1) {
assertEquals("" + pulse, record.get(column++));
} else {
assertEquals("{min:1.0 mean:1.0 max:1.0}", record.get(column++));
}
}
} else {
assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
}
}
++pulse;
}
} finally {
reader.close();
csvParser.close();
}
}
@Test
public void testPulseRangeQuery_Match_NoTimeField() throws Exception {
Map<String, Set<Long>> channelSeq = new HashMap<>();
@@ -271,7 +373,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
LinkedHashSet<EventField> eventFields = new LinkedHashSet<>();
eventFields.add(EventField.channel);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -367,7 +469,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
// eventFields.add(eventField.eventCount);
// eventFields.add(eventField.value);
// for (DAQQueryElement element : request) {
// element.setFields(eventFields);
// element.setEventFields(eventFields);
// }
//
// String content = mapper.writeValueAsString(request);
@@ -456,7 +558,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -540,7 +642,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -640,7 +742,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -771,7 +873,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -925,7 +1027,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -1029,7 +1131,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
Set<EventField> extremaFields = new LinkedHashSet<>();
for (Extrema extremum : extrema) {
@@ -1164,7 +1266,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -1215,7 +1317,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -1331,7 +1433,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -1515,7 +1617,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
csvParser.close();
}
}
@Test
public void testDateRangeQuery_NoneAlignment() throws Exception {
List<String> channels = Arrays.asList(
@@ -1545,7 +1647,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -1795,7 +1897,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@@ -1831,7 +1933,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
LinkedHashSet<EventField> eventFields = new LinkedHashSet<>();
eventFields.add(EventField.channel);
eventFields.add(EventField.value);
request.setFields(eventFields);
request.setEventFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);