diff --git a/Readme.md b/Readme.md
index bb7c0bc..c698a84 100644
--- a/Readme.md
+++ b/Readme.md
@@ -523,12 +523,15 @@ It is possible to map values based on their pulse-id/global time. Setting this o
 
 ```json
 "mapping":{
-   "incomplete":"provide-as-is"
+   "incomplete":"provide-as-is",
+   "alignment":"by-pulse",
+   "aggregations":["count","min","mean","max"]
 }
 ```
 
 - **incomplete**: Defines how incomplete mappings should be handled (e.g., when the values of two channels should be mapped but the channels have different frequencies, or one was not available in the specified query range) (values: **provide-as-is**|drop|fill-null). *provide-as-is* provides the data as recorded, *drop* discards incomplete mappings, and *fill-null* fills incomplete mappings with a *null* string (simplifies parsing).
-
+- **alignment**: Defines how events are aligned to each other (values: **by-pulse**|by-time|none). If alignment is undefined, it is selected based on the query type (query by pulse-id or by time). _none_ simply adds the events of a channel one after the other, independent of other channels.
+- **aggregations**: If several events are mapped into the same bin (e.g., due to active binning or duplicated pulse-ids), their values are aggregated according to this parameter (if undefined, the global/default aggregations are used).
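For illustration, a complete query using the new mapping options could look as follows (a sketch only: channel names and the pulse-id range are placeholders, and `mapping.aggregations` only takes effect when no global aggregation is requested; see the code changes below):

```json
{
  "range":{
    "startPulseId":0,
    "endPulseId":2
  },
  "channels":["Channel_01","Channel_02"],
  "mapping":{
    "incomplete":"fill-null",
    "alignment":"by-pulse",
    "aggregations":["count","mean"]
  }
}
```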
diff --git a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
index ab6c9b2..759b033 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/csv/CSVResponseStreamWriter.java
@@ -311,6 +311,8 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
         final List<Aggregation> aggregations =
                 daqQuery.getAggregation() != null ? daqQuery.getAggregation().getAggregations() : null;
         final List<Extrema> extrema = daqQuery.getAggregation() != null ? daqQuery.getAggregation().getExtrema() : null;
+        final List<Aggregation> mappingAggregations =
+                daqQuery.getMapping() != null ? daqQuery.getMapping().getAggregations() : null;
 
         final BackendQueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
@@ -323,30 +325,38 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
         // in case Mapping maps several events to the same bin -> values will be aggregated into
         // BinnedValueCombinedDataEvent or BinnedIndexCombinedDataEvent
-        final Collection<Aggregation> locAggregations;
-        if (aggregations != null) {
-            locAggregations = aggregations;
-        } else {
-            locAggregations =
-                    context.getBean(QueryRestConfig.BEAN_NAME_DEFAULT_EVENT_RESPONSE_AGGREGATIONS, Set.class);
-        }
-        final Stringifier statsStringifier;
-        if (locAggregations.size() == 1) {
-            // if only one aggregation is active, write only this value
-            // -> makes it compatible with scalar/array layout
-            statsStringifier = new AggregationStringifyer(locAggregations.iterator().next(), EMPTY_VALUE);
-        } else {
-            final Map<String, Stringifier> aggregationsStringifiers = new LinkedHashMap<>(locAggregations.size());
-            for (final Aggregation aggregation : locAggregations) {
-                aggregationsStringifiers.put(aggregation.name(),
-                        new AggregationStringifyer(aggregation, EMPTY_VALUE));
+        Collection<Aggregation> locMappingAggregations = null;
+        if (aggregations == null) {
+            // only when general aggregations is disabled
+            // -> otherwise values will already be globally aggregated
+            locMappingAggregations = mappingAggregations;
+            if (locMappingAggregations == null) {
+                // use default -> should there be an extra field
+                locMappingAggregations =
+                        context.getBean(QueryRestConfig.BEAN_NAME_DEFAULT_EVENT_RESPONSE_AGGREGATIONS, Set.class);
             }
-            statsStringifier = new ObjectStringifier(aggregationsStringifiers);
         }
-        final Map<Class<?>, Stringifier> stringifiers = new HashMap<>(2);
-        stringifiers.put(StorelessStatistics.class, statsStringifier);
-        stringifiers.put(StorelessStatistics[].class, statsStringifier);
+        Map<Class<?>, Stringifier> stringifiers = null;
+        if (locMappingAggregations != null) {
+            final Stringifier statsStringifier;
+            if (locMappingAggregations.size() == 1) {
+                // if only one aggregation is active, write only this value
+                // -> makes it compatible with scalar/array layout
+                statsStringifier = new AggregationStringifyer(locMappingAggregations.iterator().next(), EMPTY_VALUE);
+            } else {
+                final Map<String, Stringifier> aggregationsStringifiers =
+                        new LinkedHashMap<>(locMappingAggregations.size());
+                for (final Aggregation aggregation : locMappingAggregations) {
+                    aggregationsStringifiers.put(aggregation.name(),
+                            new AggregationStringifyer(aggregation, EMPTY_VALUE));
+                }
+                statsStringifier = new ObjectStringifier(aggregationsStringifiers);
+            }
+            stringifiers = new HashMap<>(2);
+            stringifiers.put(StorelessStatistics.class, statsStringifier);
+            stringifiers.put(StorelessStatistics[].class, statsStringifier);
+        }
 
         header.add(buf.toString());
         accessors.add(Pair.of(
                 channelName,
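The observable effect on the CSV output (derived from the test expectations below): with several mapping aggregations active, an aggregated cell is serialized as an object via `ObjectStringifier`; with exactly one, `AggregationStringifyer` writes the bare value so the column stays compatible with the plain scalar/array layout. For a duplicated pulse-id, the cells look roughly like this:

```
{count:2.0 mean:1.0}    two aggregations (count, mean) -> object notation
1.0                     single aggregation (mean) -> plain value
```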
diff --git a/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java b/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java
index aff0f76..0594efe 100644
--- a/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java
+++ b/src/main/java/ch/psi/daq/queryrest/response/formatter/DAQQueriesResponseFormatter.java
@@ -222,12 +222,13 @@ public class DAQQueriesResponseFormatter
         final Mapping mapping = daqQuery.getMappingOrDefault(DEFAULT_MAPPING);
         final Padder padder = mapping.getIncomplete().getPadder(backendQuery);
-        ToLongFunction<DataEvent> alignmentProvider = Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
+        ToLongFunction<DataEvent> alignmentProvider =
+                Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
         if (binningStrategy == null) {
             binningStrategy = Alignment.getBinningStrategy(daqQuery.getMapping().getAlignment(), requestRange);
-        } 
+        }
         BiFunction valueCombiner = null;
-        if(binningStrategy != null){
+        if (binningStrategy != null) {
             binningStrategy.setRequestRange(requestRange);
             valueCombiner = new BinnedValueCombiner(binningStrategy);
         }
@@ -327,8 +328,12 @@ public class DAQQueriesResponseFormatter
     private static Set<EventField> getEventFields(final DAQQueryElement query, final boolean removeIdentifiers) {
         final Set<EventField> eventFields = query.getEventFields();
-        final List<Aggregation> aggregations =
+        List<Aggregation> aggregations =
                 query.getAggregation() != null ? query.getAggregation().getAggregations() : null;
+        if (aggregations == null && query.getMapping() != null) {
+            // global aggregation has precedence
+            aggregations = query.getMapping().getAggregations();
+        }
         final List<Extrema> extrema = query.getAggregation() != null ? query.getAggregation().getExtrema() : null;
 
         final Set<EventField> includedFields =
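To summarize the precedence both response paths now implement, a minimal sketch follows; it assumes only the getters visible in this diff, and `resolveAggregations` is a hypothetical helper, not part of the change:

```java
// Resolution order for the aggregations applied to mapped bins:
// 1. query-level (global) aggregations -> data is already aggregated globally
// 2. mapping-level aggregations
// 3. null -> the CSV writer falls back to the configured default aggregations
private static List<Aggregation> resolveAggregations(final DAQQueryElement query) {
    if (query.getAggregation() != null && query.getAggregation().getAggregations() != null) {
        // global aggregation has precedence over mapping aggregations
        return query.getAggregation().getAggregations();
    }
    if (query.getMapping() != null) {
        // may itself be null -> caller applies the defaults
        return query.getMapping().getAggregations();
    }
    return null;
}
```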
diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/CSVQueryRestControllerTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/CSVQueryRestControllerTest.java
index eaf6c9b..dbc8fcf 100644
--- a/src/test/java/ch/psi/daq/test/queryrest/controller/CSVQueryRestControllerTest.java
+++ b/src/test/java/ch/psi/daq/test/queryrest/controller/CSVQueryRestControllerTest.java
@@ -346,6 +346,178 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
             reader.close();
             csvParser.close();
         }
+
+        request = new DAQQuery(
+                new RequestRangePulseId(
+                        0,
+                        2),
+                channels);
+        request.setResponse(new CSVHTTPResponse());
+        request.setEventFields(eventFields);
+        request.setMapping(new Mapping());
+        request.getMapping().setAggregations(Arrays.asList(Aggregation.count, Aggregation.mean));
+
+        content = mapper.writeValueAsString(request);
+        System.out.println(content);
+
+        result = this.mockMvc
+                .perform(MockMvcRequestBuilders
+                        .post(DomainConfig.PATH_QUERY)
+                        .contentType(MediaType.APPLICATION_JSON)
+                        .content(content))
+                .andDo(MockMvcResultHandlers.print())
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andReturn();
+
+        response = result.getResponse().getContentAsString();
+        System.out.println("Response: " + response);
+
+        csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
+        reader = new StringReader(response);
+        csvParser = new CSVParser(reader, csvFormat);
+
+        try {
+            long pulse = 0;
+            int totalEventRows = 3;
+
+            List<CSVRecord> records = csvParser.getRecords();
+            assertEquals(totalEventRows + 1, records.size());
+            CSVRecord record = records.remove(0);
+            assertEquals(eventFields.size() * channels.size(), record.size());
+            int column = 0;
+            for (String channel : channels) {
+                for (EventField eventField : eventFields) {
+                    assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
+                            record.get(column++));
+                }
+            }
+
+            for (int row = 0; row < totalEventRows; ++row) {
+                record = records.get(row);
+
+                assertEquals(eventFields.size() * channels.size(), record.size());
+
+                column = 0;
+                for (String channel : channels) {
+                    if (channelSeq.get(channel).contains(pulse)) {
+                        assertEquals(channel, record.get(column++));
+                        assertEquals("" + pulse, record.get(column++));
+                        assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(),
+                                record.get(column++));
+                        if (channel.toLowerCase().contains("waveform")) {
+                            if (pulse != 1) {
+                                assertEquals("[" + pulse + "," + pulse + ",2,3,4,5,6,7]", record.get(column++));
+                            } else {
+                                assertEquals(
+                                        "[{count:2.0 mean:1.0},{count:2.0 mean:1.0},{count:2.0 mean:2.0},{count:2.0 mean:3.0},{count:2.0 mean:4.0},{count:2.0 mean:5.0},{count:2.0 mean:6.0},{count:2.0 mean:7.0}]",
+                                        record.get(column++));
+                            }
+                        } else {
+                            if (pulse != 1) {
+                                assertEquals("" + pulse, record.get(column++));
+                            } else {
+                                assertEquals("{count:2.0 mean:1.0}", record.get(column++));
+                            }
+                        }
+                    } else {
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                    }
+
+                }
+                ++pulse;
+            }
+        } finally {
+            reader.close();
+            csvParser.close();
+        }
+
+        request = new DAQQuery(
+                new RequestRangePulseId(
+                        0,
+                        2),
+                channels);
+        request.setResponse(new CSVHTTPResponse());
+        request.setEventFields(eventFields);
+        request.setMapping(new Mapping());
+        request.getMapping().setAggregations(Arrays.asList(Aggregation.mean));
+
+        content = mapper.writeValueAsString(request);
+        System.out.println(content);
+
+        result = this.mockMvc
+                .perform(MockMvcRequestBuilders
+                        .post(DomainConfig.PATH_QUERY)
+                        .contentType(MediaType.APPLICATION_JSON)
+                        .content(content))
+                .andDo(MockMvcResultHandlers.print())
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andReturn();
+
+        response = result.getResponse().getContentAsString();
+        System.out.println("Response: " + response);
+
+        csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
+        reader = new StringReader(response);
+        csvParser = new CSVParser(reader, csvFormat);
+
+        try {
+            long pulse = 0;
+            int totalEventRows = 3;
+
+            List<CSVRecord> records = csvParser.getRecords();
+            assertEquals(totalEventRows + 1, records.size());
+            CSVRecord record = records.remove(0);
+            assertEquals(eventFields.size() * channels.size(), record.size());
+            int column = 0;
+            for (String channel : channels) {
+                for (EventField eventField : eventFields) {
+                    assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
+                            record.get(column++));
+                }
+            }
+
+            for (int row = 0; row < totalEventRows; ++row) {
+                record = records.get(row);
+
+                assertEquals(eventFields.size() * channels.size(), record.size());
+
+                column = 0;
+                for (String channel : channels) {
+                    if (channelSeq.get(channel).contains(pulse)) {
+                        assertEquals(channel, record.get(column++));
+                        assertEquals("" + pulse, record.get(column++));
+                        assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
+                        if (channel.toLowerCase().contains("waveform")) {
+                            if (pulse != 1) {
+                                assertEquals("[" + pulse + "," + pulse + ",2,3,4,5,6,7]", record.get(column++));
+                            } else {
+                                assertEquals(
+                                        "[1.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0]",
+                                        record.get(column++));
+                            }
+                        } else {
+                            if (pulse != 1) {
+                                assertEquals("" + pulse, record.get(column++));
+                            } else {
+                                assertEquals("1.0", record.get(column++));
+                            }
+                        }
+                    } else {
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                        assertEquals(CSVResponseStreamWriter.EMPTY_VALUE, record.get(column++));
+                    }
+
+                }
+                ++pulse;
+            }
+        } finally {
+            reader.close();
+            csvParser.close();
+        }
     }
 
     @Test