diff --git a/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs b/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs
index 74b8c7b..4ede96d 100644
--- a/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs
+++ b/ch.psi.fda/.settings/org.eclipse.jdt.core.prefs
@@ -1,3 +1,2 @@
-#Wed Oct 19 12:49:33 CEST 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
diff --git a/ch.psi.fda/pom.xml b/ch.psi.fda/pom.xml
index 0647d11..029fa6b 100644
--- a/ch.psi.fda/pom.xml
+++ b/ch.psi.fda/pom.xml
@@ -3,7 +3,7 @@
4.0.0
ch.psi
fda
- 1.1.32
+ 1.1.34-SNAPSHOT
@@ -143,6 +143,41 @@
+
+
+
+
+ org.eclipse.m2e
+ lifecycle-mapping
+ 1.0.0
+
+
+
+
+
+
+ org.jvnet.jaxb2.maven2
+
+
+ maven-jaxb2-plugin
+
+
+ [0.8.0,)
+
+
+ generate
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java
index a116096..c867509 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogic.java
@@ -130,7 +130,7 @@ public class ParallelCrlogic implements ActionLoop {
// Execute the logic of this path
crlogic.execute();
- // Stop scrlogic
+ // Need to stop the scrlogic logic (otherwise it would keep going to take data)
scrlogic.abort();
return true;
}});
@@ -149,22 +149,22 @@ public class ParallelCrlogic implements ActionLoop {
b.await();
// Execute the logic of this path
- scrlogic.execute();
+ scrlogic.execute(); // This actually just starts the collection ...
return true;
}});
list.add(f);
- // Start data merge thread
- logger.info("Start data merge");
- f = service.submit(new Callable(){
- @Override
- public Boolean call() throws Exception {
-
- merger.merge();
- return true;
- }});
- list.add(f);
+// // Start data merge thread
+// logger.info("Start data merge");
+// f = service.submit(new Callable(){
+// @Override
+// public Boolean call() throws Exception {
+//
+// merger.merge();
+// return true;
+// }});
+// list.add(f);
for(Future bf: list){
@@ -177,6 +177,8 @@ public class ParallelCrlogic implements ActionLoop {
}
}
+ merger.merge();
+
// Wait until all threads have finished
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java
index a9c49c3..e218a72 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ParallelCrlogicStreamMerge.java
@@ -66,11 +66,13 @@ public class ParallelCrlogicStreamMerge {
// Take first element of the primary queue (wait until message is available)
Message m = primaryQueue.getQueue().take();
+
while(! (m instanceof EndOfStreamMessage)){
if(m instanceof DataMessage){
DataMessage dm = (DataMessage) m;
+
// Get and remove merge timestamp from the data of the message
Double timestamp = (Double) dm.getData().remove(1);
long milliseconds = (long) (timestamp*1000);
@@ -82,6 +84,11 @@ public class ParallelCrlogicStreamMerge {
// Assumption: the secondary Queue holds at least the data up to the
// timestamp of the primary queue
Message mess = secondaryQueue.getQueue().peek();
+// Message mess = secondaryQueue.getQueue().take();
+
+ if(mess instanceof EndOfStreamMessage){
+ break;
+ }
if(mess == null){
break;
@@ -118,7 +125,6 @@ public class ParallelCrlogicStreamMerge {
}
// Add data to primary data queue message and put it into the out queue
-// System.out.println(currData);
dm.getData().addAll(currData);
dataQueue.add(dm);
@@ -130,6 +136,9 @@ public class ParallelCrlogicStreamMerge {
// Add the end of stream message of the primary queue
dataQueue.add(m);
+
+ // Clear all remaining messages in secondary queue
+ secondaryQueue.getQueue().clear();
}
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java
index f0d39d8..2e0ca3f 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/loops/cr/ScrlogicLoop.java
@@ -2,15 +2,15 @@
*
* Copyright 2010 Paul Scherrer Institute. All rights reserved.
*
- * This code is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
+ * This code is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation, either version 3 of the License, or (at your option) any
+ * later version.
*
- * This code is distributed in the hope that it will be useful,
- * but without any warranty; without even the implied warranty of
- * merchantability or fitness for a particular purpose. See the
- * GNU Lesser General Public License for more details.
+ * This code is distributed in the hope that it will be useful, but without any
+ * warranty; without even the implied warranty of merchantability or fitness for
+ * a particular purpose. See the GNU Lesser General Public License for more
+ * details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see .
@@ -26,8 +26,9 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
import java.util.logging.Logger;
import ch.psi.fda.core.Action;
@@ -46,167 +47,169 @@ import ch.psi.jcae.MonitorListenerDoubleTimestamp;
/**
* @author ebner
*
- * Assumptions:
- * - The delay between the monitor writing the value to the monitor queue and the readout of all the queues
- * is sufficient to prevent the situation that some monitors of events close to each other on different IOC's have
- * not arrived yet.
- * - The sequence of monitors fired for one channel is according to the sequence of the causes. No monitor package is
- * overtaking an other package on the network.
+ * Assumptions: - The delay between the monitor writing the value to the
+ * monitor queue and the readout of all the queues is sufficient to
+ * prevent the situation that some monitors of events close to each
+ * other on different IOC's have not arrived yet. - The sequence of
+ * monitors fired for one channel is according to the sequence of the
+ * causes. No monitor package is overtaking an other package on the
+ * network.
*
- * - No monitor events are lost on the network (while using monitors you cannot guarantee this)
- *
- * The data queue returned by this logic includes two items for the timestamp and nanoseconds offset. These two items are the
- * first two items of a message
- * The id's are:
- * crTimestampMilliseconds
- * crTimestampOffsetNanoseconds
+ * - No monitor events are lost on the network (while using monitors you
+ * cannot guarantee this)
+ *
+ * The data queue returned by this logic includes two items for the
+ * timestamp and nanoseconds offset. These two items are the first two
+ * items of a message The id's are: crTimestampMilliseconds
+ * crTimestampOffsetNanoseconds
*/
public class ScrlogicLoop implements ActionLoop {
-
+
private static String ID_TIMESTAMP_MILLISECONDS = "crTimestampMilliseconds";
private static String ID_TIMESTAMP_OFFSET_NANOSECONDS = "crTimestampOffsetNanoseconds";
-
+
// Get Logger
private static final Logger logger = Logger.getLogger(ScrlogicLoop.class.getName());
-
- private static Semaphore semaphore = new Semaphore(0);
-
+
+
/**
- * Data queue sensor data is posted to. A message consists of a list of data objects
- * that are read out of the sensors of this loop.
+ * Data queue sensor data is posted to. A message consists of a list of data
+ * objects that are read out of the sensors of this loop.
*/
- private BlockingQueue dataQueue;
-
+ private final BlockingQueue dataQueue = new LinkedBlockingQueue();
+
private boolean dataGroup = false;
- private List preActions;
- private List postActions;
-
- private List sensors;
- private List monitors;
-
- private List> queues;
-
- private Thread mergeThread = null;
-
-
- public ScrlogicLoop(List sensors){
- queues = new ArrayList>();
- preActions = new ArrayList();
- postActions = new ArrayList();
-
- monitors = new ArrayList();
- this.sensors = sensors;
-
- this.dataQueue = new LinkedBlockingQueue();
- }
-
+ private final List preActions = new ArrayList();
+ private final List postActions = new ArrayList();
+
/**
- * @return the queues
+ * Sensors to read out
*/
- public List> getQueues() {
- return queues;
+ private List sensors;
+
+ /**
+ * List of monitors that were attached to the sensor channels (i.e
+ * workaround)
+ */
+ private final List monitors = new ArrayList();
+
+ /**
+ * List of blocking queues that hold the data for one sensor (channel)
+ */
+ private final List> queues = new ArrayList>();
+
+ private CountDownLatch latch;
+
+ public ScrlogicLoop(List sensors) {
+ this.sensors = sensors;
}
-
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.Action#execute()
*/
@Override
public void execute() throws InterruptedException {
- try{
- // Attach monitors to the channels
- for(Sensor sensor: sensors){
- if(sensor instanceof ChannelAccessDoubleSensor){
- ChannelAccessDoubleSensor s = (ChannelAccessDoubleSensor) sensor;
- ChannelBean b = s.getChannel();
- // Create data queue for the channel
- final BlockingQueue q = new LinkedBlockingQueue();
- queues.add(q);
-
- Monitor m = b.attachMonitor(new MonitorListenerDoubleTimestamp() {
-
- @Override
- public void valueChanged(Double value, Date timestamp, long nanosecondsOffset) {
- // Add values to channel queue
- q.add(new TimestampedValue(value, timestamp.getTime(), nanosecondsOffset));
- // Increase semaphore count (used for merging thread of the queues)
- semaphore.release();
- }
- });
- monitors.add(m);
- }
- }
- }
- catch(CAException e){
- new RuntimeException("Unable to create monitor for channels",e);
- }
-
- logger.info("Start data acquisition");
-
- // Start merge thread
- mergeThread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- merge();
- } catch (InterruptedException e) {
- // Normal termination
- }
- }
- });
- mergeThread.start();
-
- }
- /* (non-Javadoc)
- * @see ch.psi.fda.core.Action#abort()
- */
- @Override
- public void abort() {
- // Actually this is stopping the logic
-
- // Remove monitors
- try{
- for(int i=0;i b = s.getChannel();
- b.removeMonitor(monitors.get(i));
+ // Create data queue for the channel
+ final BlockingQueue q = new LinkedBlockingQueue();
+ queues.add(q);
+
+ Monitor m = b
+ .attachMonitor(new MonitorListenerDoubleTimestamp() {
+
+ @Override
+ public void valueChanged(Double value, Date timestamp, long nanosecondsOffset) {
+ // Add values to channel queue
+ q.add(new TimestampedValue(value, timestamp.getTime(), nanosecondsOffset));
+ }
+ });
+ monitors.add(m);
}
}
+ } catch (CAException e) {
+ new RuntimeException("Unable to create monitor for channels", e);
}
- catch(CAException e){
+
+ logger.info("Start data acquisition");
+
+ latch.await();
+
+ // Remove monitors
+ try {
+ for (int i = 0; i < sensors.size(); i++) {
+ Sensor sensor = sensors.get(i);
+ if (sensor instanceof ChannelAccessDoubleSensor) {
+ ChannelAccessDoubleSensor s = (ChannelAccessDoubleSensor) sensor;
+ ChannelBean b = s.getChannel();
+ try{
+ b.removeMonitor(monitors.get(i));
+ }
+ catch(IllegalArgumentException e){
+ logger.log(Level.SEVERE, "Unable to detach monitor", e);
+ }
+ }
+ }
+ } catch (CAException e) {
new RuntimeException(e);
}
-
- // Stop merge thread
- mergeThread.interrupt();
-
+ finally{
+ // Clear all monitors in the list
+ monitors.clear();
+ }
+
+ // Merge data
+ merge();
+
// Clear data queues
- for(BlockingQueue q: queues){
+ for (BlockingQueue q : queues) {
q.clear();
}
-
+ queues.clear();
+
// Put end of stream to the queue
dataQueue.add(new EndOfStreamMessage(dataGroup));
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
+ * @see ch.psi.fda.core.Action#abort()
+ */
+ @Override
+ public void abort() {
+ latch.countDown();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.Action#destroy()
*/
@Override
public void destroy() {
// Destroy all sensors
- for(Sensor s: sensors){
+ for (Sensor s : sensors) {
s.destroy();
}
sensors.clear();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.ActionLoop#prepare()
*/
@Override
@@ -214,7 +217,9 @@ public class ScrlogicLoop implements ActionLoop {
// do nothing
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.ActionLoop#cleanup()
*/
@Override
@@ -222,7 +227,9 @@ public class ScrlogicLoop implements ActionLoop {
// Do nothing
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.ActionLoop#getPreActions()
*/
@Override
@@ -230,7 +237,9 @@ public class ScrlogicLoop implements ActionLoop {
return preActions;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.ActionLoop#getPostActions()
*/
@Override
@@ -238,7 +247,9 @@ public class ScrlogicLoop implements ActionLoop {
return postActions;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.ActionLoop#isDataGroup()
*/
@Override
@@ -246,133 +257,141 @@ public class ScrlogicLoop implements ActionLoop {
return dataGroup;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see ch.psi.fda.core.ActionLoop#setDataGroup(boolean)
*/
@Override
public void setDataGroup(boolean dataGroup) {
this.dataGroup = dataGroup;
}
-
+
/**
- * The structure of the data message depends on the sensors registered at this loop
- * at the time this method is called.
+ * The structure of the data message depends on the sensors registered at
+ * this loop at the time this method is called.
+ *
* @return the data queue and the metadata of the data messages
*/
public DataQueue getDataQueue() {
DataMessageMetadata m = new DataMessageMetadata();
-
+
// Build up data message metadata based on the channels registered.
m.getComponents().add(new ComponentMetadata(ID_TIMESTAMP_MILLISECONDS));
- m.getComponents().add(new ComponentMetadata(ID_TIMESTAMP_OFFSET_NANOSECONDS));
- for(Sensor s: sensors){
+ m.getComponents().add(
+ new ComponentMetadata(ID_TIMESTAMP_OFFSET_NANOSECONDS));
+ for (Sensor s : sensors) {
m.getComponents().add(new ComponentMetadata(s.getId()));
}
return new DataQueue(dataQueue, m);
}
-
+
+ private boolean hasNext(){
+ for (int i = 0; i < queues.size(); i++) {
+ if(!queues.get(i).isEmpty()){
+ return true;
+ }
+ }
+ return false;
+ }
/**
* Merge data collected by different monitor
*
- * @throws InterruptedException
+ * @throws InterruptedException
*/
- private void merge() throws InterruptedException{
-
+ private void merge() throws InterruptedException {
+
// Array to hold temporary channel values
TimestampedValue[] cvalues = new TimestampedValue[queues.size()];
TimestampedValueComparator comparator = new TimestampedValueComparator();
-
+
// Oldest value written
TimestampedValue globalOldest = null;
List indexes = new ArrayList();
-
- while(!Thread.currentThread().isInterrupted()){
-
- semaphore.acquire();
- Thread.sleep(10); // Ensure that close by monitors have time to catch up / also ensure context switch
-
+
+ while (hasNext()) {
+
+// semaphore.acquire();
+ Thread.sleep(10); // Ensure that close by monitors have time to
+ // catch up / also ensure context switch
+
// Oldest value of this run
TimestampedValue oldest = null;
// Queue index of the oldest value of this run
indexes.clear();
-
+
// Find oldest element in any of the queues
- for(int i=0;i q = queues.get(i);
TimestampedValue ttcheck = q.peek();
-
- if(ttcheck != null){
- if(oldest==null){
+
+ if (ttcheck != null) {
+ if (oldest == null) {
// Update the oldest variable with current element
oldest = ttcheck;
indexes.clear();
indexes.add(i);
- }
- else if(comparator.compare(ttcheck, oldest)<0){ // Check whether timestamp is less (older) than the current oldest timestamp.
+ } else if (comparator.compare(ttcheck, oldest) < 0) {
+ // Check whether timestamp is less (older) than the current oldest timestamp.
oldest = ttcheck;
indexes.clear();
indexes.add(i);
- }
- else if(comparator.compare(ttcheck, oldest) == 0){
+ } else if (comparator.compare(ttcheck, oldest) == 0) {
// SAME TIMESTAMP
indexes.add(i);
- }
- else{
+ } else {
}
}
}
-// logger.info("Index: "+index+" Permits: "+semaphore.availablePermits());
-// System.out.println("indexes: "+indexes.size());
-
- if(indexes.size()>0){
+ // logger.info("Index: "+index+" Permits: "+semaphore.availablePermits());
+ // System.out.println("indexes: "+indexes.size());
+
+ if (indexes.size() > 0) {
long timestamp = 0l;
- long nanoOffset =0l;
-
- for(Integer index: indexes){
+ long nanoOffset = 0l;
+
+ for (Integer index : indexes) {
// Get next older value
cvalues[index] = queues.get(index).poll();
-
-
- if(globalOldest != null){
- if(comparator.compare(cvalues[index], globalOldest)>=0){
+
+ if (globalOldest != null) {
+ if (comparator.compare(cvalues[index], globalOldest) >= 0) {
// Update the global oldest variable
globalOldest = cvalues[index];
timestamp = cvalues[index].getTimestamp();
nanoOffset = cvalues[index].getNanosecondsOffset();
- }
- else{
- // Monitors did not fire in sequence (an newer monitor overtook an older (from an other IOC))
+ } else {
+ // Monitors did not fire in sequence (an newer
+ // monitor overtook an older (from an other IOC))
logger.warning("Timestamped value out of sequence - discard value !!!!");
// Continue with next value ...
continue;
}
- }
- else{
+ } else {
globalOldest = cvalues[index];
}
}
-
+ // Assemble data message ...
DataMessage message = new DataMessage();
-
+
message.getData().add(new Double(timestamp));
message.getData().add(new Double(nanoOffset));
-
- for(int y=0;y sensors = new ArrayList();
ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", c.getMotor1()+".RVAL");
- ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot1", c.getMotor1()+".RBV");
+ ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot2", c.getMotor1()+".RBV");
+ ChannelAccessDoubleSensor s3 = new ChannelAccessDoubleSensor("mot2", "ARIDI-PCT:CURRENT");
sensors.add(s1);
sensors.add(s2);
+ sensors.add(s3);
ScrlogicLoop scrlogic = new ScrlogicLoop(sensors);
diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java
index d77754a..84e3fd4 100644
--- a/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java
+++ b/ch.psi.fda/src/test/java/ch/psi/fda/core/loops/cr/ScrlogicLoopTest.java
@@ -24,6 +24,7 @@ import gov.aps.jca.CAException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
@@ -33,6 +34,8 @@ import ch.psi.fda.core.Sensor;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.sensors.ChannelAccessDoubleSensor;
+import ch.psi.jcae.ChannelBean;
+import ch.psi.jcae.ChannelBeanFactory;
/**
* @author ebner
@@ -62,35 +65,80 @@ public class ScrlogicLoopTest {
@Test
public void testExecute() throws InterruptedException, CAException {
+
+
System.out.println("For this test the motor MTEST-HW3:MOT1 need to be moved manually");
List sensors = new ArrayList();
- ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1.RVAL");
- ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1");
+ ChannelAccessDoubleSensor s2 = new ChannelAccessDoubleSensor("mot1", "MTEST-HW3:MOT1.RBV");
+ ChannelAccessDoubleSensor s1 = new ChannelAccessDoubleSensor("mot2", "MTEST-HW3:MOT1");
sensors.add(s1);
sensors.add(s2);
- ScrlogicLoop logic = new ScrlogicLoop(sensors);
+ final ScrlogicLoop logic = new ScrlogicLoop(sensors);
+ for(int i=0;i<2;i++){
- logic.prepare();
- logic.execute();
+ logic.prepare();
+
+ Thread tt = new Thread(new Runnable(){
+
+ @Override
+ public void run() {
+ try {
+ logic.execute();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ });
+ tt.start();
+
+
+ final CountDownLatch l = new CountDownLatch(1);
+ Thread t = new Thread(new Runnable(){
+
+ @Override
+ public void run() {
+ try{
+ ChannelBean channel = ChannelBeanFactory.getFactory().createChannelBean(Double.class, "MTEST-HW3:MOT1", false);
+ // Wait some time until
+ Thread.sleep(100);
+ channel.setValue(1.5);
+ Thread.sleep(100);
+ channel.setValue(2.5);
+ // Thread.sleep(100);
+ // channel.setValue(6.5);
+ // Thread.sleep(100);
+
+ l.countDown();
+ }
+ catch(Exception e){
+ }
+ }
+
+ });
+ t.start();
+
+ l.await();
+
+
+ logic.abort();
+
+ BlockingQueue queue = logic.getDataQueue().getQueue();
+ Message m = queue.take();
+ while(! (m instanceof EndOfStreamMessage)){
+ System.out.println(m.toString());
+ m = queue.take();
+ }
- // Wait some time until
- Thread.sleep(10000);
-
- logic.abort();
+ }
logic.destroy();
- BlockingQueue queue = logic.getDataQueue().getQueue();
- Message m = queue.take();
- while(! (m instanceof EndOfStreamMessage)){
- System.out.println(m.toString());
- m = queue.take();
- }
}
}