Option to define mapping aggregations.

Fabian Märki 2018-10-30 09:28:34 +01:00
parent 206bc63879
commit 980ddc2098
4 changed files with 217 additions and 27 deletions


@@ -523,12 +523,15 @@ It is possible to map values based on their pulse-id/global time. Setting this o
 ```json
 "mapping":{
-    "incomplete":"provide-as-is"
+    "incomplete":"provide-as-is",
+    "alignment":"by-pulse",
+    "aggregations":["count","min","mean","max"]
 }
 ```
 - **incomplete**: Defines how incomplete mappings should be handled, e.g., when the values of two channels should be mapped but these channels have different frequencies or one was not available at the specified query range (values: **provide-as-is**|drop|fill-null). *provide-as-is* provides the data as recorded, *drop* discards incomplete mappings, and *fill-null* fills incomplete mappings with a *null* string (simplifies parsing).
+- **alignment**: Defines how the events should be aligned to each other (values: **by-pulse**|by-time|none). If alignment is undefined, it is selected based on the query type (query by pulse-id or by time). _none_ simply adds the events of one channel after the other (independent of other channels).
+- **aggregations**: In case several events are mapped into the same bin (e.g., due to activated binning or duplicated pulse-ids), the values are aggregated based on this parameter (if it is undefined, the global/default aggregations are used).
 <a name="value_transformations"/>
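As an illustration of the new **aggregations** option, this is how the test added at the end of this commit sets it through the Java query API (the pulse-id range and the channel list are placeholders taken from that test):

```java
// Map by pulse-id and, whenever several events end up in the same bin
// (e.g. duplicated pulse-ids), aggregate them with count and mean instead
// of the global/default aggregations.
DAQQuery request = new DAQQuery(
        new RequestRangePulseId(0, 2), // placeholder pulse-id range
        channels);                     // placeholder channel names
request.setResponse(new CSVHTTPResponse());
request.setMapping(new Mapping());
request.getMapping().setAggregations(Arrays.asList(Aggregation.count, Aggregation.mean));
```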


@@ -311,6 +311,8 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
     final List<Aggregation> aggregations =
             daqQuery.getAggregation() != null ? daqQuery.getAggregation().getAggregations() : null;
     final List<Extrema> extrema = daqQuery.getAggregation() != null ? daqQuery.getAggregation().getExtrema() : null;
+    final List<Aggregation> mappingAggregations =
+            daqQuery.getMapping() != null ? daqQuery.getMapping().getAggregations() : null;
     final BackendQueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
@@ -323,30 +325,38 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
     // in case Mapping maps several events to the same bin -> values will be aggregated into
     // BinnedValueCombinedDataEvent or BinnedIndexCombinedDataEvent
-    final Collection<Aggregation> locAggregations;
-    if (aggregations != null) {
-        locAggregations = aggregations;
-    } else {
-        locAggregations =
-                context.getBean(QueryRestConfig.BEAN_NAME_DEFAULT_EVENT_RESPONSE_AGGREGATIONS, Set.class);
-    }
+    Collection<Aggregation> locMappingAggregations = null;
+    if (aggregations == null) {
+        // only when general aggregations is disabled
+        // -> otherwise values will already be globally aggregated
+        locMappingAggregations = mappingAggregations;
+        if (locMappingAggregations == null) {
+            // use default -> should there be an extra field
+            locMappingAggregations =
+                    context.getBean(QueryRestConfig.BEAN_NAME_DEFAULT_EVENT_RESPONSE_AGGREGATIONS, Set.class);
+        }
+    }
+    Map<Class<?>, Stringifier> stringifiers = null;
+    if (locMappingAggregations != null) {
         final Stringifier statsStringifier;
-        if (locAggregations.size() == 1) {
+        if (locMappingAggregations.size() == 1) {
             // if only one aggregation is active, write only this value
             // -> makes it compatible with scalar/array layout
-            statsStringifier = new AggregationStringifyer(locAggregations.iterator().next(), EMPTY_VALUE);
+            statsStringifier = new AggregationStringifyer(locMappingAggregations.iterator().next(), EMPTY_VALUE);
         } else {
-            final Map<String, Stringifier> aggregationsStringifiers = new LinkedHashMap<>(locAggregations.size());
-            for (final Aggregation aggregation : locAggregations) {
+            final Map<String, Stringifier> aggregationsStringifiers =
+                    new LinkedHashMap<>(locMappingAggregations.size());
+            for (final Aggregation aggregation : locMappingAggregations) {
                 aggregationsStringifiers.put(aggregation.name(),
                         new AggregationStringifyer(aggregation, EMPTY_VALUE));
             }
             statsStringifier = new ObjectStringifier(aggregationsStringifiers);
         }
-        final Map<Class<?>, Stringifier> stringifiers = new HashMap<>(2);
+        stringifiers = new HashMap<>(2);
         stringifiers.put(StorelessStatistics.class, statsStringifier);
         stringifiers.put(StorelessStatistics[].class, statsStringifier);
+    }
     header.add(buf.toString());
     accessors.add(Pair.of(
             channelName,


@@ -222,12 +222,13 @@ public class DAQQueriesResponseFormatter
     final Mapping mapping = daqQuery.getMappingOrDefault(DEFAULT_MAPPING);
    final Padder<ChannelName, DataEvent> padder = mapping.getIncomplete().getPadder(backendQuery);
-    ToLongFunction<DataEvent> alignmentProvider = Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
+    ToLongFunction<DataEvent> alignmentProvider =
+            Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
     if (binningStrategy == null) {
         binningStrategy = Alignment.getBinningStrategy(daqQuery.getMapping().getAlignment(), requestRange);
     }
     BiFunction<DataEvent, DataEvent, DataEvent> valueCombiner = null;
-    if(binningStrategy != null){
+    if (binningStrategy != null) {
         binningStrategy.setRequestRange(requestRange);
         valueCombiner = new BinnedValueCombiner(binningStrategy);
     }
@@ -327,8 +328,12 @@ public class DAQQueriesResponseFormatter
     private static Set<String> getEventFields(final DAQQueryElement query, final boolean removeIdentifiers) {
         final Set<? extends QueryField> eventFields = query.getEventFields();
-        final List<Aggregation> aggregations =
+        List<Aggregation> aggregations =
                 query.getAggregation() != null ? query.getAggregation().getAggregations() : null;
+        if (aggregations == null && query.getMapping() != null) {
+            // global aggregation has precedence
+            aggregations = query.getMapping().getAggregations();
+        }
         final List<Extrema> extrema = query.getAggregation() != null ? query.getAggregation().getExtrema() : null;
         final Set<String> includedFields =


@@ -346,6 +346,178 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
             reader.close();
             csvParser.close();
         }
+        request = new DAQQuery(
+                new RequestRangePulseId(
+                        0,
+                        2),
+                channels);
+        request.setResponse(new CSVHTTPResponse());
+        request.setEventFields(eventFields);
+        request.setMapping(new Mapping());
+        request.getMapping().setAggregations(Arrays.asList(Aggregation.count, Aggregation.mean));
+        content = mapper.writeValueAsString(request);
+        System.out.println(content);
+        result = this.mockMvc
+                .perform(MockMvcRequestBuilders
+                        .post(DomainConfig.PATH_QUERY)
+                        .contentType(MediaType.APPLICATION_JSON)
+                        .content(content))
+                .andDo(MockMvcResultHandlers.print())
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andReturn();
+        response = result.getResponse().getContentAsString();
+        System.out.println("Response: " + response);
+        csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
+        reader = new StringReader(response);
+        csvParser = new CSVParser(reader, csvFormat);
+        try {
+            long pulse = 0;
+            int totalEventRows = 3;
+            List<CSVRecord> records = csvParser.getRecords();
+            assertEquals(totalEventRows + 1, records.size());
+            CSVRecord record = records.remove(0);
+            assertEquals(eventFields.size() * channels.size(), record.size());
+            int column = 0;
+            for (String channel : channels) {
+                for (EventField eventField : eventFields) {
+                    assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
+                            record.get(column++));
+                }
+            }
+            for (int row = 0; row < totalEventRows; ++row) {
+                record = records.get(row);
+                assertEquals(eventFields.size() * channels.size(), record.size());
+                column = 0;
+                for (String channel : channels) {
+                    if (channelSeq.get(channel).contains(pulse)) {
+                        assertEquals(channel, record.get(column++));
+                        assertEquals("" + pulse, record.get(column++));
+                        assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
+                        if (channel.toLowerCase().contains("waveform")) {
+                            if (pulse != 1) {
+                                assertEquals("[" + pulse + "," + pulse + ",2,3,4,5,6,7]", record.get(column++));
+                            } else {
+                                assertEquals(
+                                        "[{count:2.0 mean:1.0},{count:2.0 mean:1.0},{count:2.0 mean:2.0},{count:2.0 mean:3.0},{count:2.0 mean:4.0},{count:2.0 mean:5.0},{count:2.0 mean:6.0},{count:2.0 mean:7.0}]",
+                                        record.get(column++));
+                            }
+                        } else {
+                            if (pulse != 1) {
+                                assertEquals("" + pulse, record.get(column++));
+                            } else {
+                                assertEquals("{count:2.0 mean:1.0}", record.get(column++));
+                            }
+                        }
+                    } else {
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                    }
+                }
+                ++pulse;
+            }
+        } finally {
+            reader.close();
+            csvParser.close();
+        }
+        request = new DAQQuery(
+                new RequestRangePulseId(
+                        0,
+                        2),
+                channels);
+        request.setResponse(new CSVHTTPResponse());
+        request.setEventFields(eventFields);
+        request.setMapping(new Mapping());
+        request.getMapping().setAggregations(Arrays.asList(Aggregation.mean));
+        content = mapper.writeValueAsString(request);
+        System.out.println(content);
+        result = this.mockMvc
+                .perform(MockMvcRequestBuilders
+                        .post(DomainConfig.PATH_QUERY)
+                        .contentType(MediaType.APPLICATION_JSON)
+                        .content(content))
+                .andDo(MockMvcResultHandlers.print())
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andReturn();
+        response = result.getResponse().getContentAsString();
+        System.out.println("Response: " + response);
+        csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
+        reader = new StringReader(response);
+        csvParser = new CSVParser(reader, csvFormat);
+        try {
+            long pulse = 0;
+            int totalEventRows = 3;
+            List<CSVRecord> records = csvParser.getRecords();
+            assertEquals(totalEventRows + 1, records.size());
+            CSVRecord record = records.remove(0);
+            assertEquals(eventFields.size() * channels.size(), record.size());
+            int column = 0;
+            for (String channel : channels) {
+                for (EventField eventField : eventFields) {
+                    assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
+                            record.get(column++));
+                }
+            }
+            for (int row = 0; row < totalEventRows; ++row) {
+                record = records.get(row);
+                assertEquals(eventFields.size() * channels.size(), record.size());
+                column = 0;
+                for (String channel : channels) {
+                    if (channelSeq.get(channel).contains(pulse)) {
+                        assertEquals(channel, record.get(column++));
+                        assertEquals("" + pulse, record.get(column++));
+                        assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
+                        if (channel.toLowerCase().contains("waveform")) {
+                            if (pulse != 1) {
+                                assertEquals("[" + pulse + "," + pulse + ",2,3,4,5,6,7]", record.get(column++));
+                            } else {
+                                assertEquals(
+                                        "[1.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0]",
+                                        record.get(column++));
+                            }
+                        } else {
+                            if (pulse != 1) {
+                                assertEquals("" + pulse, record.get(column++));
+                            } else {
+                                assertEquals("1.0", record.get(column++));
+                            }
+                        }
+                    } else {
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                    }
+                }
+                ++pulse;
+            }
+        } finally {
+            reader.close();
+            csvParser.close();
+        }
     }
     @Test