Merge remote-tracking branch 'origin/master' into atest124
Conflicts: src/main/java/ch/psi/daq/queryrest/controller/QueryRestController.java src/test/java/ch/psi/daq/test/queryrest/DaqWebMvcConfig.java
This commit is contained in:
@ -30,9 +30,9 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
|
|||||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||||
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
||||||
import ch.psi.daq.common.statistic.StorelessStatistics;
|
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||||
import ch.psi.daq.domain.cassandra.DataEvent;
|
import ch.psi.daq.domain.DataEvent;
|
||||||
import ch.psi.daq.query.analyzer.CassandraQueryAnalyzer;
|
|
||||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||||
|
import ch.psi.daq.query.analyzer.QueryAnalyzerImpl;
|
||||||
import ch.psi.daq.query.config.QueryConfig;
|
import ch.psi.daq.query.config.QueryConfig;
|
||||||
import ch.psi.daq.query.model.AbstractQuery;
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
import ch.psi.daq.query.model.Aggregation;
|
import ch.psi.daq.query.model.Aggregation;
|
||||||
@ -119,7 +119,7 @@ public class QueryRestConfig extends WebMvcConfigurerAdapter {
|
|||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Function<Query, QueryAnalyzer> queryAnalizerFactory() {
|
public Function<Query, QueryAnalyzer> queryAnalizerFactory() {
|
||||||
return (query) -> new CassandraQueryAnalyzer(query);
|
return (query) -> new QueryAnalyzerImpl(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@ -24,14 +24,16 @@ import org.springframework.web.bind.annotation.ResponseBody;
|
|||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||||
import ch.psi.daq.domain.cassandra.DataEvent;
|
import ch.psi.daq.domain.DataEvent;
|
||||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||||
import ch.psi.daq.query.model.AbstractQuery;
|
import ch.psi.daq.query.model.AbstractQuery;
|
||||||
import ch.psi.daq.query.model.Aggregation;
|
import ch.psi.daq.query.model.Aggregation;
|
||||||
|
import ch.psi.daq.query.model.DBMode;
|
||||||
import ch.psi.daq.query.model.Query;
|
import ch.psi.daq.query.model.Query;
|
||||||
import ch.psi.daq.query.model.QueryField;
|
import ch.psi.daq.query.model.QueryField;
|
||||||
import ch.psi.daq.query.processor.QueryProcessor;
|
import ch.psi.daq.query.processor.QueryProcessor;
|
||||||
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||||
|
import ch.psi.daq.queryrest.model.ChannelsRequest;
|
||||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@ -40,7 +42,6 @@ public class QueryRestController {
|
|||||||
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRestController.class);
|
||||||
|
|
||||||
public static final String CHANNELS = "channels";
|
public static final String CHANNELS = "channels";
|
||||||
public static final String CHANNELS_REGEX = CHANNELS + "/{regex}";
|
|
||||||
public static final String QUERY = "query";
|
public static final String QUERY = "query";
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
@ -50,7 +51,10 @@ public class QueryRestController {
|
|||||||
private ResponseStreamWriter responseStreamWriter;
|
private ResponseStreamWriter responseStreamWriter;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private QueryProcessor queryProcessor;
|
private QueryProcessor cassandraQueryProcessor;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private QueryProcessor archiverApplianceQueryProcessor;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
private Function<Query, QueryAnalyzer> queryAnalizerFactory;
|
||||||
@ -72,30 +76,23 @@ public class QueryRestController {
|
|||||||
|
|
||||||
@RequestMapping(
|
@RequestMapping(
|
||||||
value = CHANNELS,
|
value = CHANNELS,
|
||||||
consumes = {MediaType.APPLICATION_JSON_VALUE},
|
method = {RequestMethod.GET, RequestMethod.POST},
|
||||||
method = RequestMethod.GET)
|
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||||
public @ResponseBody Collection<String> getChannels() throws Throwable {
|
public @ResponseBody Collection<String> getChannels(@RequestBody(required = false) ChannelsRequest request)
|
||||||
|
throws Throwable {
|
||||||
|
// in case not specified use default (e.g. GET)
|
||||||
|
if (request == null) {
|
||||||
|
request = new ChannelsRequest();
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return queryProcessor.getChannels();
|
return getQueryProcessor(request.getDbMode()).getChannels(request.getRegex());
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOGGER.error("Failed to query channel names.", t);
|
LOGGER.error("Failed to query channel names.", t);
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(
|
|
||||||
value = CHANNELS_REGEX,
|
|
||||||
method = RequestMethod.POST,
|
|
||||||
consumes = {MediaType.APPLICATION_JSON_VALUE})
|
|
||||||
public @ResponseBody Collection<String> getChannels(@RequestBody String regex) throws Throwable {
|
|
||||||
try {
|
|
||||||
return queryProcessor.getChannels(regex);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
LOGGER.error("Failed to query channel names with regex '{}'.", regex, t);
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@RequestMapping(
|
@RequestMapping(
|
||||||
value = QUERY,
|
value = QUERY,
|
||||||
method = RequestMethod.POST,
|
method = RequestMethod.POST,
|
||||||
@ -108,7 +105,7 @@ public class QueryRestController {
|
|||||||
queryAnalizer.validate();
|
queryAnalizer.validate();
|
||||||
|
|
||||||
// all the magic happens here
|
// all the magic happens here
|
||||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = queryProcessor.process(queryAnalizer);
|
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = getQueryProcessor(query.getDBMode()).process(queryAnalizer);
|
||||||
|
|
||||||
// do post-process
|
// do post-process
|
||||||
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||||
@ -121,6 +118,16 @@ public class QueryRestController {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private QueryProcessor getQueryProcessor(DBMode dbMode) {
|
||||||
|
if (DBMode.databuffer.equals(dbMode)) {
|
||||||
|
return cassandraQueryProcessor;
|
||||||
|
} else if (DBMode.archiverappliance.equals(dbMode)) {
|
||||||
|
return archiverApplianceQueryProcessor;
|
||||||
|
} else {
|
||||||
|
LOGGER.error("Unknown DBMode '{}'!", dbMode);
|
||||||
|
throw new IllegalArgumentException(String.format("Unknown DBMode '%s'", dbMode));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ==========================================================================================
|
// ==========================================================================================
|
||||||
// TODO: This is simply for initial / rudimentary testing - remove once further evolved
|
// TODO: This is simply for initial / rudimentary testing - remove once further evolved
|
||||||
|
@ -0,0 +1,37 @@
|
|||||||
|
package ch.psi.daq.queryrest.model;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||||
|
|
||||||
|
import ch.psi.daq.query.model.DBMode;
|
||||||
|
|
||||||
|
// as RequestBody due to special chars in regex
|
||||||
|
@JsonInclude(Include.NON_DEFAULT)
|
||||||
|
public class ChannelsRequest {
|
||||||
|
private DBMode dbMode = DBMode.databuffer;
|
||||||
|
// null for no regex
|
||||||
|
private String regex = null;
|
||||||
|
|
||||||
|
public ChannelsRequest() {}
|
||||||
|
|
||||||
|
public ChannelsRequest(DBMode dbMode, String regex) {
|
||||||
|
this.regex = regex;
|
||||||
|
this.dbMode = dbMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DBMode getDbMode() {
|
||||||
|
return dbMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDbMode(DBMode dbMode) {
|
||||||
|
this.dbMode = dbMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRegex() {
|
||||||
|
return regex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRegex(String regex) {
|
||||||
|
this.regex = regex;
|
||||||
|
}
|
||||||
|
}
|
@ -9,9 +9,9 @@ import org.springframework.context.annotation.PropertySources;
|
|||||||
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
||||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
||||||
|
|
||||||
import ch.psi.daq.cassandra.reader.DataReader;
|
import ch.psi.daq.domain.reader.DataReader;
|
||||||
import ch.psi.daq.query.processor.QueryProcessor;
|
import ch.psi.daq.query.processor.QueryProcessor;
|
||||||
import ch.psi.daq.query.processor.cassandra.CassandraQueryProcessorLocal;
|
import ch.psi.daq.query.processor.QueryProcessorLocal;
|
||||||
import ch.psi.daq.test.cassandra.admin.CassandraAdmin;
|
import ch.psi.daq.test.cassandra.admin.CassandraAdmin;
|
||||||
import ch.psi.daq.test.cassandra.admin.CassandraAdminImpl;
|
import ch.psi.daq.test.cassandra.admin.CassandraAdminImpl;
|
||||||
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
|
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
|
||||||
@ -33,7 +33,7 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
|||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public QueryProcessor queryProcessor() {
|
public QueryProcessor queryProcessor() {
|
||||||
return new CassandraQueryProcessorLocal();
|
return new QueryProcessorLocal(new DummyDataReader());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@ -7,10 +7,10 @@ import java.util.stream.Stream;
|
|||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
import ch.psi.daq.cassandra.reader.DataReader;
|
|
||||||
import ch.psi.daq.common.ordering.Ordering;
|
import ch.psi.daq.common.ordering.Ordering;
|
||||||
|
import ch.psi.daq.domain.DataEvent;
|
||||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||||
import ch.psi.daq.domain.cassandra.DataEvent;
|
import ch.psi.daq.domain.reader.DataReader;
|
||||||
|
|
||||||
public class DummyDataReader implements DataReader {
|
public class DummyDataReader implements DataReader {
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user