diff --git a/pom.xml b/pom.xml
index 63ae9fd..d3c1ed1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
ch.psi.zmq
ch.psi.zmq.imagej
- 0.2.0
+ 0.4.0
@@ -11,6 +11,11 @@
jeromq
0.2.0
+
+ ch.psi.imagej
+ ch.psi.imagej.cbf
+ 0.0.3
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/src/main/java/ch/psi/zmq/imagej/Collector.java b/src/main/java/ch/psi/zmq/imagej/Collector.java
index 3ec3cb5..31e4774 100644
--- a/src/main/java/ch/psi/zmq/imagej/Collector.java
+++ b/src/main/java/ch/psi/zmq/imagej/Collector.java
@@ -18,10 +18,16 @@
*/
package ch.psi.zmq.imagej;
+import ij.IJ;
import ij.ImagePlus;
+import ij.process.FloatProcessor;
import ij.process.ImageProcessor;
import ij.process.ShortProcessor;
+import java.awt.image.BufferedImage;
+import java.awt.image.DataBuffer;
+import java.awt.image.DataBufferFloat;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -31,130 +37,151 @@ import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.imageio.ImageIO;
+import javax.imageio.ImageReadParam;
+import javax.imageio.stream.ImageInputStream;
+
import org.jeromq.ZMQ;
+import ch.psi.imagej.cbf.CbfImageReader;
+import ch.psi.imagej.cbf.CbfImageReaderSpi;
+
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author ebner
- *
+ *
*/
-public class Collector implements Runnable{
-
-
+public class Collector implements Runnable {
+
private static final Logger logger = Logger.getLogger(Collector.class.getName());
-
+
private static final int HIGH_WATER_MARK = 10;
-
+
+ private enum HType {
+ ARRAY_1_0, PILATUS_1_0
+ };
+
+ private HType htype = HType.ARRAY_1_0;
+
private boolean flipX = false;
private boolean flipY = false;
private int imageSizeX = 2560;
private int imageSizeY = 2160;
-
+
private ZMQ.Context context;
private ZMQ.Socket socket;
private ImagePlus img;
-
+
private ObjectMapper mapper = new ObjectMapper(new JsonFactory());
-
-
+
private int numImageUpdates;
-
+
private HeaderInfo hinfo;
private String hostname;
private int port;
private String method;
-
- public Collector(HeaderInfo hinfo, String hostname, int port, String method){
+
+ public Collector(HeaderInfo hinfo, String hostname, int port, String method) {
this.hinfo = hinfo;
this.hostname = hostname;
this.port = port;
this.method = method;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see java.lang.Runnable#run()
*/
public void run() {
numImageUpdates = 0; // Start counting at 0
-
- if(img!=null){
- img.close();
- img=null;
- }
-
-
- context = ZMQ.context();
- if(method.equals("PULL")){
- socket = context.socket(ZMQ.PULL);
- }
- else if(method.equals("SUB")){
- socket = context.socket(ZMQ.SUB);
- socket.subscribe(""); // Subscribe to all topics
- }
- else{
- logger.severe("Method not supported");
- // Terminate context and return
- context.term();
- return;
- }
- socket.setHWM(HIGH_WATER_MARK);
- socket.connect("tcp://"+hostname+":"+port);
-
- logger.info("Connected to: tcp://"+hostname+":"+port);
-
- while(true){ // Can only be terminated by closing the socket
- byte[] header = socket.recv();
- byte[] content = null;
- if (socket.hasReceiveMore()) {
- content = socket.recv();
- }
-
- readHeader(header);
- readContent(content);
+
+ if (img != null) {
+ img.close();
+ img = null;
+ }
+
+ context = ZMQ.context();
+ if (method.equals("PULL")) {
+ socket = context.socket(ZMQ.PULL);
+ }
+ else if (method.equals("SUB")) {
+ socket = context.socket(ZMQ.SUB);
+ socket.subscribe(""); // Subscribe to all topics
+ }
+ else {
+ logger.severe("Method not supported");
+ // Terminate context and return
+ context.term();
+ return;
+ }
+ socket.setHWM(HIGH_WATER_MARK);
+ socket.connect("tcp://" + hostname + ":" + port);
+
+ logger.info("Connected to: tcp://" + hostname + ":" + port);
+
+ while (true) { // Can only be terminated by closing the socket
+ byte[] header = socket.recv();
+ byte[] content = null;
+ if (socket.hasReceiveMore()) {
+ content = socket.recv();
}
+
+ readHeader(header);
+ readContent(content);
+ }
}
- public void terminate(){
+ public void terminate() {
logger.info("Terminate Collector");
-
- try{
-
+
+ try {
+
socket.close();
context.term();
- }
- catch(Exception ex){ // This exception can savely be ignored (somewhat most of the time an exception is expected)
+ } catch (Exception ex) { // This exception can savely be ignored
+ // (somewhat most of the time an exception
+ // is expected)
ex.printStackTrace();
}
-
- if(img!=null){
+
+ if (img != null) {
img.close();
}
-
+
logger.info("Collector terminated");
}
-
+
@SuppressWarnings("unchecked")
- private void readHeader(byte[] h){
- try{
+ private void readHeader(byte[] h) {
+ try {
String header = new String(h);
-// hinfo.setHeader(header);
- Map m = mapper.readValue(header, new TypeReference>(){});
- if(((List) m.get("htype")).contains("array-1.0")){ // currently we only support array-1.0 message types
+ // hinfo.setHeader(header);
+ Map m = mapper.readValue(header, new TypeReference>() {
+ });
+ if (((List) m.get("htype")).contains("array-1.0")) { // currently
+ // we
+ // only
+ // support
+ // array-1.0
+ // message
+ // types
+ htype = HType.ARRAY_1_0;
List shape = (List) m.get("shape");
int nImageSizeX = shape.get(1);
int nImageSizeY = shape.get(0);
- if(imageSizeX!=nImageSizeX || imageSizeY!=nImageSizeY){
+ if (imageSizeX != nImageSizeX || imageSizeY != nImageSizeY) {
imageSizeX = nImageSizeX;
imageSizeY = nImageSizeY;
-
+
img.close();
- img=null;
+ img = null;
}
-
+
if (img == null) {
// TODO eventually use ByteProcessor or BinaryProcessor
// BinaryProcessor p = new ij.process.BinaryProcessor(new
@@ -164,64 +191,143 @@ public class Collector implements Runnable{
}
img.setTitle(header);
hinfo.setText(m);
-
+
}
- else{
+ else if (((List) m.get("htype")).contains("pilatus-1.0")) { // pilatus
+ // 1.0
+ // message
+ // support
+ htype = HType.PILATUS_1_0;
+ }
+ else {
logger.info("Header type is not supported ...");
- if(img!=null){
+ if (img != null) {
img.close();
- img=null;
+ img = null;
}
}
- }
- catch(IOException e){
+ } catch (IOException e) {
logger.log(Level.SEVERE, "Unable to parse header", e);
}
-
-// logger.info(sheader);
+
+ // logger.info(sheader);
}
-
+
private void readContent(byte[] content) {
try {
- if(content!=null && img!=null){
- // TODO Check whether this is needed
- short[] shorts = new short[content.length / 2];
- ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts);
-
- ImageProcessor ip = img.getProcessor();
- ip.setPixels(shorts);
- if(flipX){
- ip.flipHorizontal();
+ if (htype == HType.ARRAY_1_0) {
+ if (content != null && img != null) {
+ // TODO Check whether this is needed
+ short[] shorts = new short[content.length / 2];
+ ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts);
+
+ ImageProcessor ip = img.getProcessor();
+ ip.setPixels(shorts);
+ if (flipX) {
+ ip.flipHorizontal();
+ }
+ if (flipY) {
+ ip.flipVertical();
+ }
+
+ img.updateAndDraw();
+ numImageUpdates++;
}
- if(flipY){
- ip.flipVertical();
+ }
+ else if (htype == HType.PILATUS_1_0) {
+ CbfImageReaderSpi spi = new CbfImageReaderSpi();
+ ImageInputStream stream = ImageIO.createImageInputStream(new ByteArrayInputStream(content));
+ CbfImageReader reader = new CbfImageReader(spi);
+ reader.setInput(stream);
+ ImageReadParam param = reader.getDefaultReadParam();
+ int width = reader.getWidth(0);
+ int height = reader.getHeight(0);
+ BufferedImage image = reader.read(0, param);
+ DataBuffer buffer = image.getData().getDataBuffer();
+ int bufferType = buffer.getDataType();
+ float[] pixels;
+ ImageProcessor ip = null;
+ if (buffer instanceof DataBufferFloat) {
+ pixels = ((DataBufferFloat) buffer).getData();
+ ip = new FloatProcessor(width, height, pixels, null);
+ image.flush();
+ reader.dispose();
+ // TODO
+ // return new ImagePlus(cbfFile.getName(), ip);
+ } else if (bufferType == DataBuffer.TYPE_INT || bufferType == DataBuffer.TYPE_FLOAT) {
+ pixels = new float[width * height];
+ for (int i = 0; i < pixels.length; i++)
+ pixels[i] = buffer.getElemFloat(i);
+ ip = new FloatProcessor(width, height, pixels, null);
+ image.flush();
+ reader.dispose();
+ // TODO
+ // return new ImagePlus(cbfFile.getName(), ip);
+ } else if (bufferType == DataBuffer.TYPE_DOUBLE) {
+ pixels = new float[width * height];
+ boolean clipped = false;
+ for (int i = 0; i < pixels.length; i++) {
+ double doubleValue = buffer.getElemDouble(i);
+ float floatValue;
+ if (doubleValue < Float.MIN_VALUE) {
+ floatValue = Float.MIN_VALUE;
+ clipped = true;
+ } else if (doubleValue > Float.MAX_VALUE) {
+ floatValue = Float.MAX_VALUE;
+ clipped = true;
+ } else {
+ floatValue = (float) doubleValue;
+ }
+ pixels[i] = floatValue;
+ }
+ if (clipped) {
+ IJ.log("Warning: pixel value(s) clipped to fit in type float");
+ }
+ ip = new FloatProcessor(width, height, pixels, null);
+ image.flush();
+ reader.dispose();
+ // TODO
+ // return new ImagePlus(cbfFile.getName(), ip);
+ } else {
+ reader.dispose();
+ // TODO
+ // return new ImagePlus(cbfFile.getName(), image);
+ }
+ if(img==null){
+ img = new ImagePlus("", ip);
+ img.show();
+ }
+ else{
+ img.setProcessor(ip);
+ img.updateAndDraw();
}
- img.updateAndDraw();
- numImageUpdates++;
}
} catch (Exception ex) {
logger.log(Level.SEVERE, "UpdateImage got exception", ex);
}
}
-
-
public boolean isFlipX() {
return flipX;
}
+
public void setFlipX(boolean flipX) {
this.flipX = flipX;
}
+
public boolean isFlipY() {
return flipY;
}
+
public void setFlipY(boolean flipY) {
this.flipY = flipY;
}
+
public int getNumImageUpdates() {
return numImageUpdates;
}
+
public void setNumImageUpdates(int numImageUpdates) {
this.numImageUpdates = numImageUpdates;
}