Probably fixed CRLOGIC thing as such ...

This commit is contained in:
2013-10-21 11:49:29 +02:00
parent 67e7f9a6c8
commit 51d7b36219
5 changed files with 128 additions and 109 deletions

View File

@@ -21,19 +21,24 @@ package ch.psi.fda.core.loops.cr;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.Action;
import ch.psi.fda.core.ActionLoop;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.Message;
public class ParallelCrlogic implements ActionLoop {
@@ -60,6 +65,8 @@ public class ParallelCrlogic implements ActionLoop {
private ParallelCrlogicStreamMerge merger;
private final EventBus eventbus;
public ParallelCrlogic(CrlogicLoop crlogic, ScrlogicLoop scrlogic){
if(crlogic==null){
@@ -69,6 +76,9 @@ public class ParallelCrlogic implements ActionLoop {
throw new IllegalArgumentException("No Scrloop specified");
}
this.eventbus = new EventBus();
this.crlogic = crlogic;
// Add timestamp to sensor at the beginning of the sensor list as this is required for merging the data
// Timestamp will be at the second position of a message in the queue!
@@ -79,7 +89,23 @@ public class ParallelCrlogic implements ActionLoop {
this.preActions = new ArrayList<Action>();
this.postActions = new ArrayList<Action>();
this.merger = new ParallelCrlogicStreamMerge(crlogic.getDataQueue(), scrlogic.getDataQueue());
final BlockingQueue<Message> q1 = new LinkedBlockingQueue<>();
final BlockingQueue<Message> q2 = new LinkedBlockingQueue<>();
crlogic.getEventBus().register(new Object(){
@Subscribe
public void onMessage(Message m){
q1.add(m);
}
});
scrlogic.getEventBus().register(new Object(){
@Subscribe
public void onMessage(Message m){
q2.add(m);
}
});
this.merger = new ParallelCrlogicStreamMerge(q1, q2, eventbus);
}
@Override
@@ -205,7 +231,7 @@ public class ParallelCrlogic implements ActionLoop {
}
@Override
public DataQueue getDataQueue() {
return(merger.getDataQueue());
public EventBus getEventBus(){
return eventbus;
}
}

View File

@@ -19,15 +19,16 @@
package ch.psi.fda.core.loops.cr;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.messages.Metadata;
/**
* Class to merge two data streams into one. The secondary queues data is added to the primary queues data.
@@ -36,20 +37,23 @@ import ch.psi.fda.core.messages.Message;
*/
public class ParallelCrlogicStreamMerge {
private BlockingQueue<Message> dataQueue;
private DataQueue primaryQueue;
private DataQueue secondaryQueue;
private BlockingQueue<Message> primaryQueue;
private BlockingQueue<Message> secondaryQueue;
private final EventBus eventbus;
private List<Metadata> metadata;
/**
* @param pqueue Primary/master queue
* @param squeue Secondary queue
*/
public ParallelCrlogicStreamMerge(DataQueue pqueue, DataQueue squeue){
public ParallelCrlogicStreamMerge(BlockingQueue<Message> pqueue, BlockingQueue<Message> squeue, EventBus ebus){
this.primaryQueue = pqueue;
this.secondaryQueue = squeue;
this.dataQueue = new LinkedBlockingQueue<Message>();
this.eventbus = ebus;
}
/**
@@ -58,11 +62,16 @@ public class ParallelCrlogicStreamMerge {
*/
public void merge() throws InterruptedException{
metadata = new ArrayList<>();
boolean firstPrimary = true;
boolean firstSecondary = true;
// Actual data of the secondary queue
List<Object> currData = null;
// Take first element of the primary queue (wait until message is available)
Message m = primaryQueue.getQueue().take();
Message m = primaryQueue.take();
while(! (m instanceof EndOfStreamMessage)){
@@ -70,6 +79,17 @@ public class ParallelCrlogicStreamMerge {
DataMessage dm = (DataMessage) m;
if(firstPrimary){
firstPrimary = false;
List<Metadata> pqm = dm.getMetadata();
metadata.add(pqm.get(0)); // add first component (this is the actuator)
// Skip the next component as this is the timestamp used to merge the data
for(int i=2;i<pqm.size();i++){
metadata.add(pqm.get(i));
}
}
// Get and remove merge timestamp from the data of the message
Double timestamp = (Double) dm.getData().remove(1);
long milliseconds = (long) (timestamp*1000);
@@ -78,7 +98,7 @@ public class ParallelCrlogicStreamMerge {
while(true){
// Assumption: the secondary Queue holds at least the data up to the
// timestamp of the primary queue
Message mess = secondaryQueue.getQueue().peek();
Message mess = secondaryQueue.peek();
if(mess instanceof EndOfStreamMessage){
break;
@@ -89,10 +109,21 @@ public class ParallelCrlogicStreamMerge {
}
if(mess instanceof DataMessage){
// Check whether timestamp of the next message is bigger than the timestamp of the
// message from the primary queue - if the timestamp is bigger do not take message out of the queue
DataMessage msCheck = (DataMessage) mess;
if(firstSecondary){
firstSecondary = false;
List<Metadata> sqm = msCheck.getMetadata();
// Skip first two components of the message as this is the timestamp
for(int i=2;i<sqm.size();i++){
metadata.add(sqm.get(i));
}
}
long currMilliCheck = ((Double) msCheck.getData().get(0)).longValue();
long currNanoCheck = ((Double) msCheck.getData().get(1)).longValue();
@@ -100,7 +131,7 @@ public class ParallelCrlogicStreamMerge {
break;
}
DataMessage ms = (DataMessage) secondaryQueue.getQueue().take();
DataMessage ms = (DataMessage) secondaryQueue.take();
currData = ms.getData();
// Remove timestamps
@@ -115,44 +146,18 @@ public class ParallelCrlogicStreamMerge {
// Add data to primary data queue message and put it into the out queue
dm.getData().addAll(currData);
dataQueue.add(dm);
dm.setMetadata(metadata);
eventbus.post(dm);
}
m = primaryQueue.getQueue().take();
m = primaryQueue.take();
}
// Add the end of stream message of the primary queue
dataQueue.add(m);
eventbus.post(m);
// Clear all remaining messages in secondary queue
secondaryQueue.getQueue().clear();
secondaryQueue.clear();
}
/**
* The structure of the data message depends on the sensors registered at this loop
* at the time this method is called.
* @return the data queue and the metadata of the data messages
*/
public DataQueue getDataQueue() {
DataMessageMetadata m = new DataMessageMetadata();
DataMessageMetadata pqm = primaryQueue.getDataMessageMetadata();
m.getComponents().add(pqm.getComponents().get(0)); // add first component (this is the actuator)
// Skip the next component as this is the timestamp used to merge the data
for(int i=2;i<pqm.getComponents().size();i++){
m.getComponents().add(pqm.getComponents().get(i));
}
DataMessageMetadata sqm = secondaryQueue.getDataMessageMetadata();
// Skip first two components of the message as this is the timestamp
for(int i=2;i<sqm.getComponents().size();i++){
m.getComponents().add(sqm.getComponents().get(i));
}
return new DataQueue(dataQueue, m);
}
}

View File

@@ -102,7 +102,6 @@ public class ScrlogicLoop implements ActionLoop {
@Override
public void execute() throws InterruptedException {
System.out.println("Exec");
// Clear all queues
queues.clear();
latch = new CountDownLatch(1);
@@ -195,10 +194,8 @@ public class ScrlogicLoop implements ActionLoop {
}
private boolean hasNext(){
System.out.println("H "+queues.size());
for (BlockingQueue<TimestampedValue> q: queues) {
if(!q.isEmpty()){
System.out.println("JU");
return true;
}
}
@@ -211,9 +208,6 @@ public class ScrlogicLoop implements ActionLoop {
* @throws InterruptedException
*/
private void merge() throws InterruptedException {
System.out.println("MERGE");
// Array to hold temporary channel values
TimestampedValue[] cvalues = new TimestampedValue[queues.size()];
TimestampedValueComparator comparator = new TimestampedValueComparator();
@@ -223,9 +217,6 @@ public class ScrlogicLoop implements ActionLoop {
List<Integer> indexes = new ArrayList<Integer>();
while (hasNext()) {
System.out.println("MERGE 2");
// semaphore.acquire();
Thread.sleep(10); // Ensure that close by monitors have time to
// catch up / also ensure context switch

View File

@@ -19,91 +19,91 @@
package ch.psi.fda.core.loops.cr;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import ch.psi.fda.core.messages.ComponentMetadata;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.messages.Metadata;
/**
* @author ebner
*
*/
public class ParallelCrlogicStreamMergeTest {
/**
* @throws java.lang.Exception
*/
private static final Logger logger = Logger.getLogger(ParallelCrlogicStreamMergeTest.class.getName());
@Before
public void setUp() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
}
/**
* Test method for {@link ch.psi.fda.core.loops.cr.ParallelCrlogicStreamMerge#merge()}.
* @throws InterruptedException
*/
@Test
public void testMerge() throws InterruptedException {
List<Metadata> dmm = new ArrayList<>();
dmm.add(new Metadata("cractuator"));
dmm.add(new Metadata("tstamp"));
BlockingQueue<Message> dataQueue = new LinkedBlockingQueue<Message>();
DataMessage dm = new DataMessage();
DataMessage dm = new DataMessage(dmm);
dm.getData().add(0.0035d);
dm.getData().add(10.000000123);
dataQueue.add(dm);
dm = new DataMessage();
dm = new DataMessage(dmm);
dm.getData().add(0.015);
dm.getData().add(10.000000143);
dataQueue.add(dm);
dm = new DataMessage();
dm = new DataMessage(dmm);
dm.getData().add(0.026);
dm.getData().add(10.000000163);
dataQueue.add(dm);
dataQueue.add(new EndOfStreamMessage());
DataMessageMetadata dmm = new DataMessageMetadata();
dmm.getComponents().add(new ComponentMetadata("cractuator"));
dmm.getComponents().add(new ComponentMetadata("tstamp"));
DataQueue dqueue = new DataQueue(dataQueue, dmm);
List<Metadata> dmm2 = new ArrayList<>();
dmm2.add(new Metadata("milli"));
dmm2.add(new Metadata("nano"));
dmm2.add(new Metadata("sensor1"));
BlockingQueue<Message> dataQueue2 = new LinkedBlockingQueue<Message>();
DataMessage dm2 = new DataMessage();
DataMessage dm2 = new DataMessage(dmm2);
dm2.getData().add(9000d);
dm2.getData().add(122d);
dm2.getData().add(0.1d);
dataQueue2.add(dm2);
dm2 = new DataMessage();
dm2 = new DataMessage(dmm2);
dm2.getData().add(10000d);
dm2.getData().add(122d);
dm2.getData().add(1d);
dataQueue2.add(dm2);
dm2 = new DataMessage();
dm2 = new DataMessage(dmm2);
dm2.getData().add(10000d);
dm2.getData().add(153d);
dm2.getData().add(2d);
dataQueue2.add(dm2);
dm2 = new DataMessage();
dm2 = new DataMessage(dmm2);
dm2.getData().add(10000d);
dm2.getData().add(162d);
dm2.getData().add(3d);
@@ -111,26 +111,15 @@ public class ParallelCrlogicStreamMergeTest {
dataQueue2.add(new EndOfStreamMessage());
DataMessageMetadata dmm2 = new DataMessageMetadata();
dmm2.getComponents().add(new ComponentMetadata("milli"));
dmm2.getComponents().add(new ComponentMetadata("nano"));
dmm2.getComponents().add(new ComponentMetadata("sensor1"));
DataQueue dqueue2 = new DataQueue(dataQueue2, dmm2);
ParallelCrlogicStreamMerge streamMerge = new ParallelCrlogicStreamMerge(dqueue, dqueue2);
EventBus b = new EventBus();
b.register(new Object(){
@Subscribe
public void onMessage(Message m){
logger.info(m.toString());
}
});
ParallelCrlogicStreamMerge streamMerge = new ParallelCrlogicStreamMerge(dataQueue, dataQueue2, b);
streamMerge.merge();
// Print merged queue
BlockingQueue<Message> queue = streamMerge.getDataQueue().getQueue();
Message m = queue.take();
while(! (m instanceof EndOfStreamMessage)){
System.out.println(m.toString());
m = queue.take();
}
}
}

View File

@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
@@ -32,8 +31,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.TestConfiguration;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.jcae.Channel;
import ch.psi.jcae.ChannelDescriptor;
@@ -104,6 +104,14 @@ public class ParallelCrlogicTest {
ParallelCrlogic pcrlogic = new ParallelCrlogic(crlogic, scrlogic);
pcrlogic.getEventBus().register(new Object(){
@Subscribe
public void onMessage(Message m){
logger.info(m.toString());
}
});
logger.info("Start scaler");
scalertemplate.getCommand().setValueNoWait(TemplateVSC16Scaler.Command.Count.ordinal());
@@ -114,13 +122,13 @@ public class ParallelCrlogicTest {
logger.info("Stop scaler");
scalertemplate.getCommand().setValue(TemplateVSC16Scaler.Command.Done.ordinal());
System.out.println("PARALLEL CRLOGIC data:");
BlockingQueue<Message> queue = pcrlogic.getDataQueue().getQueue();
Message m = queue.take();
while(! (m instanceof EndOfStreamMessage)){
System.out.println(m.toString());
m = queue.take();
}
// System.out.println("PARALLEL CRLOGIC data:");
// BlockingQueue<Message> queue = pcrlogic.getDataQueue().getQueue();
// Message m = queue.take();
// while(! (m instanceof EndOfStreamMessage)){
// System.out.println(m.toString());
// m = queue.take();
// }
// Destroy scaler template
cservice.destroyAnnotatedChannels(scalertemplate);