Use new query structure.
This commit is contained in:
@ -1,7 +1,6 @@
|
|||||||
package ch.psi.daq.queryrest.config;
|
package ch.psi.daq.queryrest.config;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -15,7 +14,6 @@ import org.springframework.util.StringUtils;
|
|||||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||||
import com.fasterxml.jackson.core.JsonFactory;
|
import com.fasterxml.jackson.core.JsonFactory;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
|
|
||||||
import ch.psi.daq.common.statistic.StorelessStatistics;
|
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||||
@ -64,12 +62,14 @@ public class QueryRestConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
private Set<String> defaultResponseFields() {
|
public Set<String> defaultResponseFields() {
|
||||||
List<String> defaultFields = Arrays.asList(
|
String[] responseFields = StringUtils.commaDelimitedListToStringArray(env.getProperty("queryrest.default.response.fields"));
|
||||||
StringUtils.commaDelimitedListToStringArray(
|
// preserve order
|
||||||
env.getProperty("queryrest.default.response.fields")
|
LinkedHashSet<String> defaultResponseFields = new LinkedHashSet<>(responseFields.length);
|
||||||
));
|
for (String field : defaultResponseFields) {
|
||||||
Set<String> defaultResponseFields = Sets.newHashSet(defaultFields.iterator());
|
defaultResponseFields.add(field);
|
||||||
|
}
|
||||||
|
|
||||||
return defaultResponseFields;
|
return defaultResponseFields;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,8 @@
|
|||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
package ch.psi.daq.queryrest.response;
|
package ch.psi.daq.queryrest.response;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import javax.servlet.ServletResponse;
|
import javax.servlet.ServletResponse;
|
||||||
@ -24,6 +21,7 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
|
|||||||
|
|
||||||
import ch.psi.daq.domain.cassandra.DataEvent;
|
import ch.psi.daq.domain.cassandra.DataEvent;
|
||||||
import ch.psi.daq.query.model.AbstractQuery;
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
|
import ch.psi.daq.query.model.AggregationEnum;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
* Takes a Java 8 stream and writes it to the output stream provided by the {@link ServletResponse}
|
||||||
@ -41,6 +39,7 @@ public class ResponseStreamWriter {
|
|||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private Set<String> defaultResponseFields;
|
private Set<String> defaultResponseFields;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Responding with the the contents of the stream by writing into the output stream of the
|
* Responding with the the contents of the stream by writing into the output stream of the
|
||||||
* {@link ServletResponse}.
|
* {@link ServletResponse}.
|
||||||
@ -52,17 +51,13 @@ public class ResponseStreamWriter {
|
|||||||
*/
|
*/
|
||||||
public void respond(Stream<DataEvent> stream, AbstractQuery query, ServletResponse response) throws IOException {
|
public void respond(Stream<DataEvent> stream, AbstractQuery query, ServletResponse response) throws IOException {
|
||||||
|
|
||||||
Set<String> includedFields = query.getFields();
|
Set<String> includedFields = query.getFieldsOrDefault(defaultResponseFields);
|
||||||
if (includedFields == null) {
|
|
||||||
includedFields = defaultResponseFields;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (query.getAggregations() != null) {
|
if (query.getAggregations() != null) {
|
||||||
includedFields.addAll(query.getAggregations()
|
includedFields = new LinkedHashSet<String>(includedFields);
|
||||||
.stream()
|
for (AggregationEnum aggregation : query.getAggregations()) {
|
||||||
.map(a -> {
|
includedFields.add(aggregation.name());
|
||||||
return a.getType().toString();
|
}
|
||||||
}).collect(Collectors.toSet()));
|
|
||||||
}
|
}
|
||||||
ObjectWriter writer = configureWriter(includedFields);
|
ObjectWriter writer = configureWriter(includedFields);
|
||||||
respondInternal(stream, response, writer);
|
respondInternal(stream, response, writer);
|
||||||
|
@ -1,12 +1,15 @@
|
|||||||
package ch.psi.daq.test.queryrest;
|
package ch.psi.daq.test.queryrest;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Import;
|
import org.springframework.context.annotation.Import;
|
||||||
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
||||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
||||||
|
|
||||||
|
import ch.psi.daq.query.processor.QueryProcessor;
|
||||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||||
import ch.psi.daq.test.cassandra.config.LocalCassandraTestConfig;
|
import ch.psi.daq.test.cassandra.config.LocalCassandraTestConfig;
|
||||||
|
import ch.psi.daq.test.queryrest.query.DummyQueryProcessor;
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@Import(value = {LocalCassandraTestConfig.class, QueryRestConfig.class})
|
@Import(value = {LocalCassandraTestConfig.class, QueryRestConfig.class})
|
||||||
@ -14,4 +17,9 @@ import ch.psi.daq.test.cassandra.config.LocalCassandraTestConfig;
|
|||||||
public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
||||||
|
|
||||||
// add test-specific beans and configurations here
|
// add test-specific beans and configurations here
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public QueryProcessor queryProcessor() {
|
||||||
|
return new DummyQueryProcessor();
|
||||||
|
}
|
||||||
}
|
}
|
@ -16,11 +16,8 @@ import com.google.common.collect.Sets;
|
|||||||
|
|
||||||
import ch.psi.daq.cassandra.reader.Ordering;
|
import ch.psi.daq.cassandra.reader.Ordering;
|
||||||
import ch.psi.daq.query.model.AggregationType;
|
import ch.psi.daq.query.model.AggregationType;
|
||||||
import ch.psi.daq.query.model.BinningStrategyEnum;
|
|
||||||
import ch.psi.daq.query.model.PulseRangeQuery;
|
import ch.psi.daq.query.model.PulseRangeQuery;
|
||||||
import ch.psi.daq.query.model.TimeRangeQuery;
|
import ch.psi.daq.query.model.TimeRangeQuery;
|
||||||
import ch.psi.daq.query.range.QueryRange;
|
|
||||||
import ch.psi.daq.query.range.QueryRangeImpl;
|
|
||||||
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
import ch.psi.daq.test.queryrest.AbstractDaqRestTest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -32,59 +29,53 @@ public class DaqRestControllerTest extends AbstractDaqRestTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPulseRangeQuery() throws Exception {
|
public void testPulseRangeQuery() throws Exception {
|
||||||
QueryRange range = new QueryRangeImpl(100l, 101l);
|
|
||||||
PulseRangeQuery request = new PulseRangeQuery(
|
PulseRangeQuery request = new PulseRangeQuery(
|
||||||
Ordering.desc, //ordering
|
|
||||||
Lists.newArrayList(), // channels, DummyQueryProcessor simply returns a fixed list
|
|
||||||
Sets.newLinkedHashSet(DEFAULT_PROPERTIES), // fields
|
|
||||||
BinningStrategyEnum.count,
|
|
||||||
100,
|
100,
|
||||||
false,
|
101
|
||||||
AggregationType.index,
|
);
|
||||||
null,
|
request.setOrdering(Ordering.desc);
|
||||||
range);
|
request.setFields(Sets.newLinkedHashSet(DEFAULT_PROPERTIES));
|
||||||
|
request.setNrOfBins(100);
|
||||||
|
request.setAggregateChannels(false);
|
||||||
|
request.setAggregationType(AggregationType.index);
|
||||||
|
|
||||||
String content = mapper.writeValueAsString(request);
|
String content = mapper.writeValueAsString(request);
|
||||||
|
|
||||||
this.mockMvc
|
this.mockMvc
|
||||||
.perform(MockMvcRequestBuilders.post("/pulserange")
|
.perform(MockMvcRequestBuilders.post("/pulserange")
|
||||||
.contentType(MediaType.APPLICATION_JSON)
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
.content(content))
|
.content(content))
|
||||||
|
|
||||||
.andDo(MockMvcResultHandlers.print())
|
.andDo(MockMvcResultHandlers.print())
|
||||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
.andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].pulseId").value(100))
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].pulseId").value(100))
|
||||||
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
|
.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").value("testChannel1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTimeRangeQuery() throws Exception {
|
public void testTimeRangeQuery() throws Exception {
|
||||||
|
|
||||||
long startTime = new Date().getTime();
|
long startTime = new Date().getTime();
|
||||||
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
|
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
|
||||||
QueryRange range = new QueryRangeImpl(startTime, 0, endTime, 0);
|
|
||||||
TimeRangeQuery request = new TimeRangeQuery(
|
TimeRangeQuery request = new TimeRangeQuery(
|
||||||
Ordering.asc,
|
startTime,
|
||||||
Lists.newArrayList("test"),
|
endTime,
|
||||||
Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
|
"test");
|
||||||
BinningStrategyEnum.count,
|
request.setOrdering(Ordering.asc);
|
||||||
100,
|
request.setFields(Sets.newLinkedHashSet(DEFAULT_PROPERTIES));
|
||||||
false,
|
request.setNrOfBins(100);
|
||||||
AggregationType.index,
|
request.setAggregateChannels(false);
|
||||||
null, // aggregations
|
request.setAggregationType(AggregationType.index);
|
||||||
range,
|
|
||||||
null, // startMillis
|
|
||||||
null);
|
|
||||||
|
|
||||||
String content = mapper.writeValueAsString(request);
|
String content = mapper.writeValueAsString(request);
|
||||||
System.out.println(content);
|
System.out.println(content);
|
||||||
|
|
||||||
this.mockMvc
|
this.mockMvc
|
||||||
.perform(MockMvcRequestBuilders.post("/timerange")
|
.perform(MockMvcRequestBuilders.post("/timerange")
|
||||||
.contentType(MediaType.APPLICATION_JSON)
|
.contentType(MediaType.APPLICATION_JSON)
|
||||||
.content(content))
|
.content(content))
|
||||||
|
|
||||||
.andDo(MockMvcResultHandlers.print())
|
.andDo(MockMvcResultHandlers.print())
|
||||||
.andExpect(MockMvcResultMatchers.status().isOk())
|
.andExpect(MockMvcResultMatchers.status().isOk())
|
||||||
@ -97,19 +88,15 @@ public class DaqRestControllerTest extends AbstractDaqRestTest {
|
|||||||
public void testMapper() throws JsonProcessingException {
|
public void testMapper() throws JsonProcessingException {
|
||||||
long startTime = new Date().getTime();
|
long startTime = new Date().getTime();
|
||||||
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
|
long endTime = startTime + TimeUnit.SECONDS.toMillis(1);
|
||||||
QueryRange range = new QueryRangeImpl(startTime, 0, endTime, 0);
|
|
||||||
TimeRangeQuery request = new TimeRangeQuery(
|
TimeRangeQuery request = new TimeRangeQuery(
|
||||||
Ordering.asc,
|
startTime,
|
||||||
Lists.newArrayList("test"),
|
endTime,
|
||||||
Sets.newLinkedHashSet(DEFAULT_PROPERTIES),
|
"test");
|
||||||
BinningStrategyEnum.count,
|
request.setOrdering(Ordering.asc);
|
||||||
100,
|
request.setFields(Sets.newLinkedHashSet(DEFAULT_PROPERTIES));
|
||||||
false,
|
request.setNrOfBins(100);
|
||||||
AggregationType.index,
|
request.setAggregateChannels(false);
|
||||||
null, // aggregations
|
request.setAggregationType(AggregationType.index);
|
||||||
range,
|
|
||||||
null, // startMillis
|
|
||||||
null);
|
|
||||||
|
|
||||||
String content = mapper.writeValueAsString(request);
|
String content = mapper.writeValueAsString(request);
|
||||||
System.out.println(content);
|
System.out.println(content);
|
||||||
|
Reference in New Issue
Block a user