diff --git a/ch.psi.imagej.zeromq/pom.xml b/ch.psi.imagej.zeromq/pom.xml index aa47210..63ae9fd 100644 --- a/ch.psi.imagej.zeromq/pom.xml +++ b/ch.psi.imagej.zeromq/pom.xml @@ -3,7 +3,7 @@ 4.0.0 ch.psi.zmq ch.psi.zmq.imagej - 0.1.1 + 0.2.0 diff --git a/ch.psi.imagej.zeromq/src/main/java/ch/psi/zmq/imagej/Collector.java b/ch.psi.imagej.zeromq/src/main/java/ch/psi/zmq/imagej/Collector.java new file mode 100644 index 0000000..3ec3cb5 --- /dev/null +++ b/ch.psi.imagej.zeromq/src/main/java/ch/psi/zmq/imagej/Collector.java @@ -0,0 +1,228 @@ +/** + * + * Copyright 2013 Paul Scherrer Institute. All rights reserved. + * + * This code is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + * This code is distributed in the hope that it will be useful, but without any + * warranty; without even the implied warranty of merchantability or fitness for + * a particular purpose. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this code. If not, see . + * + */ +package ch.psi.zmq.imagej; + +import ij.ImagePlus; +import ij.process.ImageProcessor; +import ij.process.ShortProcessor; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.jeromq.ZMQ; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author ebner + * + */ +public class Collector implements Runnable{ + + + private static final Logger logger = Logger.getLogger(Collector.class.getName()); + + private static final int HIGH_WATER_MARK = 10; + + private boolean flipX = false; + private boolean flipY = false; + + private int imageSizeX = 2560; + private int imageSizeY = 2160; + + private ZMQ.Context context; + private ZMQ.Socket socket; + private ImagePlus img; + + private ObjectMapper mapper = new ObjectMapper(new JsonFactory()); + + + private int numImageUpdates; + + private HeaderInfo hinfo; + private String hostname; + private int port; + private String method; + + public Collector(HeaderInfo hinfo, String hostname, int port, String method){ + this.hinfo = hinfo; + this.hostname = hostname; + this.port = port; + this.method = method; + } + + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ + public void run() { + numImageUpdates = 0; // Start counting at 0 + + if(img!=null){ + img.close(); + img=null; + } + + + context = ZMQ.context(); + if(method.equals("PULL")){ + socket = context.socket(ZMQ.PULL); + } + else if(method.equals("SUB")){ + socket = context.socket(ZMQ.SUB); + socket.subscribe(""); // Subscribe to all topics + } + else{ + logger.severe("Method not supported"); + // Terminate context and return + context.term(); + return; + } + socket.setHWM(HIGH_WATER_MARK); + socket.connect("tcp://"+hostname+":"+port); + + logger.info("Connected to: tcp://"+hostname+":"+port); + + while(true){ // Can only be terminated by closing the socket + byte[] header = socket.recv(); + byte[] content = null; + if (socket.hasReceiveMore()) { + content = socket.recv(); + } + + readHeader(header); + readContent(content); + } + } + + public void terminate(){ + logger.info("Terminate Collector"); + + try{ + + socket.close(); + context.term(); + } + catch(Exception ex){ // This exception can savely be ignored (somewhat most of the time an exception is expected) + ex.printStackTrace(); + } + + if(img!=null){ + img.close(); + } + + logger.info("Collector terminated"); + } + + @SuppressWarnings("unchecked") + private void readHeader(byte[] h){ + try{ + String header = new String(h); +// hinfo.setHeader(header); + Map m = mapper.readValue(header, new TypeReference>(){}); + if(((List) m.get("htype")).contains("array-1.0")){ // currently we only support array-1.0 message types + List shape = (List) m.get("shape"); + int nImageSizeX = shape.get(1); + int nImageSizeY = shape.get(0); + if(imageSizeX!=nImageSizeX || imageSizeY!=nImageSizeY){ + imageSizeX = nImageSizeX; + imageSizeY = nImageSizeY; + + img.close(); + img=null; + } + + if (img == null) { + // TODO eventually use ByteProcessor or BinaryProcessor + // BinaryProcessor p = new ij.process.BinaryProcessor(new + // ByteProcessor(imageSizeX, imageSizeY)); + img = new ImagePlus("", new ShortProcessor(imageSizeX, imageSizeY)); + img.show(); + } + img.setTitle(header); + hinfo.setText(m); + + } + else{ + logger.info("Header type is not supported ..."); + if(img!=null){ + img.close(); + img=null; + } + } + } + catch(IOException e){ + logger.log(Level.SEVERE, "Unable to parse header", e); + } + +// logger.info(sheader); + } + + private void readContent(byte[] content) { + try { + if(content!=null && img!=null){ + // TODO Check whether this is needed + short[] shorts = new short[content.length / 2]; + ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts); + + ImageProcessor ip = img.getProcessor(); + ip.setPixels(shorts); + if(flipX){ + ip.flipHorizontal(); + } + if(flipY){ + ip.flipVertical(); + } + + img.updateAndDraw(); + numImageUpdates++; + } + } catch (Exception ex) { + logger.log(Level.SEVERE, "UpdateImage got exception", ex); + } + } + + + + public boolean isFlipX() { + return flipX; + } + public void setFlipX(boolean flipX) { + this.flipX = flipX; + } + public boolean isFlipY() { + return flipY; + } + public void setFlipY(boolean flipY) { + this.flipY = flipY; + } + public int getNumImageUpdates() { + return numImageUpdates; + } + public void setNumImageUpdates(int numImageUpdates) { + this.numImageUpdates = numImageUpdates; + } +} diff --git a/ch.psi.imagej.zeromq/src/main/java/ch/psi/zmq/imagej/ZeroMQViewer.java b/ch.psi.imagej.zeromq/src/main/java/ch/psi/zmq/imagej/ZeroMQViewer.java index cd90d35..9bb7ea8 100644 --- a/ch.psi.imagej.zeromq/src/main/java/ch/psi/zmq/imagej/ZeroMQViewer.java +++ b/ch.psi.imagej.zeromq/src/main/java/ch/psi/zmq/imagej/ZeroMQViewer.java @@ -5,61 +5,32 @@ package ch.psi.zmq.imagej; // Tim Madden, APS // Mark Rivers, University of Chicago import ij.*; -import ij.process.*; import java.awt.*; import ij.plugin.*; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.awt.event.*; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; import javax.swing.*; import javax.swing.Timer; -import org.jeromq.ZMQ; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; - public class ZeroMQViewer implements PlugIn { private static final Logger logger = Logger.getLogger(ZeroMQViewer.class.getName()); - private static final int HIGH_WATER_MARK = 10; - - private ImagePlus img; - - private boolean flipX = false; - private boolean flipY = false; - - private int imageSizeX = 2560; - private int imageSizeY = 2160; - // These are used for the frames/second calculation private long prevTime; - private int numImageUpdates; + private JFrame frame; - // private JLabel fpsText; private volatile boolean isPluginRunning; - private volatile boolean collect; private Timer timer; - private ZMQ.Context context; - private ZMQ.Socket socket; - private JPanel panel_1; private JLabel lblNewLabel; private JLabel labelFrameRate; @@ -69,12 +40,15 @@ public class ZeroMQViewer implements PlugIn { private JTextField textPort; private JButton btnStart; - private Semaphore semaphore = new Semaphore(1); private JLabel lblMethod; private JComboBox comboBoxMethod; - private ObjectMapper mapper = new ObjectMapper(new JsonFactory()); private HeaderInfo hinfo = new HeaderInfo(); + private boolean flipX = false; + private boolean flipY = false; + + private Collector collector; + private JLabel lblFlip; private JPanel panel; private JCheckBox chckbxX; @@ -86,80 +60,36 @@ public class ZeroMQViewer implements PlugIn { isPluginRunning = true; prevTime = System.currentTimeMillis(); - numImageUpdates = 0; - semaphore.acquire(); // block semaphore + // Update frame rate + int timerDelay = 2000; // 2 seconds + timer = new Timer(timerDelay, new ActionListener() { + public void actionPerformed(ActionEvent event) { + if(collector != null){ + long time = System.currentTimeMillis(); + double fps = 1000. * collector.getNumImageUpdates() / (double) (time - prevTime); + labelFrameRate.setText(String.format("%.1f", fps)); + prevTime = time; + collector.setNumImageUpdates(0); + } + } + }); + timer.start(); + + // Show Viewer GUI javax.swing.SwingUtilities.invokeLater(new Runnable() { public void run() { createAndShowGUI(); } }); - - while (isPluginRunning) { - semaphore.acquire(); - if(!isPluginRunning){ // if plugin was terminated while waiting for the semaphore exit loop - break; - } - collect=true; - hinfo.setVisible(true); - try{ - if(img!=null){ - img.close(); - img=null; - } - String hostname = textHostname.getText(); - int port = Integer.parseInt(textPort.getText()); - String method = (String) comboBoxMethod.getSelectedItem(); - - context = ZMQ.context(); - if(method.equals("PULL")){ - socket = context.socket(ZMQ.PULL); - } - else if(method.equals("SUB")){ - socket = context.socket(ZMQ.SUB); - socket.subscribe(""); // Subscribe to all topics - } - else{ - logger.severe("Method not supported"); - collect=false; - } - socket.setHWM(HIGH_WATER_MARK); - socket.connect("tcp://"+hostname+":"+port); - - logger.info("Connected to: tcp://"+hostname+":"+port); - - while(collect){ - byte[] header = socket.recv(); - byte[] content = null; - if (socket.hasReceiveMore()) { - content = socket.recv(); - } - - readHeader(header); - readContent(content); - } - } - catch(Exception e){ - logger.log(Level.SEVERE, "",e); - } - finally{ - try{ - logger.info("Close connection"); - socket.close(); - context.term(); - logger.info("Connection closed"); - } - catch(Exception e){ - } - } + while (isPluginRunning) { // Keep plugin running + Thread.sleep(1000); } timer.stop(); - if(img!=null){ - img.close(); - } + frame.setVisible(false); IJ.showStatus("Exiting ZeroMQ Viewer"); @@ -170,73 +100,7 @@ public class ZeroMQViewer implements PlugIn { } } - @SuppressWarnings("unchecked") - private void readHeader(byte[] h){ - try{ - String header = new String(h); -// hinfo.setHeader(header); - Map m = mapper.readValue(header, new TypeReference>(){}); - if(((List) m.get("htype")).contains("array-1.0")){ // currently we only support array-1.0 message types - List shape = (List) m.get("shape"); - int nImageSizeX = shape.get(1); - int nImageSizeY = shape.get(0); - if(imageSizeX!=nImageSizeX || imageSizeY!=nImageSizeY){ - imageSizeX = nImageSizeX; - imageSizeY = nImageSizeY; - - img.close(); - img=null; - } - - if (img == null) { - // TODO eventually use ByteProcessor or BinaryProcessor - // BinaryProcessor p = new ij.process.BinaryProcessor(new - // ByteProcessor(imageSizeX, imageSizeY)); - img = new ImagePlus("", new ShortProcessor(imageSizeX, imageSizeY)); - img.show(); - } - img.setTitle(header); - hinfo.setText(m); - - } - else{ - logger.info("Header type is not supported ..."); - if(img!=null){ - img.close(); - img=null; - } - } - } - catch(IOException e){ - logger.log(Level.SEVERE, "Unable to parse header", e); - } - -// logger.info(sheader); - } - - private void readContent(byte[] content) { - try { - if(content!=null && img!=null){ - // TODO Check whether this is needed - short[] shorts = new short[content.length / 2]; - ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts); - - ImageProcessor ip = img.getProcessor(); - ip.setPixels(shorts); - if(flipX){ - ip.flipHorizontal(); - } - if(flipY){ - ip.flipVertical(); - } - - img.updateAndDraw(); - numImageUpdates++; - } - } catch (Exception ex) { - logger.log(Level.SEVERE, "UpdateImage got exception", ex); - } - } + /** * Create the GUI and show it. For thread safety, this method should be @@ -333,17 +197,26 @@ public class ZeroMQViewer implements PlugIn { public void actionPerformed(ActionEvent e) { if(btnStart.getText().equals("Start")){ // Start data acquisition - semaphore.release(); + + hinfo.setVisible(true); + + String hostname = textHostname.getText(); + int port = Integer.parseInt(textPort.getText()); + String method = (String) comboBoxMethod.getSelectedItem(); + collector = new Collector(hinfo, hostname, port, method); + collector.setFlipX(flipX); + collector.setFlipY(flipY); + new Thread(collector).start(); + btnStart.setText("Stop"); } else{ // Stop data acquisition - collect = false; - try{ - socket.notifyAll(); - } - catch(Exception ex){ // This exception can savely be ignored (somewhat most of the time an exception is expected) - } + + hinfo.setVisible(false); + + collector.terminate(); + btnStart.setText("Start"); } } @@ -390,27 +263,11 @@ public class ZeroMQViewer implements PlugIn { frame.pack(); frame.addWindowListener(new FrameExitListener()); frame.setVisible(true); - - - // Update frame rate - int timerDelay = 2000; // 2 seconds - timer = new Timer(timerDelay, new ActionListener() { - public void actionPerformed(ActionEvent event) { - long time = System.currentTimeMillis(); - double fps = 1000. * numImageUpdates / (double) (time - prevTime); - labelFrameRate.setText(String.format("%.1f", fps)); - prevTime = time; - numImageUpdates = 0; - } - }); - timer.start(); - } public class FrameExitListener extends WindowAdapter { public void windowClosing(WindowEvent event) { isPluginRunning = false; - semaphore.release(); // release the wait semaphore in case the plugin is stucked in there. } } }