Experiment with zmq pub/sub for visualization.

- after the scan somehow the zmq library still takes 100% CPU time -
very strange
This commit is contained in:
2013-10-03 09:30:36 +02:00
parent 0958b76bb0
commit 3e38e4002d
13 changed files with 136 additions and 32 deletions
+12
View File
@@ -6,6 +6,18 @@
<version>1.1.40</version>
<dependencies>
<dependency>
<groupId>org.jeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>ch.psi</groupId>
<artifactId>jcae</artifactId>
@@ -264,7 +264,7 @@ public class Acquisition {
if (getQueue) {
vdq = new DataQueue(new LinkedBlockingQueue<Message>(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to prevent
// running out of memory ...
dispatcher.getOutQueues().add(vdq);
// dispatcher.getOutQueues().add(vdq);
}
// Add queue for serializer to dispatcher
@@ -22,6 +22,9 @@ package ch.psi.fda.core.collector;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.jeromq.ZMQ;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
@@ -48,6 +51,12 @@ public class DataDispatcher implements Runnable{
public void run() {
try{
ZMQ.Context context = ZMQ.context();
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.bind("tcp://*:9090");
// socket.bind("inproc://visualize");
// TODO Need to synchronize message metadata
// for(DataQueue q: outQueues){
// }
@@ -55,11 +64,11 @@ public class DataDispatcher implements Runnable{
// Dispatch Messages
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
// Clone message ...
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
socket.send(SerializationUtils.serialize(message));
// Read next message
message = queue.getQueue().take();
@@ -69,6 +78,10 @@ public class DataDispatcher implements Runnable{
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
socket.send(SerializationUtils.serialize(message));
socket.close();
context.term();
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
@@ -19,6 +19,8 @@
package ch.psi.fda.core.messages;
import java.io.Serializable;
/**
* Metadata of a component of a message. Each component has a global id.
* Optionally the component can also belong to a dimension. However, depending on the
@@ -28,7 +30,10 @@ package ch.psi.fda.core.messages;
* @author ebner
*
*/
public class ComponentMetadata {
public class ComponentMetadata implements Serializable{
private static final long serialVersionUID = 1L;
/**
* Global id of the component
*/
@@ -19,11 +19,13 @@
package ch.psi.fda.core.messages;
/**
* A control message that is not holding any data but
* control information (like end of loop, etc.)
* @author ebner
*
*/
public abstract class ControlMessage extends Message {
public abstract class ControlMessage extends Message{
private static final long serialVersionUID = 1L;
}
@@ -27,7 +27,9 @@ import java.util.List;
* @author ebner
*
*/
public class DataMessage extends Message {
public class DataMessage extends Message{
private static final long serialVersionUID = 1L;
/**
* Data payload of the message
@@ -19,6 +19,7 @@
package ch.psi.fda.core.messages;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -27,8 +28,9 @@ import java.util.List;
* @author ebner
*
*/
public class DataMessageMetadata {
public class DataMessageMetadata implements Serializable {
private static final long serialVersionUID = 1L;
/**
* List of the metadata of the message components
*/
@@ -25,8 +25,9 @@ package ch.psi.fda.core.messages;
* @author ebner
*
*/
public class EndOfStreamMessage extends ControlMessage{
public class EndOfStreamMessage extends ControlMessage {
private static final long serialVersionUID = 1L;
/**
* Intersect flag - flag to indicate that stream should be intersected
@@ -19,10 +19,13 @@
package ch.psi.fda.core.messages;
import java.io.Serializable;
/**
* Message that can be put to the data queue
* @author ebner
*
*/
public abstract class Message {
public abstract class Message implements Serializable{
private static final long serialVersionUID = 1L;
}
@@ -25,7 +25,7 @@ package ch.psi.fda.core.messages;
*
*/
public class StreamDelimiterMessage extends ControlMessage{
private static final long serialVersionUID = 1L;
/**
* Number of the dimension this delimiter belongs to.
*/
@@ -29,6 +29,10 @@ import javax.swing.SwingUtilities;
//import org.jfree.data.xy.XYSeries;
import org.apache.commons.lang.SerializationUtils;
import org.jeromq.ZMQ;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
@@ -78,21 +82,29 @@ public class Visualizer {
private boolean updateAtEndOfStream = false;
public Visualizer(DataQueue queue, List<Visualization> vl){
addFilterSet(queue, vl);
}
private void addFilterSet(DataQueue queue, List<Visualization> vl){
filterSet = mapVisualizations(queue, vl);
}
/**
* Visualize data
* Method blocks until visualization is done
*/
public void visualize() {
if(filterSet != null ){
DataQueue queue = filterSet.getQueue();
ZMQ.Context context = ZMQ.context();
ZMQ.Socket socket = context.socket(ZMQ.SUB);
socket.subscribe("");
socket.connect("tcp://emac:9090");
// socket.connect("inproc://visualize");
// while(true){
// Object message = SerializationUtils.deserialize(socket.recv());
//// Object message = socket.recv();
// logger.info(""+message);
// }
// DataQueue queue = filterSet.getQueue();
List<SeriesDataFilter> filters = filterSet.getFilters();
int ecount = 0;
@@ -101,14 +113,15 @@ public class Visualizer {
// Read Messages
Message message = null;
try {
message = queue.getQueue().take();
} catch (InterruptedException e) {
terminate = true;
// Reset interrupted status
Thread.currentThread().interrupt();
}
// try {
// message = queue.getQueue().take();
message = (Message) SerializationUtils.deserialize(socket.recv());
// } catch (InterruptedException e) {
// terminate = true;
// // Reset interrupted status
// Thread.currentThread().interrupt();
//
// }
while ( (!Thread.currentThread().isInterrupted()) && (!terminate) ) {
@@ -263,6 +276,7 @@ public class Visualizer {
clearPlot = true;
}
} else if (message instanceof EndOfStreamMessage) {
System.out.println("END");
ecount++;
if(terminateAtEOS){
terminate = true;
@@ -290,15 +304,20 @@ public class Visualizer {
}
// Read next message
try {
message = queue.getQueue().take();
} catch (InterruptedException e) {
terminate = true;
// Reset interrupted status
Thread.currentThread().interrupt();
}
// try {
// message = queue.getQueue().take();
message = (Message) SerializationUtils.deserialize(socket.recv());
// } catch (InterruptedException e) {
// terminate = true;
// // Reset interrupted status
// Thread.currentThread().interrupt();
// }
}
System.out.println("VIS DONE");
socket.close();
context.term();
}
logger.info("End visualization");
}
@@ -0,0 +1,43 @@
/**
*
* Copyright 2013 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This code is distributed in the hope that it will be useful, but without any
* warranty; without even the implied warranty of merchantability or fitness for
* a particular purpose. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda;
import java.util.logging.Logger;
import org.apache.commons.lang.SerializationUtils;
import org.jeromq.ZMQ;
public class Receiver {
private static final Logger logger = Logger.getLogger(Receiver.class.getName());
public static void main(String[] args){
ZMQ.Context context = ZMQ.context();
ZMQ.Socket socket = context.socket(ZMQ.SUB);
socket.subscribe("");
socket.connect("tcp://emac:9090");
while(true){
Object message = SerializationUtils.deserialize(socket.recv());
// Object message = socket.recv();
logger.info(""+message);
}
}
}
@@ -0,0 +1,2 @@
/data
/logs