Provide option to define alignment

This commit is contained in:
Fabian Märki
2018-07-17 12:24:31 +02:00
parent 4ad546e927
commit a0c916e8bb
5 changed files with 581 additions and 35 deletions

View File

@@ -17,8 +17,11 @@ import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.query.response.ResponseFormat;
import ch.psi.daq.domain.query.transform.ExecutionEnvironment;
import ch.psi.daq.domain.query.transform.ValueTransformationSequence;
import ch.psi.daq.domain.request.Request;
@@ -53,16 +56,16 @@ public class EventQueryValidator implements Validator, ApplicationContextAware {
@Override
public void validate(final Object target, final Errors errors) {
if (target instanceof DAQQuery) {
this.checkElement((DAQQuery) target, errors);
this.checkElement(((DAQQuery) target).getResponse(), (DAQQuery) target, errors);
} else if (target instanceof DAQQueries) {
final DAQQueries queries = (DAQQueries) target;
for (final DAQQueryElement daqQueryElement : queries) {
this.checkElement(daqQueryElement, errors);
this.checkElement(queries.getResponse(), daqQueryElement, errors);
}
}
}
private void checkElement(final DAQQueryElement query, final Errors errors) {
private void checkElement(final Response response, final DAQQueryElement query, final Errors errors) {
// set default values (if not set)
if (query.getEventFields() == null || query.getEventFields().isEmpty()) {
query.setEventFields(new LinkedHashSet<>(defaultResponseFields));
@@ -140,5 +143,9 @@ public class EventQueryValidator implements Validator, ApplicationContextAware {
transformationSequence.setExecutionEnvironment(ExecutionEnvironment.QUERYING);
}
}
if (response != null && response.getFormat() == ResponseFormat.CSV && query.getMapping() == null) {
query.setMapping(new Mapping());
}
}
}

View File

@@ -15,7 +15,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -31,9 +33,9 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import ch.psi.daq.common.stream.StreamIterable;
import ch.psi.daq.common.stream.match.ReusingMapCreator;
import ch.psi.daq.common.stream.match.MapFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.ReusingMapCreator;
import ch.psi.daq.common.stream.match.StreamMatcher;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
@@ -45,6 +47,8 @@ import ch.psi.daq.domain.query.DAQQueries;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.backend.analyzer.BackendQueryAnalyzer;
import ch.psi.daq.domain.query.bin.BinningStrategy;
import ch.psi.daq.domain.query.mapping.Alignment;
import ch.psi.daq.domain.query.mapping.IncompleteStrategy;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.operation.Aggregation;
@@ -52,6 +56,8 @@ import ch.psi.daq.domain.query.operation.ConfigField;
import ch.psi.daq.domain.query.operation.EventField;
import ch.psi.daq.domain.query.operation.Extrema;
import ch.psi.daq.domain.query.response.Response;
import ch.psi.daq.domain.request.range.RequestRange;
import ch.psi.daq.query.bin.aggregate.BinnedValueCombiner;
import ch.psi.daq.queryrest.config.QueryRestConfig;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
@@ -126,7 +132,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
entry.getValue()
.sequential()
.forEach(quadruple -> {
// single BackendQuery instance for all
// single BackendQuery instance for all
backendQueryRef.compareAndSet(null, quadruple.getFirst());
if (query.hasConfigFields() && quadruple.getThird() instanceof Stream) {
@@ -135,7 +141,7 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
final List<ChannelConfiguration> configList =
((Stream<ChannelConfiguration>) quadruple.getThird())
.collect(Collectors.toList());
.collect(Collectors.toList());
configLists.put(quadruple.getSecond(), configList);
}
@@ -152,17 +158,32 @@ public class CSVResponseStreamWriter implements ResponseStreamWriter, Applicatio
});
});
final DAQQueryElement daqQuery = daqQueryRef.get();
final BackendQuery backendQuery = backendQueryRef.get();
final RequestRange requestRange = backendQuery.getRequest().getRequestRange();
BinningStrategy binningStrategy = backendQuery.getBinningStrategy();
final Mapping mapping = daqQueryRef.get().getMappingOrDefault(DEFAULT_MAPPING);
final Padder<ChannelName, DataEvent> padder = mapping.getIncomplete().getPadder(backendQueryRef.get());
ToLongFunction<DataEvent> alignmentProvider = Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
if (binningStrategy == null) {
binningStrategy = Alignment.getBinningStrategy(daqQuery.getMapping().getAlignment(), requestRange);
}
BiFunction<DataEvent, DataEvent, DataEvent> valueCombiner = null;
if(binningStrategy != null){
binningStrategy.setRequestRange(requestRange);
valueCombiner = new BinnedValueCombiner(binningStrategy);
}
// online matching of the stream's content
final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> streamMatcher =
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
alignmentProvider,
new ReusingMapCreator<>(),
new MapFiller<>(),
null,
valueCombiner,
padder,
eventStreams.keySet(),
eventStreams.values());

View File

@@ -9,6 +9,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
@@ -28,11 +29,10 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import ch.psi.daq.common.stream.match.ReuseingListCreator;
import ch.psi.daq.common.stream.match.ListFiller;
import ch.psi.daq.common.stream.match.Padder;
import ch.psi.daq.common.stream.match.ReuseingListCreator;
import ch.psi.daq.common.stream.match.StreamMatcher;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.common.tuple.Quadruple;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.events.ChannelConfiguration;
@@ -40,8 +40,7 @@ import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.bin.BinningStrategy;
import ch.psi.daq.domain.query.bin.strategy.BinningStrategyPerBinPulse;
import ch.psi.daq.domain.query.bin.strategy.BinningStrategyPerBinTime;
import ch.psi.daq.domain.query.mapping.Alignment;
import ch.psi.daq.domain.query.mapping.IncompleteStrategy;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.operation.Aggregation;
@@ -68,10 +67,6 @@ public class DAQQueriesResponseFormatter
public static final Mapping DEFAULT_MAPPING = new Mapping(IncompleteStrategy.PROVIDE_AS_IS);
public static final Function<DataEvent, ChannelName> KEY_PROVIDER = (event) -> new ChannelName(event.getChannel(),
event.getBackend());
// try to match sync data (bsread) with non sync data (epics) based on the time usin 10 millis
// buckets.
public static final ToLongFunction<DataEvent> MATCHER_PROVIDER = (event) -> event.getGlobalMillis()
/ TimeUtils.MILLIS_PER_PULSE;
// In case ArchiverAppliance had several events within the 10ms mapping interval, return these
// aggregations (only used for table format)
@@ -227,29 +222,24 @@ public class DAQQueriesResponseFormatter
final Mapping mapping = daqQuery.getMappingOrDefault(DEFAULT_MAPPING);
final Padder<ChannelName, DataEvent> padder = mapping.getIncomplete().getPadder(backendQuery);
ToLongFunction<DataEvent> matchProvider = binningStrategy;
ToLongFunction<DataEvent> alignmentProvider = Alignment.getAlignmentProvider(daqQuery.getMapping().getAlignment(), binningStrategy);
if (binningStrategy == null) {
matchProvider = MATCHER_PROVIDER;
if (requestRange.isPulseIdRangeDefined()) {
binningStrategy = new BinningStrategyPerBinPulse(1);
} else if (requestRange.isTimeRangeDefined()) {
binningStrategy = new BinningStrategyPerBinTime(TimeUtils.MILLIS_PER_PULSE);
} else {
final String message = "Either time or pulseId range must be defined by the query!";
LOGGER.error(message);
throw new IllegalStateException(message);
}
binningStrategy = Alignment.getBinningStrategy(daqQuery.getMapping().getAlignment(), requestRange);
}
BiFunction<DataEvent, DataEvent, DataEvent> valueCombiner = null;
if(binningStrategy != null){
binningStrategy.setRequestRange(requestRange);
valueCombiner = new BinnedValueCombiner(binningStrategy);
}
binningStrategy.setRequestRange(requestRange);
/* online matching of the stream's content */
final StreamMatcher<ChannelName, DataEvent, List<DataEvent>> streamMatcher =
new StreamMatcher<>(
KEY_PROVIDER,
matchProvider,
DAQQueriesResponseFormatter.KEY_PROVIDER,
alignmentProvider,
new ReuseingListCreator<ChannelName, DataEvent>(),
new ListFiller<ChannelName, DataEvent>(),
new BinnedValueCombiner(binningStrategy),
valueCombiner,
padder,
eventStreams.keySet(),
eventStreams.values());

View File

@@ -41,6 +41,7 @@ import ch.psi.daq.domain.json.ChannelName;
import ch.psi.daq.domain.query.DAQQueryElement;
import ch.psi.daq.domain.query.backend.BackendQuery;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.mapping.Alignment;
import ch.psi.daq.queryrest.response.AbstractHTTPResponse;
import ch.psi.daq.queryrest.response.ResponseFormatter;
import ch.psi.daq.queryrest.response.csv.CSVResponseStreamWriter;
@@ -162,7 +163,7 @@ public abstract class AbstractRawResponseFormatter implements ResponseFormatter<
final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> eventStreamMatcher =
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
Alignment.MATCHER_PROVIDER_BY_PULSE,
new RecreateMapCreator<>(),
new MapFiller<>(),
null,
@@ -185,7 +186,8 @@ public abstract class AbstractRawResponseFormatter implements ResponseFormatter<
final Map<ChannelName, Stream<ChannelConfiguration>> configStreams,
final Map<ChannelName, Stream<DataEvent>> eventStreams,
final Padder<ChannelName, DataEvent> eventPadder) {
// INFO: Do not use matchProvider = binningStrategy; since this is a raw data downloader (no aggregations supported so far)
// online matching of the stream's content
final StreamMatcher<ChannelName, ChannelConfiguration, Map<ChannelName, ChannelConfiguration>> configStreamMatcher =
new StreamMatcher<>(
@@ -202,7 +204,7 @@ public abstract class AbstractRawResponseFormatter implements ResponseFormatter<
final StreamMatcher<ChannelName, DataEvent, Map<ChannelName, DataEvent>> eventStreamMatcher =
new StreamMatcher<>(
DAQQueriesResponseFormatter.KEY_PROVIDER,
DAQQueriesResponseFormatter.MATCHER_PROVIDER,
Alignment.MATCHER_PROVIDER_BY_PULSE,
new RecreateMapCreator<>(),
new MapFiller<>(),
null,

View File

@@ -33,6 +33,8 @@ import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.query.DAQQuery;
import ch.psi.daq.domain.query.mapping.Mapping;
import ch.psi.daq.domain.query.mapping.Alignment;
import ch.psi.daq.domain.query.operation.Aggregation;
import ch.psi.daq.domain.query.operation.AggregationDescriptor;
import ch.psi.daq.domain.query.operation.AggregationType;
@@ -1134,7 +1136,7 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
}
@Test
public void testDateRangeQueryBinSizeAggregate() throws Exception {
public void testDateRangeQueryBinSizeAggregate_01() throws Exception {
List<String> channels = Arrays.asList(TEST_CHANNEL_01);
long startTime = 0;
long endTime = 999;
@@ -1182,6 +1184,530 @@ public class CSVQueryRestControllerTest extends AbstractDaqRestTest implements A
checkDateRangeQueryBinSizeAggregate(channels, aggregations, eventFields, response);
}
@Test
public void testDateRangeQueryBinSizeAggregate_02() throws Exception {
List<String> channels = Arrays.asList(
TEST_CHANNEL_01 + "_{0,100,200,300,400,500,600,700,800,900}",
TEST_CHANNEL_02 + "_{50,150,250,350,450,550,650,750,850,950}");
long startTime = 0;
long endTime = 999;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
List<Aggregation> aggregations = new ArrayList<>();
aggregations.add(Aggregation.min);
aggregations.add(Aggregation.mean);
aggregations.add(Aggregation.max);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
channels);
request.setAggregation(new AggregationDescriptor().setDurationPerBin(100).setAggregations(aggregations));
request.setResponse(new CSVHTTPResponse());
LinkedHashSet<EventField> eventFields = new LinkedHashSet<>();
eventFields.add(EventField.channel);
eventFields.add(EventField.pulseId);
eventFields.add(EventField.iocSeconds);
eventFields.add(EventField.iocMillis);
eventFields.add(EventField.globalSeconds);
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
request.setFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
StringReader reader = new StringReader(response);
CSVParser csvParser = new CSVParser(reader, csvFormat);
try {
long pulse = 0;
int totalEventRows = 10;
List<CSVRecord> records = csvParser.getRecords();
assertEquals(totalEventRows + 1, records.size());
// remove header
CSVRecord record = records.remove(0);
assertEquals((eventFields.size() + aggregations.size()) * channels.size(), record.size());
int column = 0;
for (String channel : channels) {
for (EventField eventField : eventFields) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
record.get(column++));
}
for (Aggregation aggregation : aggregations) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + EventField.value
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name(),
record.get(column++));
}
}
for (int row = 0; row < totalEventRows; ++row) {
record = records.get(row);
assertEquals((eventFields.size() + aggregations.size()) * channels.size(), record.size());
column = 0;
String channel = channels.get(0);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
pulse += 5;
channel = channels.get(1);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
pulse += 5;
}
} finally {
reader.close();
csvParser.close();
}
}
@Test
public void testDateRangeQueryBinSizeAggregate_MissingPulses() throws Exception {
List<String> channels = Arrays.asList(
TEST_CHANNEL_01 + "_{100,200}",
TEST_CHANNEL_01 + "_{10,110,210}",
TEST_CHANNEL_01 + "_{120}");
long startTime = 0;
long endTime = 400;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
List<Aggregation> aggregations = new ArrayList<>();
aggregations.add(Aggregation.min);
aggregations.add(Aggregation.mean);
aggregations.add(Aggregation.max);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
channels);
request.setAggregation(new AggregationDescriptor().setDurationPerBin(100).setAggregations(aggregations));
request.setResponse(new CSVHTTPResponse());
LinkedHashSet<EventField> eventFields = new LinkedHashSet<>();
eventFields.add(EventField.channel);
eventFields.add(EventField.pulseId);
eventFields.add(EventField.iocSeconds);
eventFields.add(EventField.iocMillis);
eventFields.add(EventField.globalSeconds);
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
request.setFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
StringReader reader = new StringReader(response);
CSVParser csvParser = new CSVParser(reader, csvFormat);
try {
long pulse = 0;
int totalEventRows = 3;
List<CSVRecord> records = csvParser.getRecords();
assertEquals(totalEventRows + 1, records.size());
// remove header
CSVRecord record = records.remove(0);
assertEquals((eventFields.size() + aggregations.size()) * channels.size(), record.size());
int column = 0;
for (String channel : channels) {
for (EventField eventField : eventFields) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
record.get(column++));
}
for (Aggregation aggregation : aggregations) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + EventField.value
+ CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + aggregation.name(),
record.get(column++));
}
}
int row = 0;
record = records.get(row);
assertEquals((eventFields.size() + aggregations.size()) * channels.size(), record.size());
column = 0;
String channel = channels.get(0);
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
pulse = 1;
channel = channels.get(1);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
pulse = 2;
channel = channels.get(2);
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
row = 1;
record = records.get(row);
assertEquals((eventFields.size() + aggregations.size()) * channels.size(), record.size());
column = 0;
pulse = 10;
channel = channels.get(0);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
pulse = 11;
channel = channels.get(1);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
pulse = 12;
channel = channels.get(2);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
row = 2;
record = records.get(row);
assertEquals((eventFields.size() + aggregations.size()) * channels.size(), record.size());
column = 0;
pulse = 20;
channel = channels.get(0);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
pulse = 21;
channel = channels.get(1);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
assertEquals("" + pulse + ".0", record.get(column++));
pulse = 22;
channel = channels.get(2);
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
} finally {
reader.close();
csvParser.close();
}
}
@Test
public void testDateRangeQuery_NoneAlignment() throws Exception {
List<String> channels = Arrays.asList(
TEST_CHANNEL_01 + "_{100,200}",
TEST_CHANNEL_01 + "_{10,110,210}",
TEST_CHANNEL_01 + "_{120}");
long startTime = 0;
long endTime = 400;
String startDate = TimeUtils.format(startTime);
String endDate = TimeUtils.format(endTime);
DAQQuery request = new DAQQuery(
new RequestRangeDate(
startDate,
endDate),
channels);
request.setResponse(new CSVHTTPResponse());
request.setMapping(new Mapping(Alignment.NONE));
LinkedHashSet<EventField> eventFields = new LinkedHashSet<>();
eventFields.add(EventField.channel);
eventFields.add(EventField.pulseId);
eventFields.add(EventField.iocSeconds);
eventFields.add(EventField.iocMillis);
eventFields.add(EventField.globalSeconds);
eventFields.add(EventField.globalMillis);
eventFields.add(EventField.shape);
eventFields.add(EventField.eventCount);
eventFields.add(EventField.value);
request.setFields(eventFields);
String content = mapper.writeValueAsString(request);
System.out.println(content);
MvcResult result = this.mockMvc
.perform(MockMvcRequestBuilders
.post(DomainConfig.PATH_QUERY)
.contentType(MediaType.APPLICATION_JSON)
.content(content))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk())
.andReturn();
String response = result.getResponse().getContentAsString();
System.out.println("Response: " + response);
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);
StringReader reader = new StringReader(response);
CSVParser csvParser = new CSVParser(reader, csvFormat);
try {
long pulse = 0;
int totalEventRows = 3;
List<CSVRecord> records = csvParser.getRecords();
assertEquals(totalEventRows + 1, records.size());
// remove header
CSVRecord record = records.remove(0);
assertEquals((eventFields.size()) * channels.size(), record.size());
int column = 0;
for (String channel : channels) {
for (EventField eventField : eventFields) {
assertEquals(channel + CSVResponseStreamWriter.DELIMITER_CHANNELNAME_FIELDNAME + eventField.name(),
record.get(column++));
}
}
int row = 0;
record = records.get(row);
assertEquals((eventFields.size()) * channels.size(), record.size());
column = 0;
pulse = 10;
String channel = channels.get(0);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse, record.get(column++));
pulse = 1;
channel = channels.get(1);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse, record.get(column++));
pulse = 12;
channel = channels.get(2);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse, record.get(column++));
row = 1;
record = records.get(row);
assertEquals((eventFields.size()) * channels.size(), record.size());
column = 0;
pulse = 20;
channel = channels.get(0);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse, record.get(column++));
pulse = 11;
channel = channels.get(1);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse, record.get(column++));
pulse = 12;
channel = channels.get(2);
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
row = 2;
record = records.get(row);
assertEquals((eventFields.size()) * channels.size(), record.size());
column = 0;
pulse = 20;
channel = channels.get(0);
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
pulse = 21;
channel = channels.get(1);
assertEquals(channel, record.get(column++));
assertEquals("" + pulse, record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals(TimeUtils.getTimeStr(TestTimeUtils.getTimeFromPulseId(pulse)), record.get(column++));
assertEquals("" + TestTimeUtils.getTimeFromPulseId(pulse).getMillis(), record.get(column++));
assertEquals("[1]", record.get(column++));
assertEquals("1", record.get(column++));
assertEquals("" + pulse, record.get(column++));
pulse = 22;
channel = channels.get(2);
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
assertEquals("", record.get(column++));
} finally {
reader.close();
csvParser.close();
}
}
private void checkDateRangeQueryBinSizeAggregate(final List<String> channels, final List<Aggregation> aggregations,
final Set<EventField> eventFields, final String response) throws Exception {
CSVFormat csvFormat = CSVFormat.EXCEL.withDelimiter(CSVResponseStreamWriter.DELIMITER_CVS);