diff --git a/ch.psi.fda/pom.xml b/ch.psi.fda/pom.xml
index f392a4f..0fd7829 100644
--- a/ch.psi.fda/pom.xml
+++ b/ch.psi.fda/pom.xml
@@ -6,6 +6,18 @@
1.1.40
+
+ org.jeromq
+ jeromq
+ 0.2.0
+
+
+
+ commons-lang
+ commons-lang
+ 2.6
+
+
ch.psi
jcae
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java
index b5d6edb..7b36884 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java
@@ -264,7 +264,7 @@ public class Acquisition {
if (getQueue) {
vdq = new DataQueue(new LinkedBlockingQueue(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to prevent
// running out of memory ...
- dispatcher.getOutQueues().add(vdq);
+// dispatcher.getOutQueues().add(vdq);
}
// Add queue for serializer to dispatcher
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java
index d69c745..395f650 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java
@@ -22,6 +22,9 @@ package ch.psi.fda.core.collector;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.jeromq.ZMQ;
+
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
@@ -48,6 +51,12 @@ public class DataDispatcher implements Runnable{
public void run() {
try{
+
+ ZMQ.Context context = ZMQ.context();
+ ZMQ.Socket socket = context.socket(ZMQ.PUB);
+ socket.bind("tcp://*:9090");
+// socket.bind("inproc://visualize");
+
// TODO Need to synchronize message metadata
// for(DataQueue q: outQueues){
// }
@@ -55,11 +64,11 @@ public class DataDispatcher implements Runnable{
// Dispatch Messages
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
-
// Clone message ...
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
+ socket.send(SerializationUtils.serialize(message));
// Read next message
message = queue.getQueue().take();
@@ -69,6 +78,10 @@ public class DataDispatcher implements Runnable{
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
+ socket.send(SerializationUtils.serialize(message));
+
+ socket.close();
+ context.term();
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java
index 11004d7..ae72fa3 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java
@@ -19,6 +19,8 @@
package ch.psi.fda.core.messages;
+import java.io.Serializable;
+
/**
* Metadata of a component of a message. Each component has a global id.
* Optionally the component can also belong to a dimension. However, depending on the
@@ -28,7 +30,10 @@ package ch.psi.fda.core.messages;
* @author ebner
*
*/
-public class ComponentMetadata {
+public class ComponentMetadata implements Serializable{
+
+ private static final long serialVersionUID = 1L;
+
/**
* Global id of the component
*/
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java
index 793283c..bd95119 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java
@@ -19,11 +19,13 @@
package ch.psi.fda.core.messages;
+
/**
* A control message that is not holding any data but
* control information (like end of loop, etc.)
* @author ebner
*
*/
-public abstract class ControlMessage extends Message {
+public abstract class ControlMessage extends Message{
+ private static final long serialVersionUID = 1L;
}
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java
index f7b4fcb..f117d08 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java
@@ -27,7 +27,9 @@ import java.util.List;
* @author ebner
*
*/
-public class DataMessage extends Message {
+public class DataMessage extends Message{
+
+ private static final long serialVersionUID = 1L;
/**
* Data payload of the message
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java
index fd0f67d..1bffab7 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java
@@ -19,6 +19,7 @@
package ch.psi.fda.core.messages;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -27,8 +28,9 @@ import java.util.List;
* @author ebner
*
*/
-public class DataMessageMetadata {
-
+public class DataMessageMetadata implements Serializable {
+ private static final long serialVersionUID = 1L;
+
/**
* List of the metadata of the message components
*/
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java
index d9c9e9b..3dd7c83 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java
@@ -25,8 +25,9 @@ package ch.psi.fda.core.messages;
* @author ebner
*
*/
-public class EndOfStreamMessage extends ControlMessage{
+public class EndOfStreamMessage extends ControlMessage {
+ private static final long serialVersionUID = 1L;
/**
* Intersect flag - flag to indicate that stream should be intersected
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java
index 2a36287..8e30cb8 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java
@@ -19,10 +19,13 @@
package ch.psi.fda.core.messages;
+import java.io.Serializable;
+
/**
* Message that can be put to the data queue
* @author ebner
*
*/
-public abstract class Message {
+public abstract class Message implements Serializable{
+ private static final long serialVersionUID = 1L;
}
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java
index 2777e89..0e6ffd6 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java
@@ -25,7 +25,7 @@ package ch.psi.fda.core.messages;
*
*/
public class StreamDelimiterMessage extends ControlMessage{
-
+ private static final long serialVersionUID = 1L;
/**
* Number of the dimension this delimiter belongs to.
*/
diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java b/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java
index 309e06e..1836b9a 100644
--- a/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java
+++ b/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java
@@ -29,6 +29,10 @@ import javax.swing.SwingUtilities;
//import org.jfree.data.xy.XYSeries;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.jeromq.ZMQ;
+
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
@@ -78,21 +82,29 @@ public class Visualizer {
private boolean updateAtEndOfStream = false;
public Visualizer(DataQueue queue, List vl){
- addFilterSet(queue, vl);
- }
-
- private void addFilterSet(DataQueue queue, List vl){
filterSet = mapVisualizations(queue, vl);
}
-
-
+
/**
* Visualize data
* Method blocks until visualization is done
*/
public void visualize() {
if(filterSet != null ){
- DataQueue queue = filterSet.getQueue();
+
+ ZMQ.Context context = ZMQ.context();
+ ZMQ.Socket socket = context.socket(ZMQ.SUB);
+ socket.subscribe("");
+ socket.connect("tcp://emac:9090");
+// socket.connect("inproc://visualize");
+// while(true){
+// Object message = SerializationUtils.deserialize(socket.recv());
+//// Object message = socket.recv();
+// logger.info(""+message);
+// }
+
+
+// DataQueue queue = filterSet.getQueue();
List filters = filterSet.getFilters();
int ecount = 0;
@@ -101,14 +113,15 @@ public class Visualizer {
// Read Messages
Message message = null;
- try {
- message = queue.getQueue().take();
- } catch (InterruptedException e) {
- terminate = true;
- // Reset interrupted status
- Thread.currentThread().interrupt();
-
- }
+// try {
+// message = queue.getQueue().take();
+ message = (Message) SerializationUtils.deserialize(socket.recv());
+// } catch (InterruptedException e) {
+// terminate = true;
+// // Reset interrupted status
+// Thread.currentThread().interrupt();
+//
+// }
while ( (!Thread.currentThread().isInterrupted()) && (!terminate) ) {
@@ -263,6 +276,7 @@ public class Visualizer {
clearPlot = true;
}
} else if (message instanceof EndOfStreamMessage) {
+ System.out.println("END");
ecount++;
if(terminateAtEOS){
terminate = true;
@@ -290,15 +304,20 @@ public class Visualizer {
}
// Read next message
- try {
- message = queue.getQueue().take();
- } catch (InterruptedException e) {
- terminate = true;
- // Reset interrupted status
- Thread.currentThread().interrupt();
- }
+// try {
+// message = queue.getQueue().take();
+ message = (Message) SerializationUtils.deserialize(socket.recv());
+// } catch (InterruptedException e) {
+// terminate = true;
+// // Reset interrupted status
+// Thread.currentThread().interrupt();
+// }
}
+ System.out.println("VIS DONE");
+ socket.close();
+ context.term();
}
+
logger.info("End visualization");
}
diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/Receiver.java b/ch.psi.fda/src/test/java/ch/psi/fda/Receiver.java
new file mode 100644
index 0000000..9751a98
--- /dev/null
+++ b/ch.psi.fda/src/test/java/ch/psi/fda/Receiver.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Copyright 2013 Paul Scherrer Institute. All rights reserved.
+ *
+ * This code is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation, either version 3 of the License, or (at your option) any
+ * later version.
+ *
+ * This code is distributed in the hope that it will be useful, but without any
+ * warranty; without even the implied warranty of merchantability or fitness for
+ * a particular purpose. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this code. If not, see .
+ *
+ */
+
+package ch.psi.fda;
+
+import java.util.logging.Logger;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.jeromq.ZMQ;
+
+public class Receiver {
+
+ private static final Logger logger = Logger.getLogger(Receiver.class.getName());
+
+ public static void main(String[] args){
+ ZMQ.Context context = ZMQ.context();
+ ZMQ.Socket socket = context.socket(ZMQ.SUB);
+ socket.subscribe("");
+ socket.connect("tcp://emac:9090");
+ while(true){
+ Object message = SerializationUtils.deserialize(socket.recv());
+// Object message = socket.recv();
+ logger.info(""+message);
+ }
+ }
+
+}
diff --git a/ch.psi.fda/src/test/resources/home/.gitignore b/ch.psi.fda/src/test/resources/home/.gitignore
new file mode 100644
index 0000000..58265e5
--- /dev/null
+++ b/ch.psi.fda/src/test/resources/home/.gitignore
@@ -0,0 +1,2 @@
+/data
+/logs