diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java index 36da3e2..df10202 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java @@ -30,12 +30,14 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Executors; import java.util.logging.FileHandler; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.Logger; import java.util.logging.SimpleFormatter; +import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import ch.psi.fda.core.ActionLoop; @@ -126,13 +128,11 @@ import ch.psi.jcae.ChannelBeanFactory; */ public class Acquisition { - // Get Logger private static Logger logger = Logger.getLogger(Acquisition.class.getName()); private AcquisitionConfiguration configuration; private ActionLoop actionLoop; - private Collector collector; private Manipulator manipulator; private DataSerializerTXT serializer; @@ -143,17 +143,16 @@ public class Acquisition { private Handler logHandler = null; + private Collector col; + /** * Name of the datafile */ private File datafile; -// private Thread acquisitionThread = null; - public Acquisition(){ configuration = AcquisitionConfiguration.getInstance(); actionLoop = null; - collector = new Collector(); manipulations = new ArrayList(); } @@ -236,18 +235,20 @@ public class Acquisition { logger.fine("Map Model to internal logic"); + + EventBus b = new AsyncEventBus(Executors.newCachedThreadPool()); // Map scan to base model // After this call actionLoop and collector will be initialized - mapScan(smodel); - + Collector collector = new Collector(b); + mapScan(collector, smodel); + col = collector; logger.fine("ActionLoop and Collector initialized"); - // TODO Remove this workaround - Collections.reverse(collector.getQueues()); + // Add manipulator into processing chain - this.manipulator = new Manipulator(bus, collector.getOutQueue(), this.manipulations); - + this.manipulator = new Manipulator(bus, collector.getMetadata(), this.manipulations); + b.register(this.manipulator); DataMessageMetadata metadata = manipulator.getMetadata(); @@ -268,12 +269,9 @@ public class Acquisition { try{ active = true; - Thread tc = new Thread(collector); + Thread tc = new Thread(col); tc.start(); - Thread tm = new Thread(manipulator); - tm.start(); - actionLoop.prepare(); actionLoop.execute(); actionLoop.cleanup(); @@ -283,7 +281,6 @@ public class Acquisition { // Give the threads 1 minute to catch up tc.join(60000); - tm.join(60000); // Send notifications out to all recipients that want to have success notifications try { @@ -292,8 +289,6 @@ public class Acquisition { } catch (UnknownHostException e1) { logger.log(Level.WARNING, "Unable to send notification", e1); } - -// active = false; } catch(RuntimeException e){ logger.log(Level.WARNING, "Execution failed: ", e); @@ -402,7 +397,7 @@ public class Acquisition { * Map scan to base model * @param scan */ - private void mapScan(Configuration configuration){ + private void mapScan(Collector collector, Configuration configuration){ Scan scan = configuration.getScan(); // Map continuous dimension @@ -495,6 +490,10 @@ public class Acquisition { this.manipulations.add(manipulation); } } + + // TODO Remove this workaround + // Revert queues to match sequence + Collections.reverse(collector.getQueues()); } /** diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Collector.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Collector.java index b0b23f8..24c6c70 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Collector.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Collector.java @@ -22,9 +22,10 @@ package ch.psi.fda.aq; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Logger; +import com.google.common.eventbus.EventBus; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataMessageMetadata; @@ -53,14 +54,14 @@ public class Collector implements Runnable{ /** * Outgoing queue of this collector */ - private BlockingQueue outQueue; + private EventBus bus; /** * Constructor */ - public Collector(){ + public Collector(EventBus b){ queues = new ArrayList(); - outQueue = new LinkedBlockingQueue(1000); // Create bounded queue to prevent running out of memory ... + this.bus = b; } /* (non-Javadoc) @@ -81,14 +82,8 @@ public class Collector implements Runnable{ // No queue registered for reading } - try { + bus.post(new EndOfStreamMessage()); - outQueue.put(new EndOfStreamMessage()); - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Unable to terminate stream with and End of Stream Message",e); - } logger.info("END"); @@ -115,7 +110,7 @@ public class Collector implements Runnable{ } else{ // Write message to outgoing queue - outQueue.put(dm); + bus.post(dm); } // Read next message @@ -126,7 +121,7 @@ public class Collector implements Runnable{ // Translate EndOfStream to StreamDelimiter message StreamDelimiterMessage ddm = new StreamDelimiterMessage(queues.size()-1-index, ((EndOfStreamMessage)message).isIflag()); // Write message to outgoing queue - outQueue.put(ddm); + bus.post(ddm); } } @@ -140,14 +135,10 @@ public class Collector implements Runnable{ /** - * Get the outgoing data queue. - * Attention, only call this method after all ingoing queues were registered! Otherwise the data returned - * by this method is not accurate. - * @return output queue of collector + * Get the outgoing data metadata */ - public DataQueue getOutQueue(){ + public DataMessageMetadata getMetadata(){ DataMessageMetadata dataMessageMetadata = new DataMessageMetadata(); - dataMessageMetadata.getComponents(); // Generate new combined metadata and add dimension information to the components int nq = queues.size(); @@ -158,8 +149,7 @@ public class Collector implements Runnable{ dataMessageMetadata.getComponents().add(new ComponentMetadata(cm.getId(), nq-i-1)); } } - - return(new DataQueue(outQueue, dataMessageMetadata)); + return(dataMessageMetadata); } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java index 8315546..bb21467 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java @@ -20,87 +20,55 @@ package ch.psi.fda.core.manipulator; import java.util.List; + import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; -import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; /** - * @author ebner - * + * Applies manipulations to the data stream */ -public class Manipulator implements Runnable{ +public class Manipulator { private EventBus bus; private DataMessageMetadata metadata; - private final DataQueue queue; private final List manipulations; - /** - * Constructor - * @param queue - * @param manipulations - */ - // TODO need to support multiple (a list of) manipulation(s) - public Manipulator(EventBus b, DataQueue queue, List manipulations){ + public Manipulator(EventBus b, DataMessageMetadata meta, List manipulations){ this.bus = b; this.manipulations = manipulations; - this.queue = queue; // Create outgoing data metadata - metadata = queue.getDataMessageMetadata().clone(); + this.metadata = meta.clone(); // Initialize manipulations and create outgoing metadata for(Manipulation manipulation: this.manipulations){ - manipulation.initialize(metadata); + manipulation.initialize(this.metadata); // Add manipulation id to metadata - metadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension + this.metadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension } } + @Subscribe + public void onMessage(Message message){ + if(message instanceof DataMessage){ + DataMessage dm = (DataMessage) message; + + for(Manipulation manipulation: manipulations){ + dm.getData().add(manipulation.execute(dm)); + } + } + bus.post(message); + } + public DataMessageMetadata getMetadata() { return metadata; } - - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - try{ - - // Dispatch Messages - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - if(message instanceof DataMessage){ - DataMessage dm = (DataMessage) message; - - for(Manipulation manipulation: manipulations){ - dm.getData().add(manipulation.execute(dm)); - } - } - - bus.post(message); - - // Read next message - message = queue.getQueue().take(); - } - - // Write end of stream message - bus.post(message); - - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data manipulator was interrupted while writing data to file",e); - } - } - } diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/aq/CollectorTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/aq/CollectorTest.java index b806808..8f05ddd 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/aq/CollectorTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/aq/CollectorTest.java @@ -23,13 +23,15 @@ import static org.junit.Assert.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Level; import java.util.logging.Logger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.aq.Collector; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.ControlMessage; @@ -39,43 +41,10 @@ import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; -/** - * @author ebner - * - */ public class CollectorTest { - // Get Logger private static Logger logger = Logger.getLogger(CollectorTest.class.getName()); - - class TestCollector implements Runnable{ - - private final DataQueue queue; - public TestCollector(DataQueue queue){ - this.queue = queue; - } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - try { - while(true){ - Message m = queue.getQueue().take(); - if(m instanceof DataMessage){ - DataMessage x = (DataMessage) m; - logger.fine( x.toString() ); - } - else if(m instanceof ControlMessage){ - logger.fine("---- "+m.toString()+" ----"); - } - } - } catch (InterruptedException e) { - logger.log(Level.SEVERE, "An Exception occured while reading data from the data queue", e); - } - } - - } + private EventBus bus; private BlockingQueue q1; private BlockingQueue q2; @@ -85,11 +54,12 @@ public class CollectorTest { private DataMessageMetadata m2; private DataMessageMetadata m3; - /** - * @throws java.lang.Exception - */ + @Before public void setUp() throws Exception { + bus = new EventBus(); + + // Create blocking queues q1 = new LinkedBlockingQueue(); q2 = new LinkedBlockingQueue(); @@ -190,9 +160,6 @@ public class CollectorTest { } - /** - * @throws java.lang.Exception - */ @After public void tearDown() throws Exception { } @@ -203,17 +170,14 @@ public class CollectorTest { */ @Test public void testRun() throws InterruptedException { - Collector collector = new Collector(); + Collector collector = new Collector(bus); collector.getQueues().add(new DataQueue(q1, m1)); collector.getQueues().add(new DataQueue(q2, m2)); collector.getQueues().add(new DataQueue(q3, m3)); - Thread t = new Thread(new TestCollector(collector.getOutQueue())); - t.start(); - // Check component metadata of output queue int c=2; - for(ComponentMetadata cm: collector.getOutQueue().getDataMessageMetadata().getComponents()){ + for(ComponentMetadata cm: collector.getMetadata().getComponents()){ logger.info(cm.toString()); if(cm.getDimension() != c){ fail("Dimension number does not match required dimension number"); @@ -224,13 +188,22 @@ public class CollectorTest { c--; } - collector.run(); - // Wait some time to ensure that collector was able to finish processing - Thread.sleep(2000); - // Execute collector via the ExecutorService framework -// ExecutorService executor = Executors.newCachedThreadPool(); -// executor.execute(collector); + // check wether messages arrive + bus.register(new Object(){ + @Subscribe + public void onMessage(Message m){ + if(m instanceof DataMessage){ + DataMessage x = (DataMessage) m; + logger.info( x.toString() ); + } + else if(m instanceof ControlMessage){ + logger.info("---- "+m.toString()+" ----"); + } + } + }); + + collector.run(); } } diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java index fcfc26a..f58bb3d 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java @@ -24,7 +24,6 @@ import gov.aps.jca.CAException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Logger; import org.junit.After; @@ -42,7 +41,6 @@ import ch.psi.fda.core.manipulator.Manipulator; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; import ch.psi.fda.core.scripting.JythonParameterMapping; @@ -51,13 +49,8 @@ import ch.psi.fda.core.scripting.JythonParameterMappingID; import ch.psi.jcae.ChannelBean; import ch.psi.jcae.ChannelBeanFactory; -/** - * @author ebner - * - */ public class ManipulatorTest { - // Get Logger private static Logger logger = Logger.getLogger(ManipulatorTest.class.getName()); private EventBus bus; @@ -83,7 +76,6 @@ public class ManipulatorTest { @Test(expected=IllegalArgumentException.class) public void testConstructor() { DataMessageMetadata dmm = new DataMessageMetadata(); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); String id="computedId"; String script = "import math\ndef process(o):\n return math.cos(10.0) + math.sin(o)"; @@ -95,21 +87,14 @@ public class ManipulatorTest { // id "myid" which is expected in the mapping List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(bus, inQueue, manipulations); + new Manipulator(bus, dmm, manipulations); } @Test public void testConstructorNoMappingNoParam() { DataMessageMetadata dmm = new DataMessageMetadata(); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); - DataMessage m = new DataMessage(); - m.getData().add(10d); - m.getData().add(0.2d); - inQueue.getQueue().add(m); - inQueue.getQueue().add(new EndOfStreamMessage()); - String id="cid"; String script = "import math\ndef process():\n return 0.0"; List mapping = new ArrayList(); @@ -118,7 +103,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(bus, inQueue, manipulations); + new Manipulator(bus, dmm, manipulations); // Expect IllegalArgument Exception as there is no mapping for the parameter c } @@ -127,14 +112,7 @@ public class ManipulatorTest { DataMessageMetadata dmm = new DataMessageMetadata(); dmm.getComponents().add(new ComponentMetadata("myid")); dmm.getComponents().add(new ComponentMetadata("myid2")); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); - DataMessage m = new DataMessage(); - m.getData().add(10d); - m.getData().add(0.2d); - inQueue.getQueue().add(m); - inQueue.getQueue().add(new EndOfStreamMessage()); - String id="cid"; String script = "import math\ndef process():\n return 0.0"; List mapping = new ArrayList(); @@ -143,7 +121,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(bus, inQueue, manipulations); + new Manipulator(bus, dmm, manipulations); // Expect IllegalArgument Exception as there is no mapping for the parameter c } @@ -155,14 +133,7 @@ public class ManipulatorTest { DataMessageMetadata dmm = new DataMessageMetadata(); dmm.getComponents().add(new ComponentMetadata("myid")); dmm.getComponents().add(new ComponentMetadata("myid2")); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); - DataMessage m = new DataMessage(); - m.getData().add(10d); - m.getData().add(0.2d); - inQueue.getQueue().add(m); - inQueue.getQueue().add(new EndOfStreamMessage()); - String id="cid"; String script = "import math\ndef process(o ,c):\n return math.cos(c) + math.sin(o)"; List mapping = new ArrayList(); @@ -171,7 +142,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(bus, inQueue, manipulations); + new Manipulator(bus, dmm, manipulations); // Expect IllegalArgument Exception as there is no mapping for the parameter c } @@ -183,13 +154,7 @@ public class ManipulatorTest { public void testRun() throws InterruptedException { DataMessageMetadata dmm = new DataMessageMetadata(); dmm.getComponents().add(new ComponentMetadata("myid")); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); - DataMessage m = new DataMessage(); - m.getData().add(10d); - inQueue.getQueue().add(m); - inQueue.getQueue().add(new EndOfStreamMessage()); - String id="cid"; String script = "import math\ndef process(o):\n return math.cos(10.0) + math.sin(o)"; List mapping = new ArrayList(); @@ -198,7 +163,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, dmm, manipulations); // Check whether output queue message structur complies to expected one DataMessageMetadata outMeta = manipulator.getMetadata(); @@ -233,15 +198,14 @@ public class ManipulatorTest { } } }); - manipulator.run(); -// Message message = manipulator.getOutQueue().getQueue().take(); -// while(!(message instanceof EndOfStreamMessage)){ -// -// -// -// message = manipulator.getOutQueue().getQueue().take(); -// } + EventBus b = new EventBus(); + b.register(manipulator); + + DataMessage m = new DataMessage(); + m.getData().add(10d); + b.post(m); + b.post(new EndOfStreamMessage()); logger.info(""+(Math.cos(10.0)+Math.sin(10))); } @@ -254,12 +218,7 @@ public class ManipulatorTest { public void testRunIntegerReturn() throws InterruptedException { DataMessageMetadata dmm = new DataMessageMetadata(); dmm.getComponents().add(new ComponentMetadata("myid")); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); - DataMessage m = new DataMessage(); - m.getData().add(10d); - inQueue.getQueue().add(m); - inQueue.getQueue().add(new EndOfStreamMessage()); String id="cid"; String script = "import math\ndef process(o):\n return 1"; @@ -269,7 +228,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, dmm, manipulations); // Check whether output queue message structur complies to expected one DataMessageMetadata outMeta = manipulator.getMetadata(); @@ -300,7 +259,14 @@ public class ManipulatorTest { } } }); - manipulator.run(); + + EventBus b = new EventBus(); + b.register(manipulator); + + DataMessage m = new DataMessage(); + m.getData().add(10d); + b.post(m); + b.post(new EndOfStreamMessage()); } /** @@ -312,30 +278,6 @@ public class ManipulatorTest { public void testRunLongTimeTest() throws InterruptedException { DataMessageMetadata dmm = new DataMessageMetadata(); dmm.getComponents().add(new ComponentMetadata("myid")); - final DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(1000), dmm); - - Thread tf = new Thread(new Runnable() { - - @Override - public void run() { - try{ - for(Double i=0d;i<1000000;i++){ - DataMessage m = new DataMessage(); - m.getData().add(i); - inQueue.getQueue().put(m); - // try { - // Thread.sleep(1); - // } catch (InterruptedException e) { - // } - } - inQueue.getQueue().put(new EndOfStreamMessage()); - } - catch(InterruptedException e){ - e.printStackTrace(); - } - } - }); - String id="cid"; String script = "import math\ndef process(o):\n return math.cos(10.0) + math.sin(o)"; @@ -345,11 +287,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); - - - - Thread t = new Thread(manipulator); + Manipulator manipulator = new Manipulator(bus, dmm, manipulations); bus.register(new Object(){ int count=0; @@ -363,13 +301,15 @@ public class ManipulatorTest { } }); - - - tf.start(); - t.start(); - - tf.join(); - t.join(); + + EventBus b = new EventBus(); + b.register(manipulator); + for(Double i=0d;i<1000000;i++){ + DataMessage m = new DataMessage(); + m.getData().add(i); + b.post(m); + } + b.post(new EndOfStreamMessage()); } /** @@ -381,13 +321,7 @@ public class ManipulatorTest { DataMessageMetadata dmm = new DataMessageMetadata(); dmm.getComponents().add(new ComponentMetadata("myid")); dmm.getComponents().add(new ComponentMetadata("myid2")); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); - - DataMessage m = new DataMessage(); - m.getData().add(10d); - m.getData().add(0.2d); - inQueue.getQueue().add(m); - inQueue.getQueue().add(new EndOfStreamMessage()); + String id="cid"; String script = "import math\ndef process(o ,c):\n return math.cos(c) + math.sin(o)"; @@ -398,7 +332,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, dmm, manipulations); // Check whether output queue message structur complies to expected one DataMessageMetadata outMeta = manipulator.getMetadata(); @@ -437,7 +371,16 @@ public class ManipulatorTest { } } }); - manipulator.run(); + + + EventBus b = new EventBus(); + b.register(manipulator); + + DataMessage m = new DataMessage(); + m.getData().add(10d); + m.getData().add(0.2d); + b.post(m); + b.post(new EndOfStreamMessage()); logger.info(""+(Math.cos(0.2)+Math.sin(10))); @@ -459,16 +402,9 @@ public class ManipulatorTest { DataMessageMetadata dmm = new DataMessageMetadata(); dmm.getComponents().add(new ComponentMetadata("myid")); dmm.getComponents().add(new ComponentMetadata("myid2")); - DataQueue inQueue = new DataQueue(new LinkedBlockingQueue(), dmm); - DataMessage m = new DataMessage(); - m.getData().add(10d); - m.getData().add(0.2d); - inQueue.getQueue().add(m); - inQueue.getQueue().add(new EndOfStreamMessage()); - String id="cid"; - String script = "import math\ndef process(o ,c,d):\n d.setValue("+setValue+")\n print d.getValue()\n return math.cos(c) + math.sin(o)"; + String script = "import math\ndef process(o ,c,d):\n d.setValue("+setValue+")\n return math.cos(c) + math.sin(o)"; List mapping = new ArrayList(); mapping.add(new JythonParameterMappingID("o", "myid")); mapping.add(new JythonParameterMappingID("c", "myid2")); @@ -477,7 +413,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, dmm, manipulations); // Check whether output queue message structur complies to expected one DataMessageMetadata outMeta = manipulator.getMetadata(); @@ -520,7 +456,14 @@ public class ManipulatorTest { } }); - manipulator.run(); + + EventBus b = new EventBus(); + b.register(manipulator); + DataMessage m = new DataMessage(); + m.getData().add(10d); + m.getData().add(0.2d); + b.post(m); + b.post(new EndOfStreamMessage()); logger.info(""+(Math.cos(0.2)+Math.sin(10)));