ATEST-442 -> Extrema

This commit is contained in:
Fabian Märki
2016-06-10 17:43:34 +02:00
parent 6638043102
commit fb327f7457
7 changed files with 457 additions and 89 deletions

View File

@ -288,6 +288,7 @@ It is possible (and recommended) to aggregate queried data.
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
- **aggregations**: Array of requested aggregations (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Aggregation.java) for possible values). These values will be added to the *data* array response.
- **extrema**: Array of requested extrema (see [here](https://github.psi.ch/sf_daq/ch.psi.daq.domain/blob/master/src/main/java/ch/psi/daq/domain/query/operation/Extrema.java) for possible values). These values will be added to the *data* array response.
- **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be divided into.
- **durationPerBin**: Activates data binning. Specifies the duration per bin for time-range queries (using duration makes this binning strategy consistent between channel with different update frequencies). The duration is defined as a [ISO-8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) duration (e.g., `PT1H` for 1 hour, `PT2S` for 2 seconds, `PT0.05S` for 50 milliseconds etc.). The resolution is in milliseconds and thus the minimal duration is 1 millisecond.
- **pulsesPerBin**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries (using number of pulses makes this binning strategy consistent between channel with different update frequencies).
@ -929,73 +930,5 @@ Illustration of array index aggregation with additional with binning (several nr
![Index Aggregation with Binning](doc/images/Index_Binning.png)
#### Extrema Search
```json
{
"aggregation":{
"aggregationType":"extrema",
"aggregations":["min","max","sum"]
},
"fields":["pulseId","value"],
"range":{
"startPulseId":0,
"endPulseId":3
},
"channels":[
"Channel_01"
]
}
```
##### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '"aggregation":{"aggregationType":"extrema","aggregations":["min","max","sum"]},"fields":["pulseId","value"],"range":{"startPulseId":0,"endPulseId":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query | python -m json.tool
```
##### Response
```json
[
{
"channel":"Channel_01",
"data":{
"minima":{
"min":{
"value":1.0,
"event":{
"pulseId":0,
"value":[1,2,3,4]
}
},
"sum":{
"value":10.0,
"event":{
"pulseId":0,
"value":[1,2,3,4]
}
}
},
"maxima":{
"max":{
"value":7.0,
"event":{
"pulseId":3,
"value":[4,5,6,7]
}
},
"sum":{
"value":22.0,
"event":{
"pulseId":3,
"value":[4,5,6,7]
}
}
}
}
}
]
```

View File

@ -32,6 +32,7 @@ import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.query.operation.Response;
import ch.psi.daq.domain.query.operation.aggregation.extrema.AbstractExtremaMeta;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.analyzer.QueryAnalyzerImpl;
import ch.psi.daq.query.config.QueryConfig;
@ -86,6 +87,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
// won't. This way, the user can specify which columns are to be received.
objectMapper.addMixIn(DataEvent.class, PropertyFilterMixin.class);
objectMapper.addMixIn(Statistics.class, PropertyFilterMixin.class);
objectMapper.addMixIn(AbstractExtremaMeta.class, PropertyFilterMixin.class);
objectMapper.addMixIn(EnumMap.class, PropertyFilterMixin.class);
objectMapper.addMixIn(Response.class, PolymorphicResponseMixIn.class);

View File

@ -35,6 +35,7 @@ import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.query.analyzer.QueryAnalyzer;
import ch.psi.daq.query.model.Query;
@ -52,6 +53,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
public static final String DELIMITER_ARRAY = ",";
public static final char DELIMITER_CHANNELNAME_FIELDNAME = '.';
public static final String EMPTY_VALUE = "";
public static final String FIELDNAME_EXTREMA = "extrema";
private static final Function<Pair<ChannelName, DataEvent>, ChannelName> KEY_PROVIDER = (pair) -> pair.getKey();
// try to match sync data (bsread) with non sync data (epics) based on the time usin 10 millis
// buckets.
@ -62,7 +64,8 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
@Override
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, final OutputStream out) throws Exception {
public void respond(final List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
final OutputStream out) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
final Map<ChannelName, Stream<Pair<ChannelName, DataEvent>>> streams = new LinkedHashMap<>(results.size());
@ -151,14 +154,20 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
private void setupChannelColumns(DAQQueryElement daqQuery, BackendQuery backendQuery, ChannelName channelName,
Collection<String> header, Collection<Pair<ChannelName, Function<DataEvent, String>>> accessors) {
Set<QueryField> queryFields = daqQuery.getFields();
List<Aggregation> aggregations = daqQuery.getAggregation() != null ? daqQuery.getAggregation().getAggregations() : null;
List<Aggregation> aggregations =
daqQuery.getAggregation() != null ? daqQuery.getAggregation().getAggregations() : null;
List<Extrema> extrema = daqQuery.getAggregation() != null ? daqQuery.getAggregation().getExtrema() : null;
QueryAnalyzer queryAnalyzer = queryAnalizerFactory.apply(backendQuery);
for (QueryField field : queryFields) {
if (!(QueryField.value.equals(field) && queryAnalyzer.isAggregationEnabled())) {
header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + field.name());
StringBuilder buf = new StringBuilder(3)
.append(channelName.getName())
.append(DELIMITER_CHANNELNAME_FIELDNAME)
.append(field.name());
header.add(buf.toString());
accessors.add(Pair.of(channelName, new QueryFieldStringifyer(field.getAccessor(), EMPTY_VALUE,
DELIMITER_ARRAY)));
}
@ -166,10 +175,39 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter {
if (aggregations != null && queryAnalyzer.isAggregationEnabled()) {
for (Aggregation aggregation : aggregations) {
header.add(channelName.getName() + DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value.name()
+ DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name());
StringBuilder buf = new StringBuilder(5)
.append(channelName.getName())
.append(DELIMITER_CHANNELNAME_FIELDNAME)
.append(QueryField.value.name())
.append(DELIMITER_CHANNELNAME_FIELDNAME)
.append(aggregation.name());
header.add(buf.toString());
accessors.add(Pair.of(channelName, new AggregationStringifyer(aggregation.getAccessor(), EMPTY_VALUE)));
}
}
if (extrema != null && queryAnalyzer.isAggregationEnabled()) {
for (Extrema extremum : extrema) {
for (QueryField field : queryFields) {
Function<DataEvent, Object> accessor = extremum.getAccessor(field);
if (accessor != null) {
StringBuilder buf = new StringBuilder(7)
.append(channelName.getName())
.append(DELIMITER_CHANNELNAME_FIELDNAME)
.append(FIELDNAME_EXTREMA)
.append(DELIMITER_CHANNELNAME_FIELDNAME)
.append(extremum.name())
.append(DELIMITER_CHANNELNAME_FIELDNAME)
.append(field.name());
header.add(buf.toString());
accessors
.add(Pair.of(channelName, new QueryFieldStringifyer(accessor, EMPTY_VALUE,
DELIMITER_ARRAY)));
}
}
}
}
}
}

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.query.model.impl.BackendQuery;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
@ -47,7 +48,8 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
private ObjectMapper mapper;
@Override
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results, OutputStream out) throws Exception {
public void respond(List<Entry<DAQQueryElement, Stream<Triple<BackendQuery, ChannelName, ?>>>> results,
OutputStream out) throws Exception {
AtomicReference<Exception> exception = new AtomicReference<>();
JsonGenerator generator = jsonFactory.createGenerator(out, JsonEncoding.UTF8);
@ -78,7 +80,8 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
writer.writeValue(generator, triple.getRight());
generator.writeEndObject();
} catch (Exception e) {
LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(), e);
LOGGER.error("Could not write channel name of channel '{}'", triple.getMiddle(),
e);
exception.compareAndSet(null, e);
}
});
@ -106,9 +109,11 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
protected Set<String> getFields(DAQQueryElement query) {
Set<QueryField> queryFields = query.getFields();
List<Aggregation> aggregations = query.getAggregation() != null ? query.getAggregation().getAggregations() : null;
List<Extrema> extrema = query.getAggregation() != null ? query.getAggregation().getExtrema() : null;
Set<String> includedFields =
new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0));
new LinkedHashSet<String>(queryFields.size() + (aggregations != null ? aggregations.size() : 0)
+ (extrema != null ? extrema.size() : 0));
for (QueryField field : queryFields) {
includedFields.add(field.name());
@ -118,6 +123,11 @@ public class JSONResponseStreamWriter implements ResponseStreamWriter {
includedFields.add(aggregation.name());
}
}
if (extrema != null) {
// field of ExtremaCalculator (extrema in BinnedValueCombinedDataEvent and
// BinnedIndexCombinedDataEvent)
includedFields.add("extrema");
}
// do not write channel since it is already provided as key in mapping
includedFields.remove(QueryField.channel.name());

View File

@ -32,6 +32,7 @@ import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.request.range.RequestRangeDate;
import ch.psi.daq.domain.request.range.RequestRangePulseId;
@ -482,7 +483,7 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("[2048]", record.get(column++));
assertEquals("[8]", record.get(column++));
assertEquals("1", record.get(column++));
assertTrue(record.get(column).startsWith("["));
assertTrue(record.get(column++).endsWith("]"));
@ -826,6 +827,146 @@ public class QueryRestControllerCsvTest extends AbstractDaqRestTest {
}
}
@Test
public void testDateRangeQueryNrOfBinsAggregateExtrema() throws Exception {
List<String> channels = Arrays.asList(TEST_CHANNEL_01);
long startTime = 0;
long endTime = 99;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
List<Aggregation> aggregations = new ArrayList<>();
aggregations.add(Aggregation.min);
aggregations.add(Aggregation.mean);
aggregations.add(Aggregation.max);
List<Extrema> extrema = new ArrayList<>();
extrema.add(Extrema.minValue);
extrema.add(Extrema.maxValue);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
channels);
request.setAggregation(new AggregationDescriptor().setNrOfBins(2).setAggregations(aggregations)
.setExtrema(extrema));
request.setResponse(new CSVHTTPResponse());
LinkedHashSet<QueryField> queryFields = new LinkedHashSet<>();
queryFields.add(QueryField.channel);
queryFields.add(QueryField.pulseId);
queryFields.add(QueryField.iocSeconds);
queryFields.add(QueryField.iocMillis);
queryFields.add(QueryField.globalSeconds);
queryFields.add(QueryField.globalMillis);
queryFields.add(QueryField.shape);
queryFields.add(QueryField.eventCount);
queryFields.add(QueryField.value);
request.setFields(queryFields);
Set<QueryField> extremaFields = new LinkedHashSet<>();
for (Extrema extremum : extrema) {
for (QueryField queryField : queryFields) {
if (extremum.getAccessor(queryField) != null) {
extremaFields.add(queryField);
}
}
}
String content = mapper.writeValueAsString(request);
System.out.println(content);
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
StringReader reader = new StringReader(response);
CSVParser csvParser = new CSVParser(reader, csvFormat);
// will not be included as it is an aggregation
queryFields.remove(QueryField.value);
try {
long pulse = 0;
int totalRows = 2;
List<CSVRecord> records = csvParser.getRecords();
assertEquals(totalRows + 1, records.size());
// remove header
CSVRecord record = records.remove(0);
assertEquals(
(queryFields.size() + aggregations.size() + (extremaFields.size() * extrema.size())) * channels.size(),
record.size());
int column = 0;
for (String channel : channels) {
for (QueryField queryField : queryFields) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(),
record.get(column++));
}
for (Aggregation aggregation : aggregations) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + QueryField.value
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name(),
record.get(column++));
}
for (Extrema extremum : extrema) {
for (QueryField queryField : extremaFields) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME
+ CSVResponseStreamWriter.FIELDNAME_EXTREMA
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + extremum.name()
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + queryField.name(),
record.get(column++));
}
}
}
for (int row = 0; row < totalRows; ++row) {
record = records.get(row);
assertEquals((queryFields.size() + aggregations.size() + (extremaFields.size() * extrema.size()))
* channels.size(), record.size());
column = 0;
for (String channel : channels) {
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("5", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + (pulse + 2) + ".0", record.get(column++));
assertEquals("" + (pulse + 4) + ".0", record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + (pulse + 4), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId((pulse + 4))), record.get(column++));
assertEquals("" + TimeUtils.getMillis(TestTimeUtils.getTimeFromPulseId((pulse + 4))),
record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + (pulse + 4) + ".0", record.get(column++));
}
pulse += 5;
}
} finally {
reader.close();
csvParser.close();
}
}
@Test
public void testDateRangeQueryBinSizeAggregate() throws Exception {
List<String> channels = Arrays.asList(TEST_CHANNEL_01);

View File

@ -1,5 +1,7 @@
package ch.psi.daq.test.queryrest.controller;
import java.util.Arrays;
import org.junit.After;
import org.junit.Test;
import org.springframework.http.MediaType;
@ -15,9 +17,11 @@ import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.channels.ChannelsRequest;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
import ch.psi.daq.domain.query.operation.AggregationType;
import ch.psi.daq.domain.query.operation.Compression;
import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.operation.QueryField;
import ch.psi.daq.domain.reader.Backend;
import ch.psi.daq.domain.request.range.RequestRangeDate;
@ -35,6 +39,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
public static final String TEST_CHANNEL_01 = "testChannel1";
public static final String TEST_CHANNEL_02 = "testChannel2";
public static final String TEST_CHANNEL_WAVEFORM_01 = "testChannelWaveform1";
public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02};
@After
@ -553,12 +558,14 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
long endTime = 1099;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
new AggregationDescriptor().setNrOfBins(2),
TEST_CHANNEL_01);
DAQQuery request =
new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
new AggregationDescriptor().setNrOfBins(2).setAggregations(
Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max)),
TEST_CHANNEL_01);
String content = mapper.writeValueAsString(request);
System.out.println(content);
@ -595,6 +602,78 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.max").value(109.0));
}
@Test
public void testDateRangeQueryNrOfBinsAggregateExtrema() throws Exception {
long startTime = 1000;
long endTime = 1099;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
DAQQuery request =
new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
new AggregationDescriptor()
.setNrOfBins(2)
.setAggregations(Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max))
.setExtrema(Arrays.asList(Extrema.minValue, Extrema.maxValue)),
TEST_CHANNEL_01);
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(
MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.min").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.mean").value(102.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value.max").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.value").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.minValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.value").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.pulseId").value(104))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 40000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema.maxValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(105))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 50000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.min").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.mean").value(107.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value.max").value(109.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.value").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.pulseId").value(105))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 50000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.minValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.value").value(109.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.pulseId").value(109))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 90000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema.maxValue.eventCount").value(1));
}
@Test
public void testDateRangeQueryBinSizeAggregate() throws Exception {
long startTime = 10000;
@ -707,6 +786,169 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].value.max").value(1099.0));
}
@Test
public void testDateRangeQueryIndexAggregate() throws Exception {
long startTime = 1000;
long endTime = 1099;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
DAQQuery request =
new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
new AggregationDescriptor()
.setAggregationType(AggregationType.index)
.setNrOfBins(2)
.setAggregations(Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max)),
TEST_CHANNEL_WAVEFORM_01);
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(
MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].min").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].mean").value(102.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].max").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].min").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].mean").value(102.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].max").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(105))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 50000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].min").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].mean").value(107.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].max").value(109.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].min").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].mean").value(107.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].max").value(109.0));
}
@Test
public void testDateRangeQueryIndexAggregateExtrema() throws Exception {
long startTime = 1000;
long endTime = 1099;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
DAQQuery request =
new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
new AggregationDescriptor()
.setAggregationType(AggregationType.index)
.setNrOfBins(2)
.setAggregations(Arrays.asList(Aggregation.min, Aggregation.mean, Aggregation.max))
.setExtrema(Arrays.asList(Extrema.minValue, Extrema.maxValue)),
TEST_CHANNEL_WAVEFORM_01);
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(
MockMvcRequestBuilders
.post(QueryRestController.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].min").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].mean").value(102.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[0].max").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].min").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].mean").value(102.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].value[1].max").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.value").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].minValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.value").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.pulseId").value(104))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 40000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[0].maxValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.value").value(100.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 0)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].minValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.value").value(104.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.pulseId").value(104))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 40000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].extrema[1].maxValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(105))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalSeconds").value(
TestTimeUtils.getTimeStr(1, 50000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].min").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].mean").value(107.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[0].max").value(109.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].min").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].mean").value(107.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].value[1].max").value(109.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.value").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.pulseId").value(105))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 50000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].minValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.value").value(109.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.pulseId").value(109))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 90000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[0].maxValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.value").value(105.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.pulseId").value(105))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 50000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].minValue.eventCount").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.value").value(109.0))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.pulseId").value(109))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.globalSeconds").value(
TestTimeUtils.getTimeStr(1, 90000000)))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].extrema[1].maxValue.eventCount").value(1));
}
@Test
public void testGzipFileSuffixHeader() throws Exception {
DAQQuery request = new DAQQuery(

View File

@ -168,8 +168,9 @@ public class DummyCassandraReader implements CassandraReader {
FieldNames.FIELD_PULSE_ID)) ? i : PropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
if (channelLower.contains("waveform")) {
long[] value = random.longs(2048).toArray();
long[] value = random.longs(8).toArray();
value[0] = i;
value[1] = i;
return new ChannelEventImpl(
channel,
iocTime,
@ -180,11 +181,12 @@ public class DummyCassandraReader implements CassandraReader {
);
} else if (channelLower.contains("image")) {
int x = 640;
int y = 480;
int x = 4;
int y = 8;
int[] shape = new int[] {x, y};
long[] value = random.longs(x * y).toArray();
value[0] = i;
value[1] = i;
return new ChannelEventImpl(
channel,
iocTime,