From 552ff6870e3bf6045e8dac4106da8e8528218c06 Mon Sep 17 00:00:00 2001 From: ebner Date: Thu, 31 May 2012 09:59:46 +0200 Subject: [PATCH] Fixed problems with crlogic and readout of softchannels --- .../.settings/org.eclipse.jdt.core.prefs | 1 - ch.psi.fda/pom.xml | 37 +- .../fda/core/loops/cr/ParallelCrlogic.java | 26 +- .../loops/cr/ParallelCrlogicStreamMerge.java | 11 +- .../psi/fda/core/loops/cr/ScrlogicLoop.java | 389 +++++++++--------- .../core/loops/cr/ParallelCrlogicTest.java | 6 +- .../fda/core/loops/cr/ScrlogicLoopTest.java | 78 +++- 7 files changed, 331 insertions(+), 217 deletions(-) diff --git a/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs b/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs index 74b8c7b..4ede96d 100644 --- a/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs +++ b/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs @@ -1,3 +1,2 @@ -#Wed Oct 19 12:49:33 CEST 2011 eclipse.preferences.version=1 org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning diff --git a/ch.psi.fda/pom.xml b/ch.psi.fda/pom.xml index 0647d11..029fa6b 100644 --- a/ch.psi.fda/pom.xml +++ b/ch.psi.fda/pom.xml @@ -3,7 +3,7 @@ 4.0.0 ch.psi fda - 1.1.32 + 1.1.34-SNAPSHOT @@ -143,6 +143,41 @@ + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + + org.jvnet.jaxb2.maven2 + + + maven-jaxb2-plugin + + + [0.8.0,) + + + generate + + + + + + + + + + + + diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java index a116096..c867509 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java @@ -130,7 +130,7 @@ public class ParallelCrlogic implements ActionLoop { // Execute the logic of this path crlogic.execute(); - // Stop scrlogic + // Need to stop the scrlogic logic (otherwise it would keep going to take data) scrlogic.abort(); return true; }}); @@ -149,22 +149,22 @@ public class ParallelCrlogic implements ActionLoop { b.await(); // Execute the logic of this path - scrlogic.execute(); + scrlogic.execute(); // This actually just starts the collection ... return true; }}); list.add(f); - // Start data merge thread - logger.info("Start data merge"); - f = service.submit(new Callable(){ - @Override - public Boolean call() throws Exception { - - merger.merge(); - return true; - }}); - list.add(f); +// // Start data merge thread +// logger.info("Start data merge"); +// f = service.submit(new Callable(){ +// @Override +// public Boolean call() throws Exception { +// +// merger.merge(); +// return true; +// }}); +// list.add(f); for(Future bf: list){ @@ -177,6 +177,8 @@ public class ParallelCrlogic implements ActionLoop { } } + merger.merge(); + // Wait until all threads have finished service.shutdown(); service.awaitTermination(1, TimeUnit.MINUTES); diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java index a9c49c3..e218a72 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java @@ -66,11 +66,13 @@ public class ParallelCrlogicStreamMerge { // Take first element of the primary queue (wait until message is available) Message m = primaryQueue.getQueue().take(); + while(! (m instanceof EndOfStreamMessage)){ if(m instanceof DataMessage){ DataMessage dm = (DataMessage) m; + // Get and remove merge timestamp from the data of the message Double timestamp = (Double) dm.getData().remove(1); long milliseconds = (long) (timestamp*1000); @@ -82,6 +84,11 @@ public class ParallelCrlogicStreamMerge { // Assumption: the secondary Queue holds at least the data up to the // timestamp of the primary queue Message mess = secondaryQueue.getQueue().peek(); +// Message mess = secondaryQueue.getQueue().take(); + + if(mess instanceof EndOfStreamMessage){ + break; + } if(mess == null){ break; @@ -118,7 +125,6 @@ public class ParallelCrlogicStreamMerge { } // Add data to primary data queue message and put it into the out queue -// System.out.println(currData); dm.getData().addAll(currData); dataQueue.add(dm); @@ -130,6 +136,9 @@ public class ParallelCrlogicStreamMerge { // Add the end of stream message of the primary queue dataQueue.add(m); + + // Clear all remaining messages in secondary queue + secondaryQueue.getQueue().clear(); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java index f0d39d8..2e0ca3f 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java @@ -2,15 +2,15 @@ * * Copyright 2010 Paul Scherrer Institute. All rights reserved. * - * This code is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. + * This code is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. * - * This code is distributed in the hope that it will be useful, - * but without any warranty; without even the implied warranty of - * merchantability or fitness for a particular purpose. See the - * GNU Lesser General Public License for more details. + * This code is distributed in the hope that it will be useful, but without any + * warranty; without even the implied warranty of merchantability or fitness for + * a particular purpose. See the GNU Lesser General Public License for more + * details. * * You should have received a copy of the GNU Lesser General Public License * along with this code. If not, see . @@ -26,8 +26,9 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; +import java.util.logging.Level; import java.util.logging.Logger; import ch.psi.fda.core.Action; @@ -46,167 +47,169 @@ import ch.psi.jcae.MonitorListenerDoubleTimestamp; /** * @author ebner * - * Assumptions: - * - The delay between the monitor writing the value to the monitor queue and the readout of all the queues - * is sufficient to prevent the situation that some monitors of events close to each other on different IOC's have - * not arrived yet. - * - The sequence of monitors fired for one channel is according to the sequence of the causes. No monitor package is - * overtaking an other package on the network. + * Assumptions: - The delay between the monitor writing the value to the + * monitor queue and the readout of all the queues is sufficient to + * prevent the situation that some monitors of events close to each + * other on different IOC's have not arrived yet. - The sequence of + * monitors fired for one channel is according to the sequence of the + * causes. No monitor package is overtaking an other package on the + * network. * - * - No monitor events are lost on the network (while using monitors you cannot guarantee this) - * - * The data queue returned by this logic includes two items for the timestamp and nanoseconds offset. These two items are the - * first two items of a message - * The id's are: - * crTimestampMilliseconds - * crTimestampOffsetNanoseconds + * - No monitor events are lost on the network (while using monitors you + * cannot guarantee this) + * + * The data queue returned by this logic includes two items for the + * timestamp and nanoseconds offset. These two items are the first two + * items of a message The id's are: crTimestampMilliseconds + * crTimestampOffsetNanoseconds */ public class ScrlogicLoop implements ActionLoop { - + private static String ID_TIMESTAMP_MILLISECONDS = "crTimestampMilliseconds"; private static String ID_TIMESTAMP_OFFSET_NANOSECONDS = "crTimestampOffsetNanoseconds"; - + // Get Logger private static final Logger logger = Logger.getLogger(ScrlogicLoop.class.getName()); - - private static Semaphore semaphore = new Semaphore(0); - + + /** - * Data queue sensor data is posted to. A message consists of a list of data objects - * that are read out of the sensors of this loop. + * Data queue sensor data is posted to. A message consists of a list of data + * objects that are read out of the sensors of this loop. */ - private BlockingQueue dataQueue; - + private final BlockingQueue dataQueue = new LinkedBlockingQueue(); + private boolean dataGroup = false; - private List preActions; - private List postActions; - - private List sensors; - private List monitors; - - private List> queues; - - private Thread mergeThread = null; - - - public ScrlogicLoop(List sensors){ - queues = new ArrayList>(); - preActions = new ArrayList(); - postActions = new ArrayList(); - - monitors = new ArrayList(); - this.sensors = sensors; - - this.dataQueue = new LinkedBlockingQueue(); - } - + private final List preActions = new ArrayList(); + private final List postActions = new ArrayList(); + /** - * @return the queues + * Sensors to read out */ - public List> getQueues() { - return queues; + private List sensors; + + /** + * List of monitors that were attached to the sensor channels (i.e + * workaround) + */ + private final List monitors = new ArrayList(); + + /** + * List of blocking queues that hold the data for one sensor (channel) + */ + private final List> queues = new ArrayList>(); + + private CountDownLatch latch; + + public ScrlogicLoop(List sensors) { + this.sensors = sensors; } - - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.Action#execute() */ @Override public void execute() throws InterruptedException { - try{ - // Attach monitors to the channels - for(Sensor sensor: sensors){ - if(sensor instanceof ChannelAccessDoubleSensor){ - ChannelAccessDoubleSensor s = (ChannelAccessDoubleSensor) sensor; - ChannelBean b = s.getChannel(); - // Create data queue for the channel - final BlockingQueue q = new LinkedBlockingQueue(); - queues.add(q); - - Monitor m = b.attachMonitor(new MonitorListenerDoubleTimestamp() { - - @Override - public void valueChanged(Double value, Date timestamp, long nanosecondsOffset) { - // Add values to channel queue - q.add(new TimestampedValue(value, timestamp.getTime(), nanosecondsOffset)); - // Increase semaphore count (used for merging thread of the queues) - semaphore.release(); - } - }); - monitors.add(m); - } - } - } - catch(CAException e){ - new RuntimeException("Unable to create monitor for channels",e); - } - - logger.info("Start data acquisition"); - - // Start merge thread - mergeThread = new Thread(new Runnable() { - - @Override - public void run() { - try { - merge(); - } catch (InterruptedException e) { - // Normal termination - } - } - }); - mergeThread.start(); - - } - /* (non-Javadoc) - * @see ch.psi.fda.core.Action#abort() - */ - @Override - public void abort() { - // Actually this is stopping the logic - - // Remove monitors - try{ - for(int i=0;i b = s.getChannel(); - b.removeMonitor(monitors.get(i)); + // Create data queue for the channel + final BlockingQueue q = new LinkedBlockingQueue(); + queues.add(q); + + Monitor m = b + .attachMonitor(new MonitorListenerDoubleTimestamp() { + + @Override + public void valueChanged(Double value, Date timestamp, long nanosecondsOffset) { + // Add values to channel queue + q.add(new TimestampedValue(value, timestamp.getTime(), nanosecondsOffset)); + } + }); + monitors.add(m); } } + } catch (CAException e) { + new RuntimeException("Unable to create monitor for channels", e); } - catch(CAException e){ + + logger.info("Start data acquisition"); + + latch.await(); + + // Remove monitors + try { + for (int i = 0; i < sensors.size(); i++) { + Sensor sensor = sensors.get(i); + if (sensor instanceof ChannelAccessDoubleSensor) { + ChannelAccessDoubleSensor s = (ChannelAccessDoubleSensor) sensor; + ChannelBean b = s.getChannel(); + try{ + b.removeMonitor(monitors.get(i)); + } + catch(IllegalArgumentException e){ + logger.log(Level.SEVERE, "Unable to detach monitor", e); + } + } + } + } catch (CAException e) { new RuntimeException(e); } - - // Stop merge thread - mergeThread.interrupt(); - + finally{ + // Clear all monitors in the list + monitors.clear(); + } + + // Merge data + merge(); + // Clear data queues - for(BlockingQueue q: queues){ + for (BlockingQueue q : queues) { q.clear(); } - + queues.clear(); + // Put end of stream to the queue dataQueue.add(new EndOfStreamMessage(dataGroup)); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * + * @see ch.psi.fda.core.Action#abort() + */ + @Override + public void abort() { + latch.countDown(); + } + + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.Action#destroy() */ @Override public void destroy() { // Destroy all sensors - for(Sensor s: sensors){ + for (Sensor s : sensors) { s.destroy(); } sensors.clear(); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.ActionLoop#prepare() */ @Override @@ -214,7 +217,9 @@ public class ScrlogicLoop implements ActionLoop { // do nothing } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.ActionLoop#cleanup() */ @Override @@ -222,7 +227,9 @@ public class ScrlogicLoop implements ActionLoop { // Do nothing } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.ActionLoop#getPreActions() */ @Override @@ -230,7 +237,9 @@ public class ScrlogicLoop implements ActionLoop { return preActions; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.ActionLoop#getPostActions() */ @Override @@ -238,7 +247,9 @@ public class ScrlogicLoop implements ActionLoop { return postActions; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.ActionLoop#isDataGroup() */ @Override @@ -246,133 +257,141 @@ public class ScrlogicLoop implements ActionLoop { return dataGroup; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see ch.psi.fda.core.ActionLoop#setDataGroup(boolean) */ @Override public void setDataGroup(boolean dataGroup) { this.dataGroup = dataGroup; } - + /** - * The structure of the data message depends on the sensors registered at this loop - * at the time this method is called. + * The structure of the data message depends on the sensors registered at + * this loop at the time this method is called. + * * @return the data queue and the metadata of the data messages */ public DataQueue getDataQueue() { DataMessageMetadata m = new DataMessageMetadata(); - + // Build up data message metadata based on the channels registered. m.getComponents().add(new ComponentMetadata(ID_TIMESTAMP_MILLISECONDS)); - m.getComponents().add(new ComponentMetadata(ID_TIMESTAMP_OFFSET_NANOSECONDS)); - for(Sensor s: sensors){ + m.getComponents().add( + new ComponentMetadata(ID_TIMESTAMP_OFFSET_NANOSECONDS)); + for (Sensor s : sensors) { m.getComponents().add(new ComponentMetadata(s.getId())); } return new DataQueue(dataQueue, m); } - + + private boolean hasNext(){ + for (int i = 0; i < queues.size(); i++) { + if(!queues.get(i).isEmpty()){ + return true; + } + } + return false; + } /** * Merge data collected by different monitor * - * @throws InterruptedException + * @throws InterruptedException */ - private void merge() throws InterruptedException{ - + private void merge() throws InterruptedException { + // Array to hold temporary channel values TimestampedValue[] cvalues = new TimestampedValue[queues.size()]; TimestampedValueComparator comparator = new TimestampedValueComparator(); - + // Oldest value written TimestampedValue globalOldest = null; List indexes = new ArrayList(); - - while(!Thread.currentThread().isInterrupted()){ - - semaphore.acquire(); - Thread.sleep(10); // Ensure that close by monitors have time to catch up / also ensure context switch - + + while (hasNext()) { + +// semaphore.acquire(); + Thread.sleep(10); // Ensure that close by monitors have time to + // catch up / also ensure context switch + // Oldest value of this run TimestampedValue oldest = null; // Queue index of the oldest value of this run indexes.clear(); - + // Find oldest element in any of the queues - for(int i=0;i q = queues.get(i); TimestampedValue ttcheck = q.peek(); - - if(ttcheck != null){ - if(oldest==null){ + + if (ttcheck != null) { + if (oldest == null) { // Update the oldest variable with current element oldest = ttcheck; indexes.clear(); indexes.add(i); - } - else if(comparator.compare(ttcheck, oldest)<0){ // Check whether timestamp is less (older) than the current oldest timestamp. + } else if (comparator.compare(ttcheck, oldest) < 0) { + // Check whether timestamp is less (older) than the current oldest timestamp. oldest = ttcheck; indexes.clear(); indexes.add(i); - } - else if(comparator.compare(ttcheck, oldest) == 0){ + } else if (comparator.compare(ttcheck, oldest) == 0) { // SAME TIMESTAMP indexes.add(i); - } - else{ + } else { } } } -// logger.info("Index: "+index+" Permits: "+semaphore.availablePermits()); -// System.out.println("indexes: "+indexes.size()); - - if(indexes.size()>0){ + // logger.info("Index: "+index+" Permits: "+semaphore.availablePermits()); + // System.out.println("indexes: "+indexes.size()); + + if (indexes.size() > 0) { long timestamp = 0l; - long nanoOffset =0l; - - for(Integer index: indexes){ + long nanoOffset = 0l; + + for (Integer index : indexes) { // Get next older value cvalues[index] = queues.get(index).poll(); - - - if(globalOldest != null){ - if(comparator.compare(cvalues[index], globalOldest)>=0){ + + if (globalOldest != null) { + if (comparator.compare(cvalues[index], globalOldest) >= 0) { // Update the global oldest variable globalOldest = cvalues[index]; timestamp = cvalues[index].getTimestamp(); nanoOffset = cvalues[index].getNanosecondsOffset(); - } - else{ - // Monitors did not fire in sequence (an newer monitor overtook an older (from an other IOC)) + } else { + // Monitors did not fire in sequence (an newer + // monitor overtook an older (from an other IOC)) logger.warning("Timestamped value out of sequence - discard value !!!!"); // Continue with next value ... continue; } - } - else{ + } else { globalOldest = cvalues[index]; } } - + // Assemble data message ... DataMessage message = new DataMessage(); - + message.getData().add(new Double(timestamp)); message.getData().add(new Double(nanoOffset)); - - for(int y=0;y sensors = new ArrayList(); ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", c.getMotor1()+".RVAL"); - ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot1", c.getMotor1()+".RBV"); + ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot2", c.getMotor1()+".RBV"); + ChannelAccessDoubleSensor s3 = new ChannelAccessDoubleSensor("mot2", "ARIDI-PCT:CURRENT"); sensors.add(s1); sensors.add(s2); + sensors.add(s3); ScrlogicLoop scrlogic = new ScrlogicLoop(sensors); diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java index d77754a..84e3fd4 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java @@ -24,6 +24,7 @@ import gov.aps.jca.CAException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import org.junit.After; import org.junit.Before; @@ -33,6 +34,8 @@ import ch.psi.fda.core.Sensor; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; import ch.psi.fda.core.sensors.ChannelAccessDoubleSensor; +import ch.psi.jcae.ChannelBean; +import ch.psi.jcae.ChannelBeanFactory; /** * @author ebner @@ -62,35 +65,80 @@ public class ScrlogicLoopTest { @Test public void testExecute() throws InterruptedException, CAException { + + System.out.println("For this test the motor MTEST-HW3:MOT1 need to be moved manually"); List sensors = new ArrayList(); - ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1.RVAL"); - ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1"); + ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1.RBV"); + ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot2", "MTEST-HW3:MOT1"); sensors.add(s1); sensors.add(s2); - ScrlogicLoop logic = new ScrlogicLoop(sensors); + final ScrlogicLoop logic = new ScrlogicLoop(sensors); + for(int i=0;i<2;i++){ - logic.prepare(); - logic.execute(); + logic.prepare(); + + Thread tt = new Thread(new Runnable(){ + + @Override + public void run() { + try { + logic.execute(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + }); + tt.start(); + + + final CountDownLatch l = new CountDownLatch(1); + Thread t = new Thread(new Runnable(){ + + @Override + public void run() { + try{ + ChannelBean channel = ChannelBeanFactory.getFactory().createChannelBean(Double.class, "MTEST-HW3:MOT1", false); + // Wait some time until + Thread.sleep(100); + channel.setValue(1.5); + Thread.sleep(100); + channel.setValue(2.5); + // Thread.sleep(100); + // channel.setValue(6.5); + // Thread.sleep(100); + + l.countDown(); + } + catch(Exception e){ + } + } + + }); + t.start(); + + l.await(); + + + logic.abort(); + + BlockingQueue queue = logic.getDataQueue().getQueue(); + Message m = queue.take(); + while(! (m instanceof EndOfStreamMessage)){ + System.out.println(m.toString()); + m = queue.take(); + } - // Wait some time until - Thread.sleep(10000); - - logic.abort(); + } logic.destroy(); - BlockingQueue queue = logic.getDataQueue().getQueue(); - Message m = queue.take(); - while(! (m instanceof EndOfStreamMessage)){ - System.out.println(m.toString()); - m = queue.take(); - } } }