ATEST-296

Fabian Märki
2015-12-14 11:27:08 +01:00
parent c3a307f25c
commit bc1ecb312d
8 changed files with 328 additions and 108 deletions

Readme.md

@@ -147,8 +147,8 @@ The following fields exist:
- **range**: The range of the query (see [Query Range](Readme.md#query_range)).
- **ordering**: The ordering of the data (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.common/browse/src/main/java/ch/psi/daq/common/ordering/Ordering.java) for possible values).
- **fields**: The requested fields (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/QueryField.java) for possible values).
- **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be devided into.
- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries.
- **nrOfBins**: Activates data binning. Specifies the number of bins the pulse/time range should be divided into.
- **binSize**: Activates data binning. Specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using the number of pulses and the number of milliseconds makes this binning strategy consistent between channels with different update frequencies).
- **aggregations**: Activates data aggregation. Array of requested aggregations (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) for possible values). These values will be added to the *data* array response.
- **aggregationType**: Specifies the type of aggregation (see [here](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/AggregationType.java)). The default type is *value* aggregation (e.g., sum([1,2,3])=6). Alternatively, it is possible to define *index* aggregation for multiple arrays in combination with binning (e.g., sum([1,2,3], [3,2,1]) = [4,4,4]).
- **aggregateChannels**: Specifies whether the data of the requested channels should be combined using the defined aggregation (values: true|**false**). A minimal sketch showing how these fields translate into a request follows this list.
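To make these fields concrete, here is a minimal sketch that assembles and serializes such a query using the classes that appear in the tests further down (`DAQQuery`, `RequestRangeTime`, `setNrOfBins`, `setBinSize`); the import paths and the use of Jackson are assumptions, not something this commit shows:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Import paths are assumed; the commit only shows the class names.
import ch.psi.daq.query.model.DAQQuery;
import ch.psi.daq.query.model.RequestRangeTime;

public class QuerySketch {
    public static void main(String[] args) throws Exception {
        // Query the time range 100..110 ms of one channel.
        DAQQuery query = new DAQQuery(
                new RequestRangeTime(100, 110),
                "Channel_01");
        query.setNrOfBins(2);    // divide the range into two bins
        // query.setBinSize(10); // alternative: fixed 10 ms bins

        // Serialize to the JSON body that is POSTed to the /query endpoint.
        String body = new ObjectMapper().writeValueAsString(query);
        System.out.println(body);
    }
}
```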
@@ -577,7 +577,7 @@ Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.qu
![Value Aggregation](doc/images/Value_Aggregation.png)
##### Value Aggregation with Binning
##### Value Aggregation with Binning (nrOfBins)
```json
{
@@ -632,7 +632,66 @@ curl -H "Content-Type: application/json" -X POST -d '{"nrOfBins":2,"aggregationT
Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) with additional binning:
![Value Aggregation with Binning](doc/images/Value_Binning.png)
![Value Aggregation with Binning](doc/images/Value_Binning_NrOfBins.png)
##### Value Aggregation with Binning (binSize)
**binSize** specifies the number of pulses per bin for pulse-range queries or the number of milliseconds per bin for time-range queries (using the number of pulses and the number of milliseconds makes this binning strategy consistent between channels with different update frequencies); a small sketch of the bucketing follows at the end of this example.
```json
{
"binSize":10,
"aggregationType":"value",
"aggregations":["min","max","mean"],
"fields":["globalMillis","value"],
"range":{
"globalMillis":0,
"globalMillis":3
},
"channels":[
"Channel_01"
]
}
```
###### Command
```bash
curl -H "Content-Type: application/json" -X POST -d '{"binSize":10,"aggregationType":"value","aggregations":["min","max","mean"],"fields":["globalMillis","value"],"range":{"startMillis":0,"endMillis":3},"channels":["Channel_01"]}' http://data-api.psi.ch/sf/query
```
###### Response
```json
[
{
"channel":"Channel_01",
"data":[
{
"globalMillis":0,
"value":{
"min":1.0,
"max":5.0,
"mean":3.0
}
},
{
"globalMillis":20,
"value":{
"min":3.0,
"max":7.0,
"mean":5.0
}
}
]
}
]
```
Array value [aggregations](https://github.psi.ch/projects/ST/repos/ch.psi.daq.query/browse/src/main/java/ch/psi/daq/query/model/Aggregation.java) with additional binning:
![Value Aggregation with Binning](doc/images/Value_Binning_BinSize.png)
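Because the bins are defined by a fixed number of pulses or milliseconds on the global axis, every channel is cut at the same bin boundaries regardless of its update frequency. A minimal sketch of this bucketing, written against the description above rather than the server code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

public class BinSizeSketch {
    public static void main(String[] args) {
        long startMillis = 0;
        long binSize = 10; // milliseconds per bin for a time-range query

        // (globalMillis, value) samples of a hypothetical channel
        long[][] events = {{0, 1}, {4, 5}, {12, 3}, {18, 7}};

        // Each event falls into the bin covering [binStart, binStart + binSize).
        Map<Long, List<Long>> bins = new TreeMap<>();
        for (long[] e : events) {
            long binStart = startMillis + ((e[0] - startMillis) / binSize) * binSize;
            bins.computeIfAbsent(binStart, k -> new ArrayList<>()).add(e[1]);
        }

        // Aggregate min/max/mean per bin, mirroring the response format above.
        bins.forEach((binStart, values) -> {
            LongSummaryStatistics s =
                    values.stream().mapToLong(Long::longValue).summaryStatistics();
            System.out.printf("globalMillis=%d min=%d max=%d mean=%.1f%n",
                    binStart, s.getMin(), s.getMax(), s.getAverage());
        });
    }
}
```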
##### Index Aggregation

doc/images/Value_Binning.png: binary file not shown (Before: 44 KiB)

doc/images/Value_Binning_BinSize.png: binary file not shown (After: 48 KiB)

doc/images/Value_Binning_NrOfBins.png: binary file not shown (After: 48 KiB)

QueryRestControllerTest.java

@@ -36,7 +36,9 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
@Resource
private CorsFilter corsFilter;
public static final String[] TEST_CHANNEL_NAMES = new String[] {"testChannel1", "testChannel2"};
public static final String TEST_CHANNEL_01 = "testChannel1";
public static final String TEST_CHANNEL_02 = "testChannel2";
public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02};
@After
public void tearDown() throws Exception {}
@@ -168,8 +170,8 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
public void testPulseRangeQuery() throws Exception {
DAQQuery request = new DAQQuery(
new RequestRangePulseId(
100,
101),
10,
11),
TEST_CHANNEL_NAMES);
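// the dummy readers below publish pulse i at globalMillis = i * 10, so pulses 10 and 11 are expected at 100 ms and 110 ms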
String content = mapper.writeValueAsString(request);
@@ -185,15 +187,18 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value("testChannel2"))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(101));
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110));
}
@Test
@@ -201,7 +206,7 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
DAQQuery request = new DAQQuery(
new RequestRangeTime(
100,
101),
110),
TEST_CHANNEL_NAMES);
String content = mapper.writeValueAsString(request);
@@ -215,21 +220,25 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value("testChannel2"))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(101));
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110));
}
@Test
public void testDateRangeQuery() throws Exception {
String startDate = RequestRangeDate.format(100);
String endDate = RequestRangeDate.format(101);
String endDate = RequestRangeDate.format(110);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
@@ -250,15 +259,19 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(101))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(110))
.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value("testChannel2"))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel").value(TEST_CHANNEL_02))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(101));
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].pulseId").value(11))
.andExpect(MockMvcResultMatchers.jsonPath("$[1].data[1].globalMillis").value(110));
}
@Test
@@ -302,5 +315,101 @@ public class QueryRestControllerTest extends AbstractDaqRestTest {
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data.maxima.max.event.pulseId").value(101));
}
@Test
public void testDateRangeQueryNrOfBinsAggregate() throws Exception {
long startTime = 100;
long endTime = 199;
String startDate = RequestRangeDate.format(startTime);
String endDate = RequestRangeDate.format(endTime);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
TEST_CHANNEL_01);
request.setNrOfBins(2);
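// dummy data: one event every 10 ms, so 100..199 ms holds pulses 10..19; two bins of five events anchored at 100 ms and 150 ms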
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(5))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(15))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(150))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(5));
}
@Test
public void testDateRangeQueryBinSizeAggregate() throws Exception {
long startTime = 1000;
long endTime = 1999;
String startDate = RequestRangeDate.format(startTime);
String endDate = RequestRangeDate.format(endTime);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
TEST_CHANNEL_01);
request.setBinSize(100);
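// 100 ms bins over 1000..1999 ms: ten bins of ten events each, anchored at pulses 100, 110, ..., 190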
String content = mapper.writeValueAsString(request);
System.out.println(content);
this.mockMvc
.perform(
MockMvcRequestBuilders
.post(QueryRestController.QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content)
)
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value(TEST_CHANNEL_01))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalMillis").value(1000))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].pulseId").value(110))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].globalMillis").value(1100))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[1].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].pulseId").value(120))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].globalMillis").value(1200))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[2].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].pulseId").value(130))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].globalMillis").value(1300))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[3].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].pulseId").value(140))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].globalMillis").value(1400))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[4].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].pulseId").value(150))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].globalMillis").value(1500))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[5].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].pulseId").value(160))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].globalMillis").value(1600))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[6].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].pulseId").value(170))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].globalMillis").value(1700))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[7].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].pulseId").value(180))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].globalMillis").value(1800))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[8].eventCount").value(10))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].pulseId").value(190))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].globalMillis").value(1900))
.andExpect(MockMvcResultMatchers.jsonPath("$[0].data[9].eventCount").value(10));
}
}

DummyCassandraReader.java

@@ -30,19 +30,19 @@ import com.google.common.collect.Lists;
*
*/
public class DummyCassandraReader implements CassandraReader {
private static final int KEYSPACE = 1;
private CassandraDataGen dataGen;
private String[] channels;
/**
*
*/
public DummyCassandraReader() {
this.dataGen = new CassandraDataGen();
this.channels = new String[]{
"testChannel1",
this.channels = new String[] {
"testChannel1",
"testChannel2",
"BoolScalar",
"BoolWaveform",
@@ -68,8 +68,9 @@ public class DummyCassandraReader implements CassandraReader {
"Float64Waveform",
"StringScalar"};
}
/**
* {@inheritDoc}
*/
@Override
public Stream<String> getChannelStream(String regex) {
@@ -82,7 +83,7 @@ }
}
/**
* {@inheritDoc}
*/
@Override
public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId,
@@ -91,77 +92,84 @@ }
}
/**
* {@inheritDoc}
*/
@Override
public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
long endNanos, Ordering ordering, String... columns) {
return getDummyEventStream(channel, startMillis, endMillis);
return getDummyEventStream(channel, startMillis / 10, endMillis / 10);
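// dummy events are spaced 10 ms apart (pulse i at t = i * 10), so millisecond bounds are mapped to event indices via division by 10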
}
/**
* {@inheritDoc}
*/
@Override
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends ChannelEventQuery> queryProviders) {
List<ChannelEvent> result = Lists.newArrayList();
queryProviders.forEach(ceq -> {
Stream<ChannelEvent> result = queryProviders.map(ceq -> {
if (ceq instanceof ChannelEventQueryInfo) {
result.add(getEvent((ChannelEventQueryInfo) ceq));
return getEvent((ChannelEventQueryInfo) ceq);
} else {
throw new UnsupportedOperationException("This is not yet implemented!");
}
});
return result.stream();
return result;
}
/**
* {@inheritDoc}
*/
@Override
public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
Stream<ChannelEvent> dummyEventStream = getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId())
.map(ce -> { return (ChannelEvent) ce; });
Stream<ChannelEvent> dummyEventStream =
getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId())
.map(ce -> {
return (ChannelEvent) ce;
});
return dummyEventStream;
}
/**
* {@inheritDoc}
*/
@Override
public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
Stream<ChannelEvent> dummyEventStream = getDummyEventStream(query.getChannel(), query.getStartMillis(), query.getEndMillis())
.map(ce -> { return (ChannelEvent) ce; });
Stream<ChannelEvent> dummyEventStream =
getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
.map(ce -> {
return (ChannelEvent) ce;
});
return dummyEventStream;
}
private Stream<? extends DataEvent> getDummyEventStream(String channel, long startIndex, long endIndex) {
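// per index i the generator emits 10 ms-spaced timestamps (i * 10), zero nanos and i as pulse id/value; the exact generateData parameter order is inferred from the ChannelEvent layout in DummyDataReader, not verified here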
return dataGen.generateData(startIndex, (endIndex-startIndex + 1),
i -> i,
i -> i,
i -> i,
i -> i,
return dataGen.generateData(startIndex, (endIndex - startIndex + 1),
i -> i * 10,
i -> 0,
i -> i,
i -> i * 10,
i -> 0,
i -> Long.valueOf(i),
channel).stream();
}
private List<? extends DataEvent> getDummyEvents(String channel, long startIndex, long endIndex) {
return dataGen.generateData(startIndex, (endIndex-startIndex + 1),
i -> i,
i -> i,
i -> i,
i -> i,
return dataGen.generateData(startIndex, (endIndex - startIndex + 1),
i -> i * 10,
i -> 0,
i -> i,
i -> i * 10,
i -> 0,
i -> Long.valueOf(i),
channel);
}
/**
* {@inheritDoc}
*/
@Override
public List<String> getChannels() {
@@ -169,26 +177,30 @@ }
}
/**
* {@inheritDoc}
*/
@Override
public List<String> getChannels(String regex) {
return Lists.newArrayList(channels).stream().filter(s -> { return s.contains(regex); }).collect(Collectors.toList());
return Lists.newArrayList(channels).stream().filter(s -> {
return s.contains(regex);
}).collect(Collectors.toList());
}
/**
* {@inheritDoc}
*/
@Override
public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) {
if (queryInfo.getPulseId() > 0) {
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId()).get(0);
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId())
.get(0);
}
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis(), queryInfo.getGlobalMillis()).get(0);
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis() / 10,
queryInfo.getGlobalMillis() / 10).get(0);
}
/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<ChannelEvent> getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) {
@@ -196,61 +208,63 @@ }
}
/**
* {@inheritDoc}
*/
@Override
public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(PulseIdRangeQuery query) {
return dataGen.generateMetaPulseId(
KEYSPACE,
ConverterProvider.TYPE_INT32,
KEYSPACE,
ConverterProvider.TYPE_INT32,
null,
query.getStartPulseId(),
query.getStartPulseId(),
(query.getEndPulseId() - query.getStartPulseId() + 1),
i -> i,
i -> i,
i -> i * 10,
i -> 0,
i -> i,
query.getChannel()).stream();
}
/**
* {@inheritDoc}
*/
@Override
public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(TimeRangeQuery query) {
return dataGen.generateMetaTime(
KEYSPACE,
ConverterProvider.TYPE_INT32,
KEYSPACE,
ConverterProvider.TYPE_INT32,
null,
query.getStartMillis(),
(query.getEndMillis() - query.getStartMillis() + 1),
i -> i,
i -> i,
query.getStartMillis() / 10,
((query.getEndMillis() - query.getStartMillis()) / 10 + 1),
i -> i * 10,
i -> 0,
i -> i,
query.getChannel()).stream();
}
/**
* {@inheritDoc}
*/
@Override
public Stream<MetaPulseId> getMetaPulseIdStream(PulseIdRangeQuery query) {
return getChannelEventQueryStream(query).map(r -> { return (MetaPulseId) r; });
return getChannelEventQueryStream(query).map(r -> {
return (MetaPulseId) r;
});
}
/**
* {@inheritDoc}
*/
@Override
public Stream<MetaTime> getMetaTimeStream(TimeRangeQuery query) {
return getChannelEventQueryStream(query).map(r -> { return (MetaTime) r; });
return getChannelEventQueryStream(query).map(r -> {
return (MetaTime) r;
});
}
}

DummyDataReader.java

@@ -1,6 +1,7 @@
package ch.psi.daq.test.queryrest.query;
import java.util.List;
import java.util.Random;
import java.util.regex.Pattern;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@@ -18,6 +19,8 @@ public class DummyDataReader implements DataReader {
public static final String TEST_CHANNEL_2 = "testChannel2";
public static final List<String> TEST_CHANNEL_NAMES = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
private final Random random = new Random(0);
@Override
public Stream<String> getChannelStream(String regex) {
Stream<String> channelStream = TEST_CHANNEL_NAMES.stream();
@@ -39,20 +42,55 @@ public class DummyDataReader implements DataReader {
@Override
public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
long endNanos, Ordering ordering, String... columns) {
return getElements(channel, startMillis, endMillis);
return getElements(channel, startMillis / 10, endMillis / 10);
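// events are emitted every 10 ms (see getElements below), so the millisecond range is scaled down to event indices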
}
protected Stream<? extends DataEvent> getElements(String channel, long start, long end) {
return LongStream.rangeClosed(start, end).mapToObj(i -> {
return new ChannelEvent(
channel,
i,
0,
i,
i,
0,
123 // dummy value
);
String channelLower = channel.toLowerCase();
Stream<? extends DataEvent> eventStream = LongStream.rangeClosed(start, end).mapToObj(i -> {
if (channelLower.contains("waveform")) {
long[] value = random.longs(2048).toArray();
value[0] = i;
return new ChannelEvent(
channel,
i * 10,
0,
i,
i * 10,
0,
value
);
} else if (channelLower.contains("image")) {
int x = 640;
int y = 480;
int[] shape = new int[] {x, y};
long[] value = random.longs(x * y).toArray();
value[0] = i;
return new ChannelEvent(
channel,
i * 10,
0,
i,
i * 10,
0,
value,
shape
);
} else {
return new ChannelEvent(
channel,
i * 10,
0,
i,
i * 10,
0,
i
);
}
});
return eventStream;
}
}