Fixed stop/start problem

fix #1
This commit is contained in:
ebner 2013-09-26 15:54:31 +02:00
parent 6779bacf38
commit 24d22c5235
3 changed files with 271 additions and 186 deletions

View File

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ch.psi.zmq</groupId>
<artifactId>ch.psi.zmq.imagej</artifactId>
<version>0.1.1</version>
<version>0.2.0</version>
<dependencies>
<dependency>

View File

@ -0,0 +1,228 @@
/**
*
* Copyright 2013 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This code is distributed in the hope that it will be useful, but without any
* warranty; without even the implied warranty of merchantability or fitness for
* a particular purpose. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.zmq.imagej;
import ij.ImagePlus;
import ij.process.ImageProcessor;
import ij.process.ShortProcessor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jeromq.ZMQ;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author ebner
*
*/
public class Collector implements Runnable{
private static final Logger logger = Logger.getLogger(Collector.class.getName());
private static final int HIGH_WATER_MARK = 10;
private boolean flipX = false;
private boolean flipY = false;
private int imageSizeX = 2560;
private int imageSizeY = 2160;
private ZMQ.Context context;
private ZMQ.Socket socket;
private ImagePlus img;
private ObjectMapper mapper = new ObjectMapper(new JsonFactory());
private int numImageUpdates;
private HeaderInfo hinfo;
private String hostname;
private int port;
private String method;
public Collector(HeaderInfo hinfo, String hostname, int port, String method){
this.hinfo = hinfo;
this.hostname = hostname;
this.port = port;
this.method = method;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
numImageUpdates = 0; // Start counting at 0
if(img!=null){
img.close();
img=null;
}
context = ZMQ.context();
if(method.equals("PULL")){
socket = context.socket(ZMQ.PULL);
}
else if(method.equals("SUB")){
socket = context.socket(ZMQ.SUB);
socket.subscribe(""); // Subscribe to all topics
}
else{
logger.severe("Method not supported");
// Terminate context and return
context.term();
return;
}
socket.setHWM(HIGH_WATER_MARK);
socket.connect("tcp://"+hostname+":"+port);
logger.info("Connected to: tcp://"+hostname+":"+port);
while(true){ // Can only be terminated by closing the socket
byte[] header = socket.recv();
byte[] content = null;
if (socket.hasReceiveMore()) {
content = socket.recv();
}
readHeader(header);
readContent(content);
}
}
public void terminate(){
logger.info("Terminate Collector");
try{
socket.close();
context.term();
}
catch(Exception ex){ // This exception can savely be ignored (somewhat most of the time an exception is expected)
ex.printStackTrace();
}
if(img!=null){
img.close();
}
logger.info("Collector terminated");
}
@SuppressWarnings("unchecked")
private void readHeader(byte[] h){
try{
String header = new String(h);
// hinfo.setHeader(header);
Map<String,Object> m = mapper.readValue(header, new TypeReference<HashMap<String,Object>>(){});
if(((List<String>) m.get("htype")).contains("array-1.0")){ // currently we only support array-1.0 message types
List<Integer> shape = (List<Integer>) m.get("shape");
int nImageSizeX = shape.get(1);
int nImageSizeY = shape.get(0);
if(imageSizeX!=nImageSizeX || imageSizeY!=nImageSizeY){
imageSizeX = nImageSizeX;
imageSizeY = nImageSizeY;
img.close();
img=null;
}
if (img == null) {
// TODO eventually use ByteProcessor or BinaryProcessor
// BinaryProcessor p = new ij.process.BinaryProcessor(new
// ByteProcessor(imageSizeX, imageSizeY));
img = new ImagePlus("", new ShortProcessor(imageSizeX, imageSizeY));
img.show();
}
img.setTitle(header);
hinfo.setText(m);
}
else{
logger.info("Header type is not supported ...");
if(img!=null){
img.close();
img=null;
}
}
}
catch(IOException e){
logger.log(Level.SEVERE, "Unable to parse header", e);
}
// logger.info(sheader);
}
private void readContent(byte[] content) {
try {
if(content!=null && img!=null){
// TODO Check whether this is needed
short[] shorts = new short[content.length / 2];
ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts);
ImageProcessor ip = img.getProcessor();
ip.setPixels(shorts);
if(flipX){
ip.flipHorizontal();
}
if(flipY){
ip.flipVertical();
}
img.updateAndDraw();
numImageUpdates++;
}
} catch (Exception ex) {
logger.log(Level.SEVERE, "UpdateImage got exception", ex);
}
}
public boolean isFlipX() {
return flipX;
}
public void setFlipX(boolean flipX) {
this.flipX = flipX;
}
public boolean isFlipY() {
return flipY;
}
public void setFlipY(boolean flipY) {
this.flipY = flipY;
}
public int getNumImageUpdates() {
return numImageUpdates;
}
public void setNumImageUpdates(int numImageUpdates) {
this.numImageUpdates = numImageUpdates;
}
}

View File

@ -5,61 +5,32 @@ package ch.psi.zmq.imagej;
// Tim Madden, APS
// Mark Rivers, University of Chicago
import ij.*;
import ij.process.*;
import java.awt.*;
import ij.plugin.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.awt.event.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.*;
import javax.swing.Timer;
import org.jeromq.ZMQ;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
public class ZeroMQViewer implements PlugIn {
private static final Logger logger = Logger.getLogger(ZeroMQViewer.class.getName());
private static final int HIGH_WATER_MARK = 10;
private ImagePlus img;
private boolean flipX = false;
private boolean flipY = false;
private int imageSizeX = 2560;
private int imageSizeY = 2160;
// These are used for the frames/second calculation
private long prevTime;
private int numImageUpdates;
private JFrame frame;
// private JLabel fpsText;
private volatile boolean isPluginRunning;
private volatile boolean collect;
private Timer timer;
private ZMQ.Context context;
private ZMQ.Socket socket;
private JPanel panel_1;
private JLabel lblNewLabel;
private JLabel labelFrameRate;
@ -69,12 +40,15 @@ public class ZeroMQViewer implements PlugIn {
private JTextField textPort;
private JButton btnStart;
private Semaphore semaphore = new Semaphore(1);
private JLabel lblMethod;
private JComboBox<String> comboBoxMethod;
private ObjectMapper mapper = new ObjectMapper(new JsonFactory());
private HeaderInfo hinfo = new HeaderInfo();
private boolean flipX = false;
private boolean flipY = false;
private Collector collector;
private JLabel lblFlip;
private JPanel panel;
private JCheckBox chckbxX;
@ -86,80 +60,36 @@ public class ZeroMQViewer implements PlugIn {
isPluginRunning = true;
prevTime = System.currentTimeMillis();
numImageUpdates = 0;
semaphore.acquire(); // block semaphore
// Update frame rate
int timerDelay = 2000; // 2 seconds
timer = new Timer(timerDelay, new ActionListener() {
public void actionPerformed(ActionEvent event) {
if(collector != null){
long time = System.currentTimeMillis();
double fps = 1000. * collector.getNumImageUpdates() / (double) (time - prevTime);
labelFrameRate.setText(String.format("%.1f", fps));
prevTime = time;
collector.setNumImageUpdates(0);
}
}
});
timer.start();
// Show Viewer GUI
javax.swing.SwingUtilities.invokeLater(new Runnable() {
public void run() {
createAndShowGUI();
}
});
while (isPluginRunning) {
semaphore.acquire();
if(!isPluginRunning){ // if plugin was terminated while waiting for the semaphore exit loop
break;
}
collect=true;
hinfo.setVisible(true);
try{
if(img!=null){
img.close();
img=null;
}
String hostname = textHostname.getText();
int port = Integer.parseInt(textPort.getText());
String method = (String) comboBoxMethod.getSelectedItem();
context = ZMQ.context();
if(method.equals("PULL")){
socket = context.socket(ZMQ.PULL);
}
else if(method.equals("SUB")){
socket = context.socket(ZMQ.SUB);
socket.subscribe(""); // Subscribe to all topics
}
else{
logger.severe("Method not supported");
collect=false;
}
socket.setHWM(HIGH_WATER_MARK);
socket.connect("tcp://"+hostname+":"+port);
logger.info("Connected to: tcp://"+hostname+":"+port);
while(collect){
byte[] header = socket.recv();
byte[] content = null;
if (socket.hasReceiveMore()) {
content = socket.recv();
}
readHeader(header);
readContent(content);
}
}
catch(Exception e){
logger.log(Level.SEVERE, "",e);
}
finally{
try{
logger.info("Close connection");
socket.close();
context.term();
logger.info("Connection closed");
}
catch(Exception e){
}
}
while (isPluginRunning) { // Keep plugin running
Thread.sleep(1000);
}
timer.stop();
if(img!=null){
img.close();
}
frame.setVisible(false);
IJ.showStatus("Exiting ZeroMQ Viewer");
@ -170,73 +100,7 @@ public class ZeroMQViewer implements PlugIn {
}
}
@SuppressWarnings("unchecked")
private void readHeader(byte[] h){
try{
String header = new String(h);
// hinfo.setHeader(header);
Map<String,Object> m = mapper.readValue(header, new TypeReference<HashMap<String,Object>>(){});
if(((List<String>) m.get("htype")).contains("array-1.0")){ // currently we only support array-1.0 message types
List<Integer> shape = (List<Integer>) m.get("shape");
int nImageSizeX = shape.get(1);
int nImageSizeY = shape.get(0);
if(imageSizeX!=nImageSizeX || imageSizeY!=nImageSizeY){
imageSizeX = nImageSizeX;
imageSizeY = nImageSizeY;
img.close();
img=null;
}
if (img == null) {
// TODO eventually use ByteProcessor or BinaryProcessor
// BinaryProcessor p = new ij.process.BinaryProcessor(new
// ByteProcessor(imageSizeX, imageSizeY));
img = new ImagePlus("", new ShortProcessor(imageSizeX, imageSizeY));
img.show();
}
img.setTitle(header);
hinfo.setText(m);
}
else{
logger.info("Header type is not supported ...");
if(img!=null){
img.close();
img=null;
}
}
}
catch(IOException e){
logger.log(Level.SEVERE, "Unable to parse header", e);
}
// logger.info(sheader);
}
private void readContent(byte[] content) {
try {
if(content!=null && img!=null){
// TODO Check whether this is needed
short[] shorts = new short[content.length / 2];
ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts);
ImageProcessor ip = img.getProcessor();
ip.setPixels(shorts);
if(flipX){
ip.flipHorizontal();
}
if(flipY){
ip.flipVertical();
}
img.updateAndDraw();
numImageUpdates++;
}
} catch (Exception ex) {
logger.log(Level.SEVERE, "UpdateImage got exception", ex);
}
}
/**
* Create the GUI and show it. For thread safety, this method should be
@ -333,17 +197,26 @@ public class ZeroMQViewer implements PlugIn {
public void actionPerformed(ActionEvent e) {
if(btnStart.getText().equals("Start")){
// Start data acquisition
semaphore.release();
hinfo.setVisible(true);
String hostname = textHostname.getText();
int port = Integer.parseInt(textPort.getText());
String method = (String) comboBoxMethod.getSelectedItem();
collector = new Collector(hinfo, hostname, port, method);
collector.setFlipX(flipX);
collector.setFlipY(flipY);
new Thread(collector).start();
btnStart.setText("Stop");
}
else{
// Stop data acquisition
collect = false;
try{
socket.notifyAll();
}
catch(Exception ex){ // This exception can savely be ignored (somewhat most of the time an exception is expected)
}
hinfo.setVisible(false);
collector.terminate();
btnStart.setText("Start");
}
}
@ -390,27 +263,11 @@ public class ZeroMQViewer implements PlugIn {
frame.pack();
frame.addWindowListener(new FrameExitListener());
frame.setVisible(true);
// Update frame rate
int timerDelay = 2000; // 2 seconds
timer = new Timer(timerDelay, new ActionListener() {
public void actionPerformed(ActionEvent event) {
long time = System.currentTimeMillis();
double fps = 1000. * numImageUpdates / (double) (time - prevTime);
labelFrameRate.setText(String.format("%.1f", fps));
prevTime = time;
numImageUpdates = 0;
}
});
timer.start();
}
public class FrameExitListener extends WindowAdapter {
public void windowClosing(WindowEvent event) {
isPluginRunning = false;
semaphore.release(); // release the wait semaphore in case the plugin is stucked in there.
}
}
}