From 3e38e4002d84571d38b994e8fcc939b3b55b02bc Mon Sep 17 00:00:00 2001 From: Simon Ebner Date: Thu, 3 Oct 2013 09:30:36 +0200 Subject: [PATCH] Experiment with zmq pub/sub for visualization. - after the scan somehow the zmq library still takes 100% CPU time - very strange --- ch.psi.fda/pom.xml | 12 ++++ .../main/java/ch/psi/fda/aq/Acquisition.java | 2 +- .../fda/core/collector/DataDispatcher.java | 15 ++++- .../fda/core/messages/ComponentMetadata.java | 7 ++- .../psi/fda/core/messages/ControlMessage.java | 4 +- .../ch/psi/fda/core/messages/DataMessage.java | 4 +- .../core/messages/DataMessageMetadata.java | 6 +- .../fda/core/messages/EndOfStreamMessage.java | 3 +- .../ch/psi/fda/core/messages/Message.java | 5 +- .../core/messages/StreamDelimiterMessage.java | 2 +- .../ch/psi/fda/visualizer/Visualizer.java | 63 ++++++++++++------- .../src/test/java/ch/psi/fda/Receiver.java | 43 +++++++++++++ ch.psi.fda/src/test/resources/home/.gitignore | 2 + 13 files changed, 136 insertions(+), 32 deletions(-) create mode 100644 ch.psi.fda/src/test/java/ch/psi/fda/Receiver.java create mode 100644 ch.psi.fda/src/test/resources/home/.gitignore diff --git a/ch.psi.fda/pom.xml b/ch.psi.fda/pom.xml index f392a4f..0fd7829 100644 --- a/ch.psi.fda/pom.xml +++ b/ch.psi.fda/pom.xml @@ -6,6 +6,18 @@ 1.1.40 + + org.jeromq + jeromq + 0.2.0 + + + + commons-lang + commons-lang + 2.6 + + ch.psi jcae diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java index b5d6edb..7b36884 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java @@ -264,7 +264,7 @@ public class Acquisition { if (getQueue) { vdq = new DataQueue(new LinkedBlockingQueue(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to prevent // running out of memory ... - dispatcher.getOutQueues().add(vdq); +// dispatcher.getOutQueues().add(vdq); } // Add queue for serializer to dispatcher diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java index d69c745..395f650 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java @@ -22,6 +22,9 @@ package ch.psi.fda.core.collector; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.SerializationUtils; +import org.jeromq.ZMQ; + import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; @@ -48,6 +51,12 @@ public class DataDispatcher implements Runnable{ public void run() { try{ + + ZMQ.Context context = ZMQ.context(); + ZMQ.Socket socket = context.socket(ZMQ.PUB); + socket.bind("tcp://*:9090"); +// socket.bind("inproc://visualize"); + // TODO Need to synchronize message metadata // for(DataQueue q: outQueues){ // } @@ -55,11 +64,11 @@ public class DataDispatcher implements Runnable{ // Dispatch Messages Message message = queue.getQueue().take(); while(!(message instanceof EndOfStreamMessage)){ - // Clone message ... for(DataQueue q: outQueues){ q.getQueue().put(message); } + socket.send(SerializationUtils.serialize(message)); // Read next message message = queue.getQueue().take(); @@ -69,6 +78,10 @@ public class DataDispatcher implements Runnable{ for(DataQueue q: outQueues){ q.getQueue().put(message); } + socket.send(SerializationUtils.serialize(message)); + + socket.close(); + context.term(); } catch (InterruptedException e) { // TODO Stop loop and exit logic instead of throwing an Exception diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java index 11004d7..ae72fa3 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ComponentMetadata.java @@ -19,6 +19,8 @@ package ch.psi.fda.core.messages; +import java.io.Serializable; + /** * Metadata of a component of a message. Each component has a global id. * Optionally the component can also belong to a dimension. However, depending on the @@ -28,7 +30,10 @@ package ch.psi.fda.core.messages; * @author ebner * */ -public class ComponentMetadata { +public class ComponentMetadata implements Serializable{ + + private static final long serialVersionUID = 1L; + /** * Global id of the component */ diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java index 793283c..bd95119 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/ControlMessage.java @@ -19,11 +19,13 @@ package ch.psi.fda.core.messages; + /** * A control message that is not holding any data but * control information (like end of loop, etc.) * @author ebner * */ -public abstract class ControlMessage extends Message { +public abstract class ControlMessage extends Message{ + private static final long serialVersionUID = 1L; } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java index f7b4fcb..f117d08 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessage.java @@ -27,7 +27,9 @@ import java.util.List; * @author ebner * */ -public class DataMessage extends Message { +public class DataMessage extends Message{ + + private static final long serialVersionUID = 1L; /** * Data payload of the message diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java index fd0f67d..1bffab7 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/DataMessageMetadata.java @@ -19,6 +19,7 @@ package ch.psi.fda.core.messages; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -27,8 +28,9 @@ import java.util.List; * @author ebner * */ -public class DataMessageMetadata { - +public class DataMessageMetadata implements Serializable { + private static final long serialVersionUID = 1L; + /** * List of the metadata of the message components */ diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java index d9c9e9b..3dd7c83 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/EndOfStreamMessage.java @@ -25,8 +25,9 @@ package ch.psi.fda.core.messages; * @author ebner * */ -public class EndOfStreamMessage extends ControlMessage{ +public class EndOfStreamMessage extends ControlMessage { + private static final long serialVersionUID = 1L; /** * Intersect flag - flag to indicate that stream should be intersected diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java index 2a36287..8e30cb8 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/Message.java @@ -19,10 +19,13 @@ package ch.psi.fda.core.messages; +import java.io.Serializable; + /** * Message that can be put to the data queue * @author ebner * */ -public abstract class Message { +public abstract class Message implements Serializable{ + private static final long serialVersionUID = 1L; } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java index 2777e89..0e6ffd6 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/messages/StreamDelimiterMessage.java @@ -25,7 +25,7 @@ package ch.psi.fda.core.messages; * */ public class StreamDelimiterMessage extends ControlMessage{ - + private static final long serialVersionUID = 1L; /** * Number of the dimension this delimiter belongs to. */ diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java b/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java index 309e06e..1836b9a 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/visualizer/Visualizer.java @@ -29,6 +29,10 @@ import javax.swing.SwingUtilities; //import org.jfree.data.xy.XYSeries; + +import org.apache.commons.lang.SerializationUtils; +import org.jeromq.ZMQ; + import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.StreamDelimiterMessage; @@ -78,21 +82,29 @@ public class Visualizer { private boolean updateAtEndOfStream = false; public Visualizer(DataQueue queue, List vl){ - addFilterSet(queue, vl); - } - - private void addFilterSet(DataQueue queue, List vl){ filterSet = mapVisualizations(queue, vl); } - - + /** * Visualize data * Method blocks until visualization is done */ public void visualize() { if(filterSet != null ){ - DataQueue queue = filterSet.getQueue(); + + ZMQ.Context context = ZMQ.context(); + ZMQ.Socket socket = context.socket(ZMQ.SUB); + socket.subscribe(""); + socket.connect("tcp://emac:9090"); +// socket.connect("inproc://visualize"); +// while(true){ +// Object message = SerializationUtils.deserialize(socket.recv()); +//// Object message = socket.recv(); +// logger.info(""+message); +// } + + +// DataQueue queue = filterSet.getQueue(); List filters = filterSet.getFilters(); int ecount = 0; @@ -101,14 +113,15 @@ public class Visualizer { // Read Messages Message message = null; - try { - message = queue.getQueue().take(); - } catch (InterruptedException e) { - terminate = true; - // Reset interrupted status - Thread.currentThread().interrupt(); - - } +// try { +// message = queue.getQueue().take(); + message = (Message) SerializationUtils.deserialize(socket.recv()); +// } catch (InterruptedException e) { +// terminate = true; +// // Reset interrupted status +// Thread.currentThread().interrupt(); +// +// } while ( (!Thread.currentThread().isInterrupted()) && (!terminate) ) { @@ -263,6 +276,7 @@ public class Visualizer { clearPlot = true; } } else if (message instanceof EndOfStreamMessage) { + System.out.println("END"); ecount++; if(terminateAtEOS){ terminate = true; @@ -290,15 +304,20 @@ public class Visualizer { } // Read next message - try { - message = queue.getQueue().take(); - } catch (InterruptedException e) { - terminate = true; - // Reset interrupted status - Thread.currentThread().interrupt(); - } +// try { +// message = queue.getQueue().take(); + message = (Message) SerializationUtils.deserialize(socket.recv()); +// } catch (InterruptedException e) { +// terminate = true; +// // Reset interrupted status +// Thread.currentThread().interrupt(); +// } } + System.out.println("VIS DONE"); + socket.close(); + context.term(); } + logger.info("End visualization"); } diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/Receiver.java b/ch.psi.fda/src/test/java/ch/psi/fda/Receiver.java new file mode 100644 index 0000000..9751a98 --- /dev/null +++ b/ch.psi.fda/src/test/java/ch/psi/fda/Receiver.java @@ -0,0 +1,43 @@ +/** + * + * Copyright 2013 Paul Scherrer Institute. All rights reserved. + * + * This code is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + * This code is distributed in the hope that it will be useful, but without any + * warranty; without even the implied warranty of merchantability or fitness for + * a particular purpose. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this code. If not, see . + * + */ + +package ch.psi.fda; + +import java.util.logging.Logger; + +import org.apache.commons.lang.SerializationUtils; +import org.jeromq.ZMQ; + +public class Receiver { + + private static final Logger logger = Logger.getLogger(Receiver.class.getName()); + + public static void main(String[] args){ + ZMQ.Context context = ZMQ.context(); + ZMQ.Socket socket = context.socket(ZMQ.SUB); + socket.subscribe(""); + socket.connect("tcp://emac:9090"); + while(true){ + Object message = SerializationUtils.deserialize(socket.recv()); +// Object message = socket.recv(); + logger.info(""+message); + } + } + +} diff --git a/ch.psi.fda/src/test/resources/home/.gitignore b/ch.psi.fda/src/test/resources/home/.gitignore new file mode 100644 index 0000000..58265e5 --- /dev/null +++ b/ch.psi.fda/src/test/resources/home/.gitignore @@ -0,0 +1,2 @@ +/data +/logs