Fixed problems with crlogic and readout of softchannels

This commit is contained in:
2012-05-31 09:59:46 +02:00
parent ccdb4c629d
commit 552ff6870e
7 changed files with 331 additions and 217 deletions

View File

@@ -1,3 +1,2 @@
#Wed Oct 19 12:49:33 CEST 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning

View File

@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ch.psi</groupId>
<artifactId>fda</artifactId>
<version>1.1.32</version>
<version>1.1.34-SNAPSHOT</version>
<dependencies>
<dependency>
@@ -143,6 +143,41 @@
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>
org.jvnet.jaxb2.maven2
</groupId>
<artifactId>
maven-jaxb2-plugin
</artifactId>
<versionRange>
[0.8.0,)
</versionRange>
<goals>
<goal>generate</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<distributionManagement>

View File

@@ -130,7 +130,7 @@ public class ParallelCrlogic implements ActionLoop {
// Execute the logic of this path
crlogic.execute();
// Stop scrlogic
// Need to stop the scrlogic logic (otherwise it would keep going to take data)
scrlogic.abort();
return true;
}});
@@ -149,22 +149,22 @@ public class ParallelCrlogic implements ActionLoop {
b.await();
// Execute the logic of this path
scrlogic.execute();
scrlogic.execute(); // This actually just starts the collection ...
return true;
}});
list.add(f);
// Start data merge thread
logger.info("Start data merge");
f = service.submit(new Callable<Boolean>(){
@Override
public Boolean call() throws Exception {
merger.merge();
return true;
}});
list.add(f);
// // Start data merge thread
// logger.info("Start data merge");
// f = service.submit(new Callable<Boolean>(){
// @Override
// public Boolean call() throws Exception {
//
// merger.merge();
// return true;
// }});
// list.add(f);
for(Future<Boolean> bf: list){
@@ -177,6 +177,8 @@ public class ParallelCrlogic implements ActionLoop {
}
}
merger.merge();
// Wait until all threads have finished
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);

View File

@@ -66,11 +66,13 @@ public class ParallelCrlogicStreamMerge {
// Take first element of the primary queue (wait until message is available)
Message m = primaryQueue.getQueue().take();
while(! (m instanceof EndOfStreamMessage)){
if(m instanceof DataMessage){
DataMessage dm = (DataMessage) m;
// Get and remove merge timestamp from the data of the message
Double timestamp = (Double) dm.getData().remove(1);
long milliseconds = (long) (timestamp*1000);
@@ -82,6 +84,11 @@ public class ParallelCrlogicStreamMerge {
// Assumption: the secondary Queue holds at least the data up to the
// timestamp of the primary queue
Message mess = secondaryQueue.getQueue().peek();
// Message mess = secondaryQueue.getQueue().take();
if(mess instanceof EndOfStreamMessage){
break;
}
if(mess == null){
break;
@@ -118,7 +125,6 @@ public class ParallelCrlogicStreamMerge {
}
// Add data to primary data queue message and put it into the out queue
// System.out.println(currData);
dm.getData().addAll(currData);
dataQueue.add(dm);
@@ -130,6 +136,9 @@ public class ParallelCrlogicStreamMerge {
// Add the end of stream message of the primary queue
dataQueue.add(m);
// Clear all remaining messages in secondary queue
secondaryQueue.getQueue().clear();
}

View File

@@ -2,15 +2,15 @@
*
* Copyright 2010 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This code is distributed in the hope that it will be useful,
* but without any warranty; without even the implied warranty of
* merchantability or fitness for a particular purpose. See the
* GNU Lesser General Public License for more details.
* This code is distributed in the hope that it will be useful, but without any
* warranty; without even the implied warranty of merchantability or fitness for
* a particular purpose. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
@@ -26,8 +26,9 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
import ch.psi.fda.core.Action;
@@ -46,167 +47,169 @@ import ch.psi.jcae.MonitorListenerDoubleTimestamp;
/**
* @author ebner
*
* Assumptions:
* - The delay between the monitor writing the value to the monitor queue and the readout of all the queues
* is sufficient to prevent the situation that some monitors of events close to each other on different IOC's have
* not arrived yet.
* - The sequence of monitors fired for one channel is according to the sequence of the causes. No monitor package is
* overtaking an other package on the network.
* Assumptions: - The delay between the monitor writing the value to the
* monitor queue and the readout of all the queues is sufficient to
* prevent the situation that some monitors of events close to each
* other on different IOC's have not arrived yet. - The sequence of
* monitors fired for one channel is according to the sequence of the
* causes. No monitor package is overtaking an other package on the
* network.
*
* - No monitor events are lost on the network (while using monitors you cannot guarantee this)
*
* The data queue returned by this logic includes two items for the timestamp and nanoseconds offset. These two items are the
* first two items of a message
* The id's are:
* crTimestampMilliseconds
* crTimestampOffsetNanoseconds
* - No monitor events are lost on the network (while using monitors you
* cannot guarantee this)
*
* The data queue returned by this logic includes two items for the
* timestamp and nanoseconds offset. These two items are the first two
* items of a message The id's are: crTimestampMilliseconds
* crTimestampOffsetNanoseconds
*/
public class ScrlogicLoop implements ActionLoop {
private static String ID_TIMESTAMP_MILLISECONDS = "crTimestampMilliseconds";
private static String ID_TIMESTAMP_OFFSET_NANOSECONDS = "crTimestampOffsetNanoseconds";
// Get Logger
private static final Logger logger = Logger.getLogger(ScrlogicLoop.class.getName());
private static Semaphore semaphore = new Semaphore(0);
/**
* Data queue sensor data is posted to. A message consists of a list of data objects
* that are read out of the sensors of this loop.
* Data queue sensor data is posted to. A message consists of a list of data
* objects that are read out of the sensors of this loop.
*/
private BlockingQueue<Message> dataQueue;
private final BlockingQueue<Message> dataQueue = new LinkedBlockingQueue<Message>();
private boolean dataGroup = false;
private List<Action> preActions;
private List<Action> postActions;
private List<Sensor> sensors;
private List<Monitor> monitors;
private List<BlockingQueue<TimestampedValue>> queues;
private Thread mergeThread = null;
public ScrlogicLoop(List<Sensor> sensors){
queues = new ArrayList<BlockingQueue<TimestampedValue>>();
preActions = new ArrayList<Action>();
postActions = new ArrayList<Action>();
monitors = new ArrayList<Monitor>();
this.sensors = sensors;
this.dataQueue = new LinkedBlockingQueue<Message>();
}
private final List<Action> preActions = new ArrayList<Action>();
private final List<Action> postActions = new ArrayList<Action>();
/**
* @return the queues
* Sensors to read out
*/
public List<BlockingQueue<TimestampedValue>> getQueues() {
return queues;
private List<Sensor> sensors;
/**
* List of monitors that were attached to the sensor channels (i.e
* workaround)
*/
private final List<Monitor> monitors = new ArrayList<Monitor>();
/**
* List of blocking queues that hold the data for one sensor (channel)
*/
private final List<BlockingQueue<TimestampedValue>> queues = new ArrayList<BlockingQueue<TimestampedValue>>();
private CountDownLatch latch;
public ScrlogicLoop(List<Sensor> sensors) {
this.sensors = sensors;
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.Action#execute()
*/
@Override
public void execute() throws InterruptedException {
try{
// Attach monitors to the channels
for(Sensor sensor: sensors){
if(sensor instanceof ChannelAccessDoubleSensor){
ChannelAccessDoubleSensor s = (ChannelAccessDoubleSensor) sensor;
ChannelBean<Double> b = s.getChannel();
// Create data queue for the channel
final BlockingQueue<TimestampedValue> q = new LinkedBlockingQueue<TimestampedValue>();
queues.add(q);
Monitor m = b.attachMonitor(new MonitorListenerDoubleTimestamp() {
@Override
public void valueChanged(Double value, Date timestamp, long nanosecondsOffset) {
// Add values to channel queue
q.add(new TimestampedValue(value, timestamp.getTime(), nanosecondsOffset));
// Increase semaphore count (used for merging thread of the queues)
semaphore.release();
}
});
monitors.add(m);
}
}
}
catch(CAException e){
new RuntimeException("Unable to create monitor for channels",e);
}
logger.info("Start data acquisition");
// Start merge thread
mergeThread = new Thread(new Runnable() {
@Override
public void run() {
try {
merge();
} catch (InterruptedException e) {
// Normal termination
}
}
});
mergeThread.start();
}
/* (non-Javadoc)
* @see ch.psi.fda.core.Action#abort()
*/
@Override
public void abort() {
// Actually this is stopping the logic
// Remove monitors
try{
for(int i=0;i<sensors.size();i++){
Sensor sensor = sensors.get(i);
if(sensor instanceof ChannelAccessDoubleSensor){
// Clear all queues
queues.clear();
latch = new CountDownLatch(1);
try {
// Attach monitors to the channels (this is actually a workaround)
for (Sensor sensor : sensors) {
if (sensor instanceof ChannelAccessDoubleSensor) {
ChannelAccessDoubleSensor s = (ChannelAccessDoubleSensor) sensor;
ChannelBean<Double> b = s.getChannel();
b.removeMonitor(monitors.get(i));
// Create data queue for the channel
final BlockingQueue<TimestampedValue> q = new LinkedBlockingQueue<TimestampedValue>();
queues.add(q);
Monitor m = b
.attachMonitor(new MonitorListenerDoubleTimestamp() {
@Override
public void valueChanged(Double value, Date timestamp, long nanosecondsOffset) {
// Add values to channel queue
q.add(new TimestampedValue(value, timestamp.getTime(), nanosecondsOffset));
}
});
monitors.add(m);
}
}
} catch (CAException e) {
new RuntimeException("Unable to create monitor for channels", e);
}
catch(CAException e){
logger.info("Start data acquisition");
latch.await();
// Remove monitors
try {
for (int i = 0; i < sensors.size(); i++) {
Sensor sensor = sensors.get(i);
if (sensor instanceof ChannelAccessDoubleSensor) {
ChannelAccessDoubleSensor s = (ChannelAccessDoubleSensor) sensor;
ChannelBean<Double> b = s.getChannel();
try{
b.removeMonitor(monitors.get(i));
}
catch(IllegalArgumentException e){
logger.log(Level.SEVERE, "Unable to detach monitor", e);
}
}
}
} catch (CAException e) {
new RuntimeException(e);
}
// Stop merge thread
mergeThread.interrupt();
finally{
// Clear all monitors in the list
monitors.clear();
}
// Merge data
merge();
// Clear data queues
for(BlockingQueue<TimestampedValue> q: queues){
for (BlockingQueue<TimestampedValue> q : queues) {
q.clear();
}
queues.clear();
// Put end of stream to the queue
dataQueue.add(new EndOfStreamMessage(dataGroup));
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.Action#abort()
*/
@Override
public void abort() {
latch.countDown();
}
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.Action#destroy()
*/
@Override
public void destroy() {
// Destroy all sensors
for(Sensor s: sensors){
for (Sensor s : sensors) {
s.destroy();
}
sensors.clear();
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.ActionLoop#prepare()
*/
@Override
@@ -214,7 +217,9 @@ public class ScrlogicLoop implements ActionLoop {
// do nothing
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.ActionLoop#cleanup()
*/
@Override
@@ -222,7 +227,9 @@ public class ScrlogicLoop implements ActionLoop {
// Do nothing
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.ActionLoop#getPreActions()
*/
@Override
@@ -230,7 +237,9 @@ public class ScrlogicLoop implements ActionLoop {
return preActions;
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.ActionLoop#getPostActions()
*/
@Override
@@ -238,7 +247,9 @@ public class ScrlogicLoop implements ActionLoop {
return postActions;
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.ActionLoop#isDataGroup()
*/
@Override
@@ -246,133 +257,141 @@ public class ScrlogicLoop implements ActionLoop {
return dataGroup;
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see ch.psi.fda.core.ActionLoop#setDataGroup(boolean)
*/
@Override
public void setDataGroup(boolean dataGroup) {
this.dataGroup = dataGroup;
}
/**
* The structure of the data message depends on the sensors registered at this loop
* at the time this method is called.
* The structure of the data message depends on the sensors registered at
* this loop at the time this method is called.
*
* @return the data queue and the metadata of the data messages
*/
public DataQueue getDataQueue() {
DataMessageMetadata m = new DataMessageMetadata();
// Build up data message metadata based on the channels registered.
m.getComponents().add(new ComponentMetadata(ID_TIMESTAMP_MILLISECONDS));
m.getComponents().add(new ComponentMetadata(ID_TIMESTAMP_OFFSET_NANOSECONDS));
for(Sensor s: sensors){
m.getComponents().add(
new ComponentMetadata(ID_TIMESTAMP_OFFSET_NANOSECONDS));
for (Sensor s : sensors) {
m.getComponents().add(new ComponentMetadata(s.getId()));
}
return new DataQueue(dataQueue, m);
}
private boolean hasNext(){
for (int i = 0; i < queues.size(); i++) {
if(!queues.get(i).isEmpty()){
return true;
}
}
return false;
}
/**
* Merge data collected by different monitor
*
* @throws InterruptedException
* @throws InterruptedException
*/
private void merge() throws InterruptedException{
private void merge() throws InterruptedException {
// Array to hold temporary channel values
TimestampedValue[] cvalues = new TimestampedValue[queues.size()];
TimestampedValueComparator comparator = new TimestampedValueComparator();
// Oldest value written
TimestampedValue globalOldest = null;
List<Integer> indexes = new ArrayList<Integer>();
while(!Thread.currentThread().isInterrupted()){
semaphore.acquire();
Thread.sleep(10); // Ensure that close by monitors have time to catch up / also ensure context switch
while (hasNext()) {
// semaphore.acquire();
Thread.sleep(10); // Ensure that close by monitors have time to
// catch up / also ensure context switch
// Oldest value of this run
TimestampedValue oldest = null;
// Queue index of the oldest value of this run
indexes.clear();
// Find oldest element in any of the queues
for(int i=0;i<queues.size();i++){
for (int i = 0; i < queues.size(); i++) {
BlockingQueue<TimestampedValue> q = queues.get(i);
TimestampedValue ttcheck = q.peek();
if(ttcheck != null){
if(oldest==null){
if (ttcheck != null) {
if (oldest == null) {
// Update the oldest variable with current element
oldest = ttcheck;
indexes.clear();
indexes.add(i);
}
else if(comparator.compare(ttcheck, oldest)<0){ // Check whether timestamp is less (older) than the current oldest timestamp.
} else if (comparator.compare(ttcheck, oldest) < 0) {
// Check whether timestamp is less (older) than the current oldest timestamp.
oldest = ttcheck;
indexes.clear();
indexes.add(i);
}
else if(comparator.compare(ttcheck, oldest) == 0){
} else if (comparator.compare(ttcheck, oldest) == 0) {
// SAME TIMESTAMP
indexes.add(i);
}
else{
} else {
}
}
}
// logger.info("Index: "+index+" Permits: "+semaphore.availablePermits());
// System.out.println("indexes: "+indexes.size());
if(indexes.size()>0){
// logger.info("Index: "+index+" Permits: "+semaphore.availablePermits());
// System.out.println("indexes: "+indexes.size());
if (indexes.size() > 0) {
long timestamp = 0l;
long nanoOffset =0l;
for(Integer index: indexes){
long nanoOffset = 0l;
for (Integer index : indexes) {
// Get next older value
cvalues[index] = queues.get(index).poll();
if(globalOldest != null){
if(comparator.compare(cvalues[index], globalOldest)>=0){
if (globalOldest != null) {
if (comparator.compare(cvalues[index], globalOldest) >= 0) {
// Update the global oldest variable
globalOldest = cvalues[index];
timestamp = cvalues[index].getTimestamp();
nanoOffset = cvalues[index].getNanosecondsOffset();
}
else{
// Monitors did not fire in sequence (an newer monitor overtook an older (from an other IOC))
} else {
// Monitors did not fire in sequence (an newer
// monitor overtook an older (from an other IOC))
logger.warning("Timestamped value out of sequence - discard value !!!!");
// Continue with next value ...
continue;
}
}
else{
} else {
globalOldest = cvalues[index];
}
}
// Assemble data message ...
DataMessage message = new DataMessage();
message.getData().add(new Double(timestamp));
message.getData().add(new Double(nanoOffset));
for(int y=0;y<cvalues.length;y++){
if(cvalues[y]!=null){
for (int y = 0; y < cvalues.length; y++) {
if (cvalues[y] != null) {
message.getData().add(new Double(cvalues[y].getValue()));
}
else{
} else {
message.getData().add(Double.NaN);
}
}
// System.out.println(message);
dataQueue.add(message);
}
}
}
}

View File

@@ -66,7 +66,7 @@ public class ParallelCrlogicTest {
public void tearDown() throws Exception {
}
@Ignore
// @Ignore
@Test(timeout=60000)
public void testExecute() throws InterruptedException, CAException{
@@ -83,10 +83,12 @@ public class ParallelCrlogicTest {
List<Sensor> sensors = new ArrayList<Sensor>();
ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", c.getMotor1()+".RVAL");
ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot1", c.getMotor1()+".RBV");
ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot2", c.getMotor1()+".RBV");
ChannelAccessDoubleSensor s3 = new ChannelAccessDoubleSensor("mot2", "ARIDI-PCT:CURRENT");
sensors.add(s1);
sensors.add(s2);
sensors.add(s3);
ScrlogicLoop scrlogic = new ScrlogicLoop(sensors);

View File

@@ -24,6 +24,7 @@ import gov.aps.jca.CAException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
@@ -33,6 +34,8 @@ import ch.psi.fda.core.Sensor;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.sensors.ChannelAccessDoubleSensor;
import ch.psi.jcae.ChannelBean;
import ch.psi.jcae.ChannelBeanFactory;
/**
* @author ebner
@@ -62,35 +65,80 @@ public class ScrlogicLoopTest {
@Test
public void testExecute() throws InterruptedException, CAException {
System.out.println("For this test the motor MTEST-HW3:MOT1 need to be moved manually");
List<Sensor> sensors = new ArrayList<Sensor>();
ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1.RVAL");
ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1");
ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1.RBV");
ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot2", "MTEST-HW3:MOT1");
sensors.add(s1);
sensors.add(s2);
ScrlogicLoop logic = new ScrlogicLoop(sensors);
final ScrlogicLoop logic = new ScrlogicLoop(sensors);
for(int i=0;i<2;i++){
logic.prepare();
logic.execute();
logic.prepare();
Thread tt = new Thread(new Runnable(){
@Override
public void run() {
try {
logic.execute();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
tt.start();
final CountDownLatch l = new CountDownLatch(1);
Thread t = new Thread(new Runnable(){
@Override
public void run() {
try{
ChannelBean<Double> channel = ChannelBeanFactory.getFactory().createChannelBean(Double.class, "MTEST-HW3:MOT1", false);
// Wait some time until
Thread.sleep(100);
channel.setValue(1.5);
Thread.sleep(100);
channel.setValue(2.5);
// Thread.sleep(100);
// channel.setValue(6.5);
// Thread.sleep(100);
l.countDown();
}
catch(Exception e){
}
}
});
t.start();
l.await();
logic.abort();
BlockingQueue<Message> queue = logic.getDataQueue().getQueue();
Message m = queue.take();
while(! (m instanceof EndOfStreamMessage)){
System.out.println(m.toString());
m = queue.take();
}
// Wait some time until
Thread.sleep(10000);
logic.abort();
}
logic.destroy();
BlockingQueue<Message> queue = logic.getDataQueue().getQueue();
Message m = queue.take();
while(! (m instanceof EndOfStreamMessage)){
System.out.println(m.toString());
m = queue.take();
}
}
}