Added EventBus to collector

This commit is contained in:
2013-10-04 11:16:08 +02:00
parent 83dd7654ed
commit b9c15b0fc8
5 changed files with 127 additions and 254 deletions

View File

@@ -30,12 +30,14 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.ActionLoop;
@@ -126,13 +128,11 @@ import ch.psi.jcae.ChannelBeanFactory;
*/
public class Acquisition {
// Get Logger
private static Logger logger = Logger.getLogger(Acquisition.class.getName());
private AcquisitionConfiguration configuration;
private ActionLoop actionLoop;
private Collector collector;
private Manipulator manipulator;
private DataSerializerTXT serializer;
@@ -143,17 +143,16 @@ public class Acquisition {
private Handler logHandler = null;
private Collector col;
/**
* Name of the datafile
*/
private File datafile;
// private Thread acquisitionThread = null;
public Acquisition(){
configuration = AcquisitionConfiguration.getInstance();
actionLoop = null;
collector = new Collector();
manipulations = new ArrayList<Manipulation>();
}
@@ -236,18 +235,20 @@ public class Acquisition {
logger.fine("Map Model to internal logic");
EventBus b = new AsyncEventBus(Executors.newCachedThreadPool());
// Map scan to base model
// After this call actionLoop and collector will be initialized
mapScan(smodel);
Collector collector = new Collector(b);
mapScan(collector, smodel);
col = collector;
logger.fine("ActionLoop and Collector initialized");
// TODO Remove this workaround
Collections.reverse(collector.getQueues());
// Add manipulator into processing chain
this.manipulator = new Manipulator(bus, collector.getOutQueue(), this.manipulations);
this.manipulator = new Manipulator(bus, collector.getMetadata(), this.manipulations);
b.register(this.manipulator);
DataMessageMetadata metadata = manipulator.getMetadata();
@@ -268,12 +269,9 @@ public class Acquisition {
try{
active = true;
Thread tc = new Thread(collector);
Thread tc = new Thread(col);
tc.start();
Thread tm = new Thread(manipulator);
tm.start();
actionLoop.prepare();
actionLoop.execute();
actionLoop.cleanup();
@@ -283,7 +281,6 @@ public class Acquisition {
// Give the threads 1 minute to catch up
tc.join(60000);
tm.join(60000);
// Send notifications out to all recipients that want to have success notifications
try {
@@ -292,8 +289,6 @@ public class Acquisition {
} catch (UnknownHostException e1) {
logger.log(Level.WARNING, "Unable to send notification", e1);
}
// active = false;
}
catch(RuntimeException e){
logger.log(Level.WARNING, "Execution failed: ", e);
@@ -402,7 +397,7 @@ public class Acquisition {
* Map scan to base model
* @param scan
*/
private void mapScan(Configuration configuration){
private void mapScan(Collector collector, Configuration configuration){
Scan scan = configuration.getScan();
// Map continuous dimension
@@ -495,6 +490,10 @@ public class Acquisition {
this.manipulations.add(manipulation);
}
}
// TODO Remove this workaround
// Revert queues to match sequence
Collections.reverse(collector.getQueues());
}
/**

View File

@@ -22,9 +22,10 @@ package ch.psi.fda.aq;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
@@ -53,14 +54,14 @@ public class Collector implements Runnable{
/**
* Outgoing queue of this collector
*/
private BlockingQueue<Message> outQueue;
private EventBus bus;
/**
* Constructor
*/
public Collector(){
public Collector(EventBus b){
queues = new ArrayList<DataQueue>();
outQueue = new LinkedBlockingQueue<Message>(1000); // Create bounded queue to prevent running out of memory ...
this.bus = b;
}
/* (non-Javadoc)
@@ -81,14 +82,8 @@ public class Collector implements Runnable{
// No queue registered for reading
}
try {
bus.post(new EndOfStreamMessage());
outQueue.put(new EndOfStreamMessage());
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Unable to terminate stream with and End of Stream Message",e);
}
logger.info("END");
@@ -115,7 +110,7 @@ public class Collector implements Runnable{
}
else{
// Write message to outgoing queue
outQueue.put(dm);
bus.post(dm);
}
// Read next message
@@ -126,7 +121,7 @@ public class Collector implements Runnable{
// Translate EndOfStream to StreamDelimiter message
StreamDelimiterMessage ddm = new StreamDelimiterMessage(queues.size()-1-index, ((EndOfStreamMessage)message).isIflag());
// Write message to outgoing queue
outQueue.put(ddm);
bus.post(ddm);
}
}
@@ -140,14 +135,10 @@ public class Collector implements Runnable{
/**
* Get the outgoing data queue.
* Attention, only call this method after all ingoing queues were registered! Otherwise the data returned
* by this method is not accurate.
* @return output queue of collector
* Get the outgoing data metadata
*/
public DataQueue getOutQueue(){
public DataMessageMetadata getMetadata(){
DataMessageMetadata dataMessageMetadata = new DataMessageMetadata();
dataMessageMetadata.getComponents();
// Generate new combined metadata and add dimension information to the components
int nq = queues.size();
@@ -158,8 +149,7 @@ public class Collector implements Runnable{
dataMessageMetadata.getComponents().add(new ComponentMetadata(cm.getId(), nq-i-1));
}
}
return(new DataQueue(outQueue, dataMessageMetadata));
return(dataMessageMetadata);
}
}

View File

@@ -20,87 +20,55 @@
package ch.psi.fda.core.manipulator;
import java.util.List;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
/**
* @author ebner
*
* Applies manipulations to the data stream
*/
public class Manipulator implements Runnable{
public class Manipulator {
private EventBus bus;
private DataMessageMetadata metadata;
private final DataQueue queue;
private final List<Manipulation> manipulations;
/**
* Constructor
* @param queue
* @param manipulations
*/
// TODO need to support multiple (a list of) manipulation(s)
public Manipulator(EventBus b, DataQueue queue, List<Manipulation> manipulations){
public Manipulator(EventBus b, DataMessageMetadata meta, List<Manipulation> manipulations){
this.bus = b;
this.manipulations = manipulations;
this.queue = queue;
// Create outgoing data metadata
metadata = queue.getDataMessageMetadata().clone();
this.metadata = meta.clone();
// Initialize manipulations and create outgoing metadata
for(Manipulation manipulation: this.manipulations){
manipulation.initialize(metadata);
manipulation.initialize(this.metadata);
// Add manipulation id to metadata
metadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension
this.metadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension
}
}
@Subscribe
public void onMessage(Message message){
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
for(Manipulation manipulation: manipulations){
dm.getData().add(manipulation.execute(dm));
}
}
bus.post(message);
}
public DataMessageMetadata getMetadata() {
return metadata;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try{
// Dispatch Messages
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
for(Manipulation manipulation: manipulations){
dm.getData().add(manipulation.execute(dm));
}
}
bus.post(message);
// Read next message
message = queue.getQueue().take();
}
// Write end of stream message
bus.post(message);
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data manipulator was interrupted while writing data to file",e);
}
}
}

View File

@@ -23,13 +23,15 @@ import static org.junit.Assert.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.aq.Collector;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.ControlMessage;
@@ -39,43 +41,10 @@ import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
/**
* @author ebner
*
*/
public class CollectorTest {
// Get Logger
private static Logger logger = Logger.getLogger(CollectorTest.class.getName());
class TestCollector implements Runnable{
private final DataQueue queue;
public TestCollector(DataQueue queue){
this.queue = queue;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
while(true){
Message m = queue.getQueue().take();
if(m instanceof DataMessage){
DataMessage x = (DataMessage) m;
logger.fine( x.toString() );
}
else if(m instanceof ControlMessage){
logger.fine("---- "+m.toString()+" ----");
}
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "An Exception occured while reading data from the data queue", e);
}
}
}
private EventBus bus;
private BlockingQueue<Message> q1;
private BlockingQueue<Message> q2;
@@ -85,11 +54,12 @@ public class CollectorTest {
private DataMessageMetadata m2;
private DataMessageMetadata m3;
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
bus = new EventBus();
// Create blocking queues
q1 = new LinkedBlockingQueue<Message>();
q2 = new LinkedBlockingQueue<Message>();
@@ -190,9 +160,6 @@ public class CollectorTest {
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
}
@@ -203,17 +170,14 @@ public class CollectorTest {
*/
@Test
public void testRun() throws InterruptedException {
Collector collector = new Collector();
Collector collector = new Collector(bus);
collector.getQueues().add(new DataQueue(q1, m1));
collector.getQueues().add(new DataQueue(q2, m2));
collector.getQueues().add(new DataQueue(q3, m3));
Thread t = new Thread(new TestCollector(collector.getOutQueue()));
t.start();
// Check component metadata of output queue
int c=2;
for(ComponentMetadata cm: collector.getOutQueue().getDataMessageMetadata().getComponents()){
for(ComponentMetadata cm: collector.getMetadata().getComponents()){
logger.info(cm.toString());
if(cm.getDimension() != c){
fail("Dimension number does not match required dimension number");
@@ -224,13 +188,22 @@ public class CollectorTest {
c--;
}
collector.run();
// Wait some time to ensure that collector was able to finish processing
Thread.sleep(2000);
// Execute collector via the ExecutorService framework
// ExecutorService executor = Executors.newCachedThreadPool();
// executor.execute(collector);
// check wether messages arrive
bus.register(new Object(){
@Subscribe
public void onMessage(Message m){
if(m instanceof DataMessage){
DataMessage x = (DataMessage) m;
logger.info( x.toString() );
}
else if(m instanceof ControlMessage){
logger.info("---- "+m.toString()+" ----");
}
}
});
collector.run();
}
}

View File

@@ -24,7 +24,6 @@ import gov.aps.jca.CAException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import org.junit.After;
@@ -42,7 +41,6 @@ import ch.psi.fda.core.manipulator.Manipulator;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.scripting.JythonParameterMapping;
@@ -51,13 +49,8 @@ import ch.psi.fda.core.scripting.JythonParameterMappingID;
import ch.psi.jcae.ChannelBean;
import ch.psi.jcae.ChannelBeanFactory;
/**
* @author ebner
*
*/
public class ManipulatorTest {
// Get Logger
private static Logger logger = Logger.getLogger(ManipulatorTest.class.getName());
private EventBus bus;
@@ -83,7 +76,6 @@ public class ManipulatorTest {
@Test(expected=IllegalArgumentException.class)
public void testConstructor() {
DataMessageMetadata dmm = new DataMessageMetadata();
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
String id="computedId";
String script = "import math\ndef process(o):\n return math.cos(10.0) + math.sin(o)";
@@ -95,21 +87,14 @@ public class ManipulatorTest {
// id "myid" which is expected in the mapping
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(bus, inQueue, manipulations);
new Manipulator(bus, dmm, manipulations);
}
@Test
public void testConstructorNoMappingNoParam() {
DataMessageMetadata dmm = new DataMessageMetadata();
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
DataMessage m = new DataMessage();
m.getData().add(10d);
m.getData().add(0.2d);
inQueue.getQueue().add(m);
inQueue.getQueue().add(new EndOfStreamMessage());
String id="cid";
String script = "import math\ndef process():\n return 0.0";
List<JythonParameterMapping> mapping = new ArrayList<JythonParameterMapping>();
@@ -118,7 +103,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(bus, inQueue, manipulations);
new Manipulator(bus, dmm, manipulations);
// Expect IllegalArgument Exception as there is no mapping for the parameter c
}
@@ -127,14 +112,7 @@ public class ManipulatorTest {
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("myid"));
dmm.getComponents().add(new ComponentMetadata("myid2"));
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
DataMessage m = new DataMessage();
m.getData().add(10d);
m.getData().add(0.2d);
inQueue.getQueue().add(m);
inQueue.getQueue().add(new EndOfStreamMessage());
String id="cid";
String script = "import math\ndef process():\n return 0.0";
List<JythonParameterMapping> mapping = new ArrayList<JythonParameterMapping>();
@@ -143,7 +121,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(bus, inQueue, manipulations);
new Manipulator(bus, dmm, manipulations);
// Expect IllegalArgument Exception as there is no mapping for the parameter c
}
@@ -155,14 +133,7 @@ public class ManipulatorTest {
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("myid"));
dmm.getComponents().add(new ComponentMetadata("myid2"));
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
DataMessage m = new DataMessage();
m.getData().add(10d);
m.getData().add(0.2d);
inQueue.getQueue().add(m);
inQueue.getQueue().add(new EndOfStreamMessage());
String id="cid";
String script = "import math\ndef process(o ,c):\n return math.cos(c) + math.sin(o)";
List<JythonParameterMapping> mapping = new ArrayList<JythonParameterMapping>();
@@ -171,7 +142,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(bus, inQueue, manipulations);
new Manipulator(bus, dmm, manipulations);
// Expect IllegalArgument Exception as there is no mapping for the parameter c
}
@@ -183,13 +154,7 @@ public class ManipulatorTest {
public void testRun() throws InterruptedException {
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("myid"));
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
DataMessage m = new DataMessage();
m.getData().add(10d);
inQueue.getQueue().add(m);
inQueue.getQueue().add(new EndOfStreamMessage());
String id="cid";
String script = "import math\ndef process(o):\n return math.cos(10.0) + math.sin(o)";
List<JythonParameterMapping> mapping = new ArrayList<JythonParameterMapping>();
@@ -198,7 +163,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, dmm, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getMetadata();
@@ -233,15 +198,14 @@ public class ManipulatorTest {
}
}
});
manipulator.run();
// Message message = manipulator.getOutQueue().getQueue().take();
// while(!(message instanceof EndOfStreamMessage)){
//
//
//
// message = manipulator.getOutQueue().getQueue().take();
// }
EventBus b = new EventBus();
b.register(manipulator);
DataMessage m = new DataMessage();
m.getData().add(10d);
b.post(m);
b.post(new EndOfStreamMessage());
logger.info(""+(Math.cos(10.0)+Math.sin(10)));
}
@@ -254,12 +218,7 @@ public class ManipulatorTest {
public void testRunIntegerReturn() throws InterruptedException {
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("myid"));
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
DataMessage m = new DataMessage();
m.getData().add(10d);
inQueue.getQueue().add(m);
inQueue.getQueue().add(new EndOfStreamMessage());
String id="cid";
String script = "import math\ndef process(o):\n return 1";
@@ -269,7 +228,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, dmm, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getMetadata();
@@ -300,7 +259,14 @@ public class ManipulatorTest {
}
}
});
manipulator.run();
EventBus b = new EventBus();
b.register(manipulator);
DataMessage m = new DataMessage();
m.getData().add(10d);
b.post(m);
b.post(new EndOfStreamMessage());
}
/**
@@ -312,30 +278,6 @@ public class ManipulatorTest {
public void testRunLongTimeTest() throws InterruptedException {
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("myid"));
final DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(1000), dmm);
Thread tf = new Thread(new Runnable() {
@Override
public void run() {
try{
for(Double i=0d;i<1000000;i++){
DataMessage m = new DataMessage();
m.getData().add(i);
inQueue.getQueue().put(m);
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// }
}
inQueue.getQueue().put(new EndOfStreamMessage());
}
catch(InterruptedException e){
e.printStackTrace();
}
}
});
String id="cid";
String script = "import math\ndef process(o):\n return math.cos(10.0) + math.sin(o)";
@@ -345,11 +287,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
Thread t = new Thread(manipulator);
Manipulator manipulator = new Manipulator(bus, dmm, manipulations);
bus.register(new Object(){
int count=0;
@@ -363,13 +301,15 @@ public class ManipulatorTest {
}
});
tf.start();
t.start();
tf.join();
t.join();
EventBus b = new EventBus();
b.register(manipulator);
for(Double i=0d;i<1000000;i++){
DataMessage m = new DataMessage();
m.getData().add(i);
b.post(m);
}
b.post(new EndOfStreamMessage());
}
/**
@@ -381,13 +321,7 @@ public class ManipulatorTest {
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("myid"));
dmm.getComponents().add(new ComponentMetadata("myid2"));
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
DataMessage m = new DataMessage();
m.getData().add(10d);
m.getData().add(0.2d);
inQueue.getQueue().add(m);
inQueue.getQueue().add(new EndOfStreamMessage());
String id="cid";
String script = "import math\ndef process(o ,c):\n return math.cos(c) + math.sin(o)";
@@ -398,7 +332,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, dmm, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getMetadata();
@@ -437,7 +371,16 @@ public class ManipulatorTest {
}
}
});
manipulator.run();
EventBus b = new EventBus();
b.register(manipulator);
DataMessage m = new DataMessage();
m.getData().add(10d);
m.getData().add(0.2d);
b.post(m);
b.post(new EndOfStreamMessage());
logger.info(""+(Math.cos(0.2)+Math.sin(10)));
@@ -459,16 +402,9 @@ public class ManipulatorTest {
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("myid"));
dmm.getComponents().add(new ComponentMetadata("myid2"));
DataQueue inQueue = new DataQueue(new LinkedBlockingQueue<Message>(), dmm);
DataMessage m = new DataMessage();
m.getData().add(10d);
m.getData().add(0.2d);
inQueue.getQueue().add(m);
inQueue.getQueue().add(new EndOfStreamMessage());
String id="cid";
String script = "import math\ndef process(o ,c,d):\n d.setValue("+setValue+")\n print d.getValue()\n return math.cos(c) + math.sin(o)";
String script = "import math\ndef process(o ,c,d):\n d.setValue("+setValue+")\n return math.cos(c) + math.sin(o)";
List<JythonParameterMapping> mapping = new ArrayList<JythonParameterMapping>();
mapping.add(new JythonParameterMappingID("o", "myid"));
mapping.add(new JythonParameterMappingID("c", "myid2"));
@@ -477,7 +413,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, dmm, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getMetadata();
@@ -520,7 +456,14 @@ public class ManipulatorTest {
}
});
manipulator.run();
EventBus b = new EventBus();
b.register(manipulator);
DataMessage m = new DataMessage();
m.getData().add(10d);
m.getData().add(0.2d);
b.post(m);
b.post(new EndOfStreamMessage());
logger.info(""+(Math.cos(0.2)+Math.sin(10)));