diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java index 6c31bc2..ae5aaca 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java @@ -21,19 +21,24 @@ package ch.psi.fda.core.loops.cr; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.Action; import ch.psi.fda.core.ActionLoop; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.Message; public class ParallelCrlogic implements ActionLoop { @@ -60,6 +65,8 @@ public class ParallelCrlogic implements ActionLoop { private ParallelCrlogicStreamMerge merger; + private final EventBus eventbus; + public ParallelCrlogic(CrlogicLoop crlogic, ScrlogicLoop scrlogic){ if(crlogic==null){ @@ -69,6 +76,9 @@ public class ParallelCrlogic implements ActionLoop { throw new IllegalArgumentException("No Scrloop specified"); } + + this.eventbus = new EventBus(); + this.crlogic = crlogic; // Add timestamp to sensor at the beginning of the sensor list as this is required for merging the data // Timestamp will be at the second position of a message in the queue! @@ -79,7 +89,23 @@ public class ParallelCrlogic implements ActionLoop { this.preActions = new ArrayList(); this.postActions = new ArrayList(); - this.merger = new ParallelCrlogicStreamMerge(crlogic.getDataQueue(), scrlogic.getDataQueue()); + final BlockingQueue q1 = new LinkedBlockingQueue<>(); + final BlockingQueue q2 = new LinkedBlockingQueue<>(); + + crlogic.getEventBus().register(new Object(){ + @Subscribe + public void onMessage(Message m){ + q1.add(m); + } + }); + scrlogic.getEventBus().register(new Object(){ + @Subscribe + public void onMessage(Message m){ + q2.add(m); + } + }); + + this.merger = new ParallelCrlogicStreamMerge(q1, q2, eventbus); } @Override @@ -205,7 +231,7 @@ public class ParallelCrlogic implements ActionLoop { } @Override - public DataQueue getDataQueue() { - return(merger.getDataQueue()); + public EventBus getEventBus(){ + return eventbus; } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java index 00ebc9f..84c5156 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java @@ -19,15 +19,16 @@ package ch.psi.fda.core.loops.cr; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.eventbus.EventBus; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; +import ch.psi.fda.core.messages.Metadata; /** * Class to merge two data streams into one. The secondary queues data is added to the primary queues data. @@ -36,20 +37,23 @@ import ch.psi.fda.core.messages.Message; */ public class ParallelCrlogicStreamMerge { - private BlockingQueue dataQueue; - private DataQueue primaryQueue; - private DataQueue secondaryQueue; + private BlockingQueue primaryQueue; + private BlockingQueue secondaryQueue; + + private final EventBus eventbus; + + private List metadata; /** * @param pqueue Primary/master queue * @param squeue Secondary queue */ - public ParallelCrlogicStreamMerge(DataQueue pqueue, DataQueue squeue){ + public ParallelCrlogicStreamMerge(BlockingQueue pqueue, BlockingQueue squeue, EventBus ebus){ this.primaryQueue = pqueue; this.secondaryQueue = squeue; - - this.dataQueue = new LinkedBlockingQueue(); + + this.eventbus = ebus; } /** @@ -58,11 +62,16 @@ public class ParallelCrlogicStreamMerge { */ public void merge() throws InterruptedException{ + metadata = new ArrayList<>(); + + boolean firstPrimary = true; + boolean firstSecondary = true; + // Actual data of the secondary queue List currData = null; // Take first element of the primary queue (wait until message is available) - Message m = primaryQueue.getQueue().take(); + Message m = primaryQueue.take(); while(! (m instanceof EndOfStreamMessage)){ @@ -70,6 +79,17 @@ public class ParallelCrlogicStreamMerge { DataMessage dm = (DataMessage) m; + if(firstPrimary){ + firstPrimary = false; + List pqm = dm.getMetadata(); + metadata.add(pqm.get(0)); // add first component (this is the actuator) + // Skip the next component as this is the timestamp used to merge the data + for(int i=2;i sqm = msCheck.getMetadata(); + // Skip first two components of the message as this is the timestamp + for(int i=2;i q: queues) { if(!q.isEmpty()){ - System.out.println("JU"); return true; } } @@ -211,9 +208,6 @@ public class ScrlogicLoop implements ActionLoop { * @throws InterruptedException */ private void merge() throws InterruptedException { - - System.out.println("MERGE"); - // Array to hold temporary channel values TimestampedValue[] cvalues = new TimestampedValue[queues.size()]; TimestampedValueComparator comparator = new TimestampedValueComparator(); @@ -223,9 +217,6 @@ public class ScrlogicLoop implements ActionLoop { List indexes = new ArrayList(); while (hasNext()) { - System.out.println("MERGE 2"); - -// semaphore.acquire(); Thread.sleep(10); // Ensure that close by monitors have time to // catch up / also ensure context switch diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMergeTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMergeTest.java index 0d6d251..cd4e12b 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMergeTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMergeTest.java @@ -19,91 +19,91 @@ package ch.psi.fda.core.loops.cr; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Logger; import org.junit.After; import org.junit.Before; import org.junit.Test; -import ch.psi.fda.core.messages.ComponentMetadata; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; +import ch.psi.fda.core.messages.Metadata; /** * @author ebner * */ public class ParallelCrlogicStreamMergeTest { - - /** - * @throws java.lang.Exception - */ + + private static final Logger logger = Logger.getLogger(ParallelCrlogicStreamMergeTest.class.getName()); + @Before public void setUp() throws Exception { } - /** - * @throws java.lang.Exception - */ @After public void tearDown() throws Exception { } - /** - * Test method for {@link ch.psi.fda.core.loops.cr.ParallelCrlogicStreamMerge#merge()}. - * @throws InterruptedException - */ @Test public void testMerge() throws InterruptedException { + List dmm = new ArrayList<>(); + dmm.add(new Metadata("cractuator")); + dmm.add(new Metadata("tstamp")); + BlockingQueue dataQueue = new LinkedBlockingQueue(); - DataMessage dm = new DataMessage(); + DataMessage dm = new DataMessage(dmm); dm.getData().add(0.0035d); dm.getData().add(10.000000123); dataQueue.add(dm); - dm = new DataMessage(); + dm = new DataMessage(dmm); dm.getData().add(0.015); dm.getData().add(10.000000143); dataQueue.add(dm); - dm = new DataMessage(); + dm = new DataMessage(dmm); dm.getData().add(0.026); dm.getData().add(10.000000163); dataQueue.add(dm); dataQueue.add(new EndOfStreamMessage()); - DataMessageMetadata dmm = new DataMessageMetadata(); - dmm.getComponents().add(new ComponentMetadata("cractuator")); - dmm.getComponents().add(new ComponentMetadata("tstamp")); - DataQueue dqueue = new DataQueue(dataQueue, dmm); + List dmm2 = new ArrayList<>(); + dmm2.add(new Metadata("milli")); + dmm2.add(new Metadata("nano")); + dmm2.add(new Metadata("sensor1")); BlockingQueue dataQueue2 = new LinkedBlockingQueue(); - DataMessage dm2 = new DataMessage(); + DataMessage dm2 = new DataMessage(dmm2); dm2.getData().add(9000d); dm2.getData().add(122d); dm2.getData().add(0.1d); dataQueue2.add(dm2); - dm2 = new DataMessage(); + dm2 = new DataMessage(dmm2); dm2.getData().add(10000d); dm2.getData().add(122d); dm2.getData().add(1d); dataQueue2.add(dm2); - dm2 = new DataMessage(); + dm2 = new DataMessage(dmm2); dm2.getData().add(10000d); dm2.getData().add(153d); dm2.getData().add(2d); dataQueue2.add(dm2); - dm2 = new DataMessage(); + dm2 = new DataMessage(dmm2); dm2.getData().add(10000d); dm2.getData().add(162d); dm2.getData().add(3d); @@ -111,26 +111,15 @@ public class ParallelCrlogicStreamMergeTest { dataQueue2.add(new EndOfStreamMessage()); - DataMessageMetadata dmm2 = new DataMessageMetadata(); - dmm2.getComponents().add(new ComponentMetadata("milli")); - dmm2.getComponents().add(new ComponentMetadata("nano")); - dmm2.getComponents().add(new ComponentMetadata("sensor1")); - - DataQueue dqueue2 = new DataQueue(dataQueue2, dmm2); - - - - ParallelCrlogicStreamMerge streamMerge = new ParallelCrlogicStreamMerge(dqueue, dqueue2); + EventBus b = new EventBus(); + b.register(new Object(){ + @Subscribe + public void onMessage(Message m){ + logger.info(m.toString()); + } + }); + ParallelCrlogicStreamMerge streamMerge = new ParallelCrlogicStreamMerge(dataQueue, dataQueue2, b); streamMerge.merge(); - - // Print merged queue - BlockingQueue queue = streamMerge.getDataQueue().getQueue(); - Message m = queue.take(); - while(! (m instanceof EndOfStreamMessage)){ - System.out.println(m.toString()); - m = queue.take(); - } - } } diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicTest.java index 823b54c..9486f60 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ParallelCrlogicTest.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; @@ -32,8 +31,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.TestConfiguration; -import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; import ch.psi.jcae.Channel; import ch.psi.jcae.ChannelDescriptor; @@ -104,6 +104,14 @@ public class ParallelCrlogicTest { ParallelCrlogic pcrlogic = new ParallelCrlogic(crlogic, scrlogic); + + pcrlogic.getEventBus().register(new Object(){ + @Subscribe + public void onMessage(Message m){ + logger.info(m.toString()); + } + }); + logger.info("Start scaler"); scalertemplate.getCommand().setValueNoWait(TemplateVSC16Scaler.Command.Count.ordinal()); @@ -114,13 +122,13 @@ public class ParallelCrlogicTest { logger.info("Stop scaler"); scalertemplate.getCommand().setValue(TemplateVSC16Scaler.Command.Done.ordinal()); - System.out.println("PARALLEL CRLOGIC data:"); - BlockingQueue queue = pcrlogic.getDataQueue().getQueue(); - Message m = queue.take(); - while(! (m instanceof EndOfStreamMessage)){ - System.out.println(m.toString()); - m = queue.take(); - } +// System.out.println("PARALLEL CRLOGIC data:"); +// BlockingQueue queue = pcrlogic.getDataQueue().getQueue(); +// Message m = queue.take(); +// while(! (m instanceof EndOfStreamMessage)){ +// System.out.println(m.toString()); +// m = queue.take(); +// } // Destroy scaler template cservice.destroyAnnotatedChannels(scalertemplate);