ATEST-81:
- implementing dummy write method for manual testing
This commit is contained in:
@ -7,8 +7,8 @@ import org.springframework.boot.context.web.SpringBootServletInitializer;
|
|||||||
import org.springframework.context.annotation.ComponentScan;
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController
|
* Entry point to our rest-frontend of the data acquisition (DAQ) application which most importantly
|
||||||
* annotated classes.
|
* wires all the @RestController annotated classes.
|
||||||
* <p>
|
* <p>
|
||||||
*
|
*
|
||||||
* This acts as a @Configuration class for Spring. As such it has @ComponentScan annotation that
|
* This acts as a @Configuration class for Spring. As such it has @ComponentScan annotation that
|
||||||
|
@ -24,8 +24,6 @@ import com.hazelcast.spring.context.SpringManagedContext;
|
|||||||
import ch.psi.daq.common.statistic.StorelessStatistics;
|
import ch.psi.daq.common.statistic.StorelessStatistics;
|
||||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||||
import ch.psi.daq.hazelcast.config.HazelcastConfig;
|
import ch.psi.daq.hazelcast.config.HazelcastConfig;
|
||||||
import ch.psi.daq.hazelcast.query.processor.QueryProcessor;
|
|
||||||
import ch.psi.daq.hazelcast.query.processor.cassandra.CassandraQueryProcessorDistributed;
|
|
||||||
import ch.psi.daq.rest.ResponseStreamWriter;
|
import ch.psi.daq.rest.ResponseStreamWriter;
|
||||||
import ch.psi.daq.rest.model.PropertyFilterMixin;
|
import ch.psi.daq.rest.model.PropertyFilterMixin;
|
||||||
|
|
||||||
@ -36,13 +34,20 @@ public class RestConfig {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RestConfig.class);
|
private static final Logger logger = LoggerFactory.getLogger(RestConfig.class);
|
||||||
|
|
||||||
|
// TODO refactor - also defined in HazelcastNodeConfig
|
||||||
|
private static final String HAZELCAST_GROUP_PASSWORD = "hazelcast.group.password";
|
||||||
|
private static final String HAZELCAST_GROUP_NAME = "hazelcast.group.name";
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private Environment env;
|
private Environment env;
|
||||||
|
|
||||||
@Bean
|
// a nested configuration
|
||||||
public QueryProcessor queryProcessor() {
|
// this guarantees that the ordering of the properties file is as expected
|
||||||
return new CassandraQueryProcessorDistributed();
|
// see:
|
||||||
}
|
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
|
||||||
|
// @Configuration
|
||||||
|
// @Import(CassandraConfig.class)
|
||||||
|
// static class InnerConfiguration { }
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public JsonFactory jsonFactory() {
|
public JsonFactory jsonFactory() {
|
||||||
@ -63,7 +68,16 @@ public class RestConfig {
|
|||||||
ClientConfig config = new ClientConfig();
|
ClientConfig config = new ClientConfig();
|
||||||
config.setManagedContext(managedContext());
|
config.setManagedContext(managedContext());
|
||||||
|
|
||||||
config.getNetworkConfig().setAddresses(hazelcastMembers());
|
config.getNetworkConfig().setAddresses(hazelcastInitialCandidates());
|
||||||
|
|
||||||
|
String groupName = env.getProperty(HAZELCAST_GROUP_NAME);
|
||||||
|
if (groupName != null) {
|
||||||
|
config.getGroupConfig().setName(groupName);
|
||||||
|
}
|
||||||
|
String groupPassword = env.getProperty(HAZELCAST_GROUP_PASSWORD);
|
||||||
|
if (groupPassword != null) {
|
||||||
|
config.getGroupConfig().setPassword(groupPassword);
|
||||||
|
}
|
||||||
|
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
@ -73,8 +87,7 @@ public class RestConfig {
|
|||||||
return new SpringManagedContext();
|
return new SpringManagedContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
private List<String> hazelcastInitialCandidates() {
|
||||||
public List<String> hazelcastMembers() {
|
|
||||||
List<String> clusterMembers = Arrays.asList(StringUtils.commaDelimitedListToStringArray(env.getProperty("hazelcast.initialcandidates")));
|
List<String> clusterMembers = Arrays.asList(StringUtils.commaDelimitedListToStringArray(env.getProperty("hazelcast.initialcandidates")));
|
||||||
logger.info("The following hosts have been defined to form a Hazelcast cluster: {}", clusterMembers);
|
logger.info("The following hosts have been defined to form a Hazelcast cluster: {}", clusterMembers);
|
||||||
return clusterMembers;
|
return clusterMembers;
|
||||||
@ -92,11 +105,4 @@ public class RestConfig {
|
|||||||
return mapper;
|
return mapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
// a nested configuration
|
|
||||||
// this guarantees that the ordering of the properties file is as expected
|
|
||||||
// see:
|
|
||||||
// https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393
|
|
||||||
// @Configuration
|
|
||||||
// @Import(CassandraConfig.class)
|
|
||||||
// static class InnerConfiguration { }
|
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,15 @@
|
|||||||
package ch.psi.daq.rest.controller;
|
package ch.psi.daq.rest.controller;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
import java.util.function.LongUnaryOperator;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.LongStream;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
@ -11,11 +17,13 @@ import javax.servlet.http.HttpServletResponse;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import ch.psi.daq.cassandra.writer.CassandraWriter;
|
import ch.psi.daq.cassandra.writer.CassandraWriter;
|
||||||
|
import ch.psi.daq.domain.DataType;
|
||||||
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
import ch.psi.daq.domain.cassandra.ChannelEvent;
|
||||||
import ch.psi.daq.domain.cassandra.DataEvent;
|
import ch.psi.daq.domain.cassandra.DataEvent;
|
||||||
import ch.psi.daq.hazelcast.query.processor.QueryProcessor;
|
import ch.psi.daq.hazelcast.query.processor.QueryProcessor;
|
||||||
@ -39,7 +47,6 @@ public class DaqRestController {
|
|||||||
private QueryProcessor queryProcessor;
|
private QueryProcessor queryProcessor;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@RequestMapping(value = "/pulserange")
|
@RequestMapping(value = "/pulserange")
|
||||||
public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException {
|
public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException {
|
||||||
|
|
||||||
@ -71,20 +78,68 @@ public class DaqRestController {
|
|||||||
responseStreamWriter.respond(flatStreams, query, res);
|
responseStreamWriter.respond(flatStreams, query, res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ==========================================================================================
|
||||||
|
// TODO this is simply a method for initial / rudimentary testing - remove once further evolved
|
||||||
@RequestMapping(value = "/write")
|
@RequestMapping(value = "/write")
|
||||||
public void writeDummyEntry() {
|
public long writeDummyEntry() {
|
||||||
int pulseId = 100;
|
|
||||||
ChannelEvent event = new ChannelEvent(
|
|
||||||
"dummy-test",
|
|
||||||
pulseId,
|
|
||||||
0,
|
|
||||||
pulseId,
|
|
||||||
pulseId,
|
|
||||||
0,
|
|
||||||
"data_" + UUID.randomUUID().toString());
|
|
||||||
|
|
||||||
CompletableFuture<Void> future = cassandraWriter.writeAsync(3, 0, event);
|
long startIndex = System.currentTimeMillis();
|
||||||
|
|
||||||
|
writeData(3,startIndex, 100, new String[]{"channel1", "channel2"});
|
||||||
|
|
||||||
|
return startIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeData(int dataReplication, long startIndex, long nrOfElements,
|
||||||
|
String... channelNames) {
|
||||||
|
writeData(dataReplication, startIndex, nrOfElements,
|
||||||
|
i -> 2 * i,
|
||||||
|
i -> 2 * i,
|
||||||
|
i -> 2 * i,
|
||||||
|
i -> 2 * i,
|
||||||
|
i -> 2 * i,
|
||||||
|
i -> Long.valueOf(2 * i),
|
||||||
|
channelNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> void writeData(int dataReplication, long startIndex, long nrOfElements,
|
||||||
|
LongUnaryOperator iocMillis, LongUnaryOperator iocNanos, LongUnaryOperator pulseIds,
|
||||||
|
LongUnaryOperator globalMillis, LongUnaryOperator globalNanos, LongFunction<T> valueFunction,
|
||||||
|
String... channelNames) {
|
||||||
|
|
||||||
|
Assert.notNull(channelNames);
|
||||||
|
|
||||||
|
CompletableFuture<Void> future;
|
||||||
|
|
||||||
|
List<ChannelEvent> events =
|
||||||
|
Arrays.stream(channelNames)
|
||||||
|
.parallel()
|
||||||
|
.flatMap(
|
||||||
|
channelName -> {
|
||||||
|
Stream<ChannelEvent> stream =
|
||||||
|
LongStream
|
||||||
|
.range(startIndex, startIndex + nrOfElements)
|
||||||
|
.parallel()
|
||||||
|
.mapToObj(
|
||||||
|
i -> {
|
||||||
|
Object value = valueFunction.apply(i);
|
||||||
|
return new ChannelEvent(channelName,
|
||||||
|
iocMillis.applyAsLong(i),
|
||||||
|
iocNanos.applyAsLong(i),
|
||||||
|
pulseIds.applyAsLong(i),
|
||||||
|
globalMillis.applyAsLong(i),
|
||||||
|
globalNanos.applyAsLong(i),
|
||||||
|
value,
|
||||||
|
DataType.getTypeName(value.getClass())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return stream;
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
future = cassandraWriter.writeAsync(dataReplication, (int) TimeUnit.HOURS.toSeconds(1), events);
|
||||||
future.join();
|
future.join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,3 +4,7 @@ server.port=8080
|
|||||||
# defines the list of hosts who are tried for an initial connection to the cluster
|
# defines the list of hosts who are tried for an initial connection to the cluster
|
||||||
#hazelcast.members=sf-nube-11.psi.ch,sf-nube-12.psi.ch,sf-nube-13.psi.ch,sf-nube-14.psi.ch
|
#hazelcast.members=sf-nube-11.psi.ch,sf-nube-12.psi.ch,sf-nube-13.psi.ch,sf-nube-14.psi.ch
|
||||||
hazelcast.initialcandidates=localhost
|
hazelcast.initialcandidates=localhost
|
||||||
|
|
||||||
|
# defines the cluster group and its password
|
||||||
|
hazelcast.group.name=QueryCluster
|
||||||
|
hazelcast.group.password=d9zT5h*4!KAHesxdnDm7
|
||||||
|
Reference in New Issue
Block a user