diff --git a/src/main/java/ch/psi/daq/rest/RestApplication.java b/src/main/java/ch/psi/daq/rest/RestApplication.java index 86a2c04..10d391f 100644 --- a/src/main/java/ch/psi/daq/rest/RestApplication.java +++ b/src/main/java/ch/psi/daq/rest/RestApplication.java @@ -7,8 +7,8 @@ import org.springframework.boot.context.web.SpringBootServletInitializer; import org.springframework.context.annotation.ComponentScan; /** - * Entry point to our rest-frontend of the Swissfel application which most importantly wires all the @RestController - * annotated classes. + * Entry point to our rest-frontend of the data acquisition (DAQ) application which most importantly + * wires all the @RestController annotated classes. *

* * This acts as a @Configuration class for Spring. As such it has @ComponentScan annotation that diff --git a/src/main/java/ch/psi/daq/rest/config/RestConfig.java b/src/main/java/ch/psi/daq/rest/config/RestConfig.java index 724ae67..a9e73a5 100644 --- a/src/main/java/ch/psi/daq/rest/config/RestConfig.java +++ b/src/main/java/ch/psi/daq/rest/config/RestConfig.java @@ -24,8 +24,6 @@ import com.hazelcast.spring.context.SpringManagedContext; import ch.psi.daq.common.statistic.StorelessStatistics; import ch.psi.daq.domain.cassandra.ChannelEvent; import ch.psi.daq.hazelcast.config.HazelcastConfig; -import ch.psi.daq.hazelcast.query.processor.QueryProcessor; -import ch.psi.daq.hazelcast.query.processor.cassandra.CassandraQueryProcessorDistributed; import ch.psi.daq.rest.ResponseStreamWriter; import ch.psi.daq.rest.model.PropertyFilterMixin; @@ -36,14 +34,21 @@ public class RestConfig { private static final Logger logger = LoggerFactory.getLogger(RestConfig.class); + // TODO refactor - also defined in HazelcastNodeConfig + private static final String HAZELCAST_GROUP_PASSWORD = "hazelcast.group.password"; + private static final String HAZELCAST_GROUP_NAME = "hazelcast.group.name"; + @Autowired private Environment env; - @Bean - public QueryProcessor queryProcessor() { - return new CassandraQueryProcessorDistributed(); - } - + // a nested configuration + // this guarantees that the ordering of the properties file is as expected + // see: + // https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393 + // @Configuration + // @Import(CassandraConfig.class) + // static class InnerConfiguration { } + @Bean public JsonFactory jsonFactory() { return new JsonFactory(); @@ -63,7 +68,16 @@ public class RestConfig { ClientConfig config = new ClientConfig(); config.setManagedContext(managedContext()); - config.getNetworkConfig().setAddresses(hazelcastMembers()); + config.getNetworkConfig().setAddresses(hazelcastInitialCandidates()); + + String groupName = env.getProperty(HAZELCAST_GROUP_NAME); + if (groupName != null) { + config.getGroupConfig().setName(groupName); + } + String groupPassword = env.getProperty(HAZELCAST_GROUP_PASSWORD); + if (groupPassword != null) { + config.getGroupConfig().setPassword(groupPassword); + } return config; } @@ -73,8 +87,7 @@ public class RestConfig { return new SpringManagedContext(); } - @Bean - public List hazelcastMembers() { + private List hazelcastInitialCandidates() { List clusterMembers = Arrays.asList(StringUtils.commaDelimitedListToStringArray(env.getProperty("hazelcast.initialcandidates"))); logger.info("The following hosts have been defined to form a Hazelcast cluster: {}", clusterMembers); return clusterMembers; @@ -92,11 +105,4 @@ public class RestConfig { return mapper; } - // a nested configuration - // this guarantees that the ordering of the properties file is as expected - // see: - // https://jira.spring.io/browse/SPR-10409?focusedCommentId=101393&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101393 - // @Configuration - // @Import(CassandraConfig.class) - // static class InnerConfiguration { } } diff --git a/src/main/java/ch/psi/daq/rest/controller/DaqRestController.java b/src/main/java/ch/psi/daq/rest/controller/DaqRestController.java index 99d34c6..2b2109c 100644 --- a/src/main/java/ch/psi/daq/rest/controller/DaqRestController.java +++ b/src/main/java/ch/psi/daq/rest/controller/DaqRestController.java @@ -1,9 +1,15 @@ package ch.psi.daq.rest.controller; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.LongFunction; +import java.util.function.LongUnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.LongStream; import java.util.stream.Stream; import javax.servlet.http.HttpServletResponse; @@ -11,11 +17,13 @@ import javax.servlet.http.HttpServletResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.Assert; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import ch.psi.daq.cassandra.writer.CassandraWriter; +import ch.psi.daq.domain.DataType; import ch.psi.daq.domain.cassandra.ChannelEvent; import ch.psi.daq.domain.cassandra.DataEvent; import ch.psi.daq.hazelcast.query.processor.QueryProcessor; @@ -38,7 +46,6 @@ public class DaqRestController { @Autowired private QueryProcessor queryProcessor; - @RequestMapping(value = "/pulserange") public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException { @@ -71,20 +78,68 @@ public class DaqRestController { responseStreamWriter.respond(flatStreams, query, res); } - + // ========================================================================================== + // TODO this is simply a method for initial / rudimentary testing - remove once further evolved @RequestMapping(value = "/write") - public void writeDummyEntry() { - int pulseId = 100; - ChannelEvent event = new ChannelEvent( - "dummy-test", - pulseId, - 0, - pulseId, - pulseId, - 0, - "data_" + UUID.randomUUID().toString()); + public long writeDummyEntry() { - CompletableFuture future = cassandraWriter.writeAsync(3, 0, event); + long startIndex = System.currentTimeMillis(); + + writeData(3,startIndex, 100, new String[]{"channel1", "channel2"}); + + return startIndex; + } + + private void writeData(int dataReplication, long startIndex, long nrOfElements, + String... channelNames) { + writeData(dataReplication, startIndex, nrOfElements, + i -> 2 * i, + i -> 2 * i, + i -> 2 * i, + i -> 2 * i, + i -> 2 * i, + i -> Long.valueOf(2 * i), + channelNames); + } + + private void writeData(int dataReplication, long startIndex, long nrOfElements, + LongUnaryOperator iocMillis, LongUnaryOperator iocNanos, LongUnaryOperator pulseIds, + LongUnaryOperator globalMillis, LongUnaryOperator globalNanos, LongFunction valueFunction, + String... channelNames) { + + Assert.notNull(channelNames); + + CompletableFuture future; + + List events = + Arrays.stream(channelNames) + .parallel() + .flatMap( + channelName -> { + Stream stream = + LongStream + .range(startIndex, startIndex + nrOfElements) + .parallel() + .mapToObj( + i -> { + Object value = valueFunction.apply(i); + return new ChannelEvent(channelName, + iocMillis.applyAsLong(i), + iocNanos.applyAsLong(i), + pulseIds.applyAsLong(i), + globalMillis.applyAsLong(i), + globalNanos.applyAsLong(i), + value, + DataType.getTypeName(value.getClass()) + ); + } + ); + + return stream; + }) + .collect(Collectors.toList()); + + future = cassandraWriter.writeAsync(dataReplication, (int) TimeUnit.HOURS.toSeconds(1), events); future.join(); } } diff --git a/src/main/resources/rest.properties b/src/main/resources/rest.properties index 9873024..7999931 100644 --- a/src/main/resources/rest.properties +++ b/src/main/resources/rest.properties @@ -3,4 +3,8 @@ server.port=8080 # defines the list of hosts who are tried for an initial connection to the cluster #hazelcast.members=sf-nube-11.psi.ch,sf-nube-12.psi.ch,sf-nube-13.psi.ch,sf-nube-14.psi.ch -hazelcast.initialcandidates=localhost \ No newline at end of file +hazelcast.initialcandidates=localhost + +# defines the cluster group and its password +hazelcast.group.name=QueryCluster +hazelcast.group.password=d9zT5h*4!KAHesxdnDm7