ATEST-259:
- Add a dummy Cassandra reader implementation and fix some of the tests
This commit is contained in:
@ -2,9 +2,11 @@ package ch.psi.daq.queryrest.controller;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
@ -27,9 +29,11 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
import ch.psi.daq.cassandra.request.validate.RequestProviderValidator;
|
||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||
import ch.psi.daq.common.json.deserialize.AttributeBasedDeserializer;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.query.analyzer.QueryAnalyzer;
|
||||
import ch.psi.daq.query.model.Aggregation;
|
||||
import ch.psi.daq.query.model.AggregationType;
|
||||
import ch.psi.daq.query.model.DBMode;
|
||||
import ch.psi.daq.query.model.Query;
|
||||
import ch.psi.daq.query.model.QueryField;
|
||||
@ -39,6 +43,8 @@ import ch.psi.daq.queryrest.config.QueryRestConfig;
|
||||
import ch.psi.daq.queryrest.model.ChannelsRequest;
|
||||
import ch.psi.daq.queryrest.response.ResponseStreamWriter;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@RestController
|
||||
public class QueryRestController {
|
||||
|
||||
@ -114,6 +120,68 @@ public class QueryRestController {
|
||||
return getChannels(new ChannelsRequest(channelName));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link Ordering}s available.
|
||||
*
|
||||
* @return list of {@link Ordering}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "ordering", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getOrderingValues() {
|
||||
List<Ordering> orderings = Lists.newArrayList(Ordering.values());
|
||||
return orderings.stream()
|
||||
.map((Ordering ord) -> {
|
||||
return ord.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link QueryField}s available.
|
||||
*
|
||||
* @return list of {@link QueryField}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "queryfields", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getQueryFieldValues() {
|
||||
List<QueryField> orderings = Lists.newArrayList(QueryField.values());
|
||||
return orderings.stream()
|
||||
.map((QueryField qf) -> {
|
||||
return qf.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link Aggregation}s available.
|
||||
*
|
||||
* @return list of {@link Aggregation}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "aggregations", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getAggregationsValues() {
|
||||
List<Aggregation> orderings = Lists.newArrayList(Aggregation.values());
|
||||
return orderings.stream()
|
||||
.map((Aggregation value) -> {
|
||||
return value.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current list of {@link AggregationType}s available.
|
||||
*
|
||||
* @return list of {@link AggregationType}s as String array
|
||||
*/
|
||||
@RequestMapping(value = "aggregationtypes", method = {RequestMethod.GET},
|
||||
produces = {MediaType.APPLICATION_JSON_VALUE})
|
||||
public @ResponseBody List<String> getAggregationTypesValues() {
|
||||
List<AggregationType> orderings = Lists.newArrayList(AggregationType.values());
|
||||
return orderings.stream()
|
||||
.map((AggregationType value) -> {
|
||||
return value.toString();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Catch-all query method for getting data from the backend.
|
||||
* <p>
|
||||
@ -143,8 +211,8 @@ public class QueryRestController {
|
||||
QueryAnalyzer queryAnalizer = queryAnalizerFactory.apply(query);
|
||||
|
||||
// all the magic happens here
|
||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents = getQueryProcessor(query.getDbMode())
|
||||
.process(queryAnalizer);
|
||||
Stream<Entry<String, Stream<? extends DataEvent>>> channelToDataEvents =
|
||||
getQueryProcessor(query.getDbMode()).process(queryAnalizer);
|
||||
|
||||
// do post-process
|
||||
Stream<Entry<String, ?>> channelToData = queryAnalizer.postProcess(channelToDataEvents);
|
||||
|
@ -9,12 +9,14 @@ import org.springframework.context.annotation.PropertySources;
|
||||
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
|
||||
|
||||
import ch.psi.daq.cassandra.reader.CassandraReader;
|
||||
import ch.psi.daq.domain.reader.DataReader;
|
||||
import ch.psi.daq.query.processor.QueryProcessor;
|
||||
import ch.psi.daq.query.processor.QueryProcessorLocal;
|
||||
import ch.psi.daq.test.cassandra.admin.CassandraTestAdmin;
|
||||
import ch.psi.daq.test.cassandra.admin.CassandraTestAdminImpl;
|
||||
import ch.psi.daq.test.query.config.LocalQueryTestConfig;
|
||||
import ch.psi.daq.test.queryrest.query.DummyCassandraReader;
|
||||
import ch.psi.daq.test.queryrest.query.DummyDataReader;
|
||||
|
||||
@Configuration
|
||||
@ -32,8 +34,8 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public QueryProcessor queryProcessor() {
|
||||
return new QueryProcessorLocal(new DummyDataReader());
|
||||
public QueryProcessor cassandraQueryProcessorLocal() {
|
||||
return new QueryProcessorLocal(dataReader());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ -41,6 +43,10 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
|
||||
return new DummyDataReader();
|
||||
}
|
||||
|
||||
@Bean CassandraReader cassandraReader() {
|
||||
return new DummyCassandraReader();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CassandraTestAdmin cassandraTestAdmin() {
|
||||
return new CassandraTestAdminImpl();
|
||||
|
@ -0,0 +1,224 @@
|
||||
/**
|
||||
*
|
||||
*/
|
||||
package ch.psi.daq.test.queryrest.query;
|
||||
|
||||
import java.awt.image.MultiPixelPackedSampleModel;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
|
||||
import ch.psi.daq.cassandra.reader.CassandraReader;
|
||||
import ch.psi.daq.cassandra.reader.query.PulseIdRangeQuery;
|
||||
import ch.psi.daq.cassandra.reader.query.TimeRangeQuery;
|
||||
import ch.psi.daq.cassandra.util.test.CassandraDataGen;
|
||||
import ch.psi.daq.common.ordering.Ordering;
|
||||
import ch.psi.daq.domain.DataEvent;
|
||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||
import ch.psi.daq.domain.cassandra.MetaPulseId;
|
||||
import ch.psi.daq.domain.cassandra.MetaTime;
|
||||
import ch.psi.daq.domain.cassandra.querying.ChannelEventQuery;
|
||||
import ch.psi.daq.domain.cassandra.querying.ChannelEventQueryInfo;
|
||||
import ch.psi.daq.domain.cassandra.querying.EventQuery;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* @author zellweger_c
|
||||
*
|
||||
*/
|
||||
public class DummyCassandraReader implements CassandraReader {
|
||||
|
||||
private static final int KEYSPACE = 1;
|
||||
private CassandraDataGen dataGen;
|
||||
private String[] channels;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public DummyCassandraReader() {
|
||||
this.dataGen = new CassandraDataGen();
|
||||
this.channels = new String[]{
|
||||
"testChannel1",
|
||||
"testChannel2",
|
||||
"BooleanScalar",
|
||||
"BooleanWaveform",
|
||||
"ByteScalar",
|
||||
"ByteWaveform",
|
||||
"DoubleScalar",
|
||||
"DoubleWaveform",
|
||||
"FloatScalar",
|
||||
"FloatWaveform",
|
||||
"IntegerScalar",
|
||||
"IntegerWaveform",
|
||||
"LongScalar",
|
||||
"LongWaveform",
|
||||
"ShortScalar",
|
||||
"ShortWaveform",
|
||||
"StringScalar",
|
||||
"UByteScalar",
|
||||
"UByteWaveform",
|
||||
"UIntegerScalar",
|
||||
"UIntegerWaveform",
|
||||
"ULongScalar",
|
||||
"ULongWaveform",
|
||||
"UShortScalar",
|
||||
"UShortWaveform"};
|
||||
}
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<String> getChannelStream(String regex) {
|
||||
return Stream.of(channels);
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<? extends DataEvent> getEventStream(String channel, long startPulseId, long endPulseId,
|
||||
Ordering ordering, String... columns) {
|
||||
return getDummyEventStream(channel, startPulseId, endPulseId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<? extends DataEvent> getEventStream(String channel, long startMillis, long startNanos, long endMillis,
|
||||
long endNanos, Ordering ordering, String... columns) {
|
||||
return getDummyEventStream(channel, startMillis, endMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends ChannelEventQuery> queryProviders) {
|
||||
List<ChannelEvent> result = Lists.newArrayList();
|
||||
queryProviders.forEach(ceq -> {
|
||||
if (ceq instanceof ChannelEventQueryInfo) {
|
||||
result.add(getEvent((ChannelEventQueryInfo) ceq));
|
||||
} else {
|
||||
throw new NotImplementedException("This is not yet implemented!");
|
||||
}
|
||||
});
|
||||
return result.stream();
|
||||
}
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
|
||||
Stream<ChannelEvent> dummyEventStream = getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId())
|
||||
.map(ce -> { return (ChannelEvent) ce; });
|
||||
return dummyEventStream;
|
||||
|
||||
}
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
|
||||
Stream<ChannelEvent> dummyEventStream = getDummyEventStream(query.getChannel(), query.getStartMillis(), query.getEndMillis())
|
||||
.map(ce -> { return (ChannelEvent) ce; });
|
||||
return dummyEventStream;
|
||||
}
|
||||
|
||||
|
||||
private Stream<? extends DataEvent> getDummyEventStream(String channel, long startIndex, long endIndex) {
|
||||
return dataGen.generateData(startIndex, (endIndex-startIndex), channel).stream();
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public List<String> getChannels() {
|
||||
return Lists.newArrayList(channels);
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public List<String> getChannels(String regex) {
|
||||
return Lists.newArrayList(channels).stream().filter(s -> { return s.contains(regex); }).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public ChannelEvent getEvent(ChannelEventQueryInfo queryInfo, String... columns) {
|
||||
if (queryInfo.getPulseId() > 0) {
|
||||
return dataGen.generateData(queryInfo.getPulseId(), 1, queryInfo.getChannel()).get(0);
|
||||
}
|
||||
return dataGen.generateData(queryInfo.getGlobalMillis(), 1, queryInfo.getChannel()).get(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public CompletableFuture<ChannelEvent> getEventAsync(ChannelEventQueryInfo queryInfo, String... columns) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(PulseIdRangeQuery query) {
|
||||
return dataGen.generateMetaPulseId(
|
||||
KEYSPACE,
|
||||
query.getStartPulseId(),
|
||||
(query.getEndPulseId() - query.getStartPulseId()),
|
||||
query.getChannel()).stream();
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<? extends ChannelEventQuery> getChannelEventQueryStream(TimeRangeQuery query) {
|
||||
return dataGen.generateMetaTime(
|
||||
KEYSPACE,
|
||||
query.getStartMillis(),
|
||||
(query.getEndMillis() - query.getStartMillis()),
|
||||
query.getChannel())
|
||||
.stream()
|
||||
.map(e -> {
|
||||
return (ChannelEventQuery) e;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<MetaPulseId> getMetaPulseIdStream(PulseIdRangeQuery query) {
|
||||
return dataGen.generateMetaPulseId(
|
||||
KEYSPACE,
|
||||
query.getStartPulseId(), (query.getEndPulseId() - query.getStartPulseId()),
|
||||
query.getChannel()).stream();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @{inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Stream<MetaTime> getMetaTimeStream(TimeRangeQuery query) {
|
||||
return dataGen.generateMetaTime(
|
||||
KEYSPACE,
|
||||
query.getStartMillis(), (query.getEndMillis() - query.getStartMillis()),
|
||||
query.getChannel()).stream();
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user