From 0b3ce96675cd99292e46bb0fce1b556a0fda8ef6 Mon Sep 17 00:00:00 2001 From: Simon Ebner Date: Wed, 30 Apr 2014 06:58:24 +0200 Subject: [PATCH] reformatting added cbf support --- pom.xml | 7 +- .../java/ch/psi/zmq/imagej/Collector.java | 290 ++++++++++++------ 2 files changed, 204 insertions(+), 93 deletions(-) diff --git a/pom.xml b/pom.xml index 63ae9fd..d3c1ed1 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 ch.psi.zmq ch.psi.zmq.imagej - 0.2.0 + 0.4.0 @@ -11,6 +11,11 @@ jeromq 0.2.0 + + ch.psi.imagej + ch.psi.imagej.cbf + 0.0.3 + com.fasterxml.jackson.core jackson-databind diff --git a/src/main/java/ch/psi/zmq/imagej/Collector.java b/src/main/java/ch/psi/zmq/imagej/Collector.java index 3ec3cb5..31e4774 100644 --- a/src/main/java/ch/psi/zmq/imagej/Collector.java +++ b/src/main/java/ch/psi/zmq/imagej/Collector.java @@ -18,10 +18,16 @@ */ package ch.psi.zmq.imagej; +import ij.IJ; import ij.ImagePlus; +import ij.process.FloatProcessor; import ij.process.ImageProcessor; import ij.process.ShortProcessor; +import java.awt.image.BufferedImage; +import java.awt.image.DataBuffer; +import java.awt.image.DataBufferFloat; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -31,130 +37,151 @@ import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import javax.imageio.ImageIO; +import javax.imageio.ImageReadParam; +import javax.imageio.stream.ImageInputStream; + import org.jeromq.ZMQ; +import ch.psi.imagej.cbf.CbfImageReader; +import ch.psi.imagej.cbf.CbfImageReaderSpi; + import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; /** * @author ebner - * + * */ -public class Collector implements Runnable{ - - +public class Collector implements Runnable { + private static final Logger logger = Logger.getLogger(Collector.class.getName()); - + private static final int HIGH_WATER_MARK = 10; - + + private enum HType { + ARRAY_1_0, PILATUS_1_0 + }; + + private HType htype = HType.ARRAY_1_0; + private boolean flipX = false; private boolean flipY = false; private int imageSizeX = 2560; private int imageSizeY = 2160; - + private ZMQ.Context context; private ZMQ.Socket socket; private ImagePlus img; - + private ObjectMapper mapper = new ObjectMapper(new JsonFactory()); - - + private int numImageUpdates; - + private HeaderInfo hinfo; private String hostname; private int port; private String method; - - public Collector(HeaderInfo hinfo, String hostname, int port, String method){ + + public Collector(HeaderInfo hinfo, String hostname, int port, String method) { this.hinfo = hinfo; this.hostname = hostname; this.port = port; this.method = method; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see java.lang.Runnable#run() */ public void run() { numImageUpdates = 0; // Start counting at 0 - - if(img!=null){ - img.close(); - img=null; - } - - - context = ZMQ.context(); - if(method.equals("PULL")){ - socket = context.socket(ZMQ.PULL); - } - else if(method.equals("SUB")){ - socket = context.socket(ZMQ.SUB); - socket.subscribe(""); // Subscribe to all topics - } - else{ - logger.severe("Method not supported"); - // Terminate context and return - context.term(); - return; - } - socket.setHWM(HIGH_WATER_MARK); - socket.connect("tcp://"+hostname+":"+port); - - logger.info("Connected to: tcp://"+hostname+":"+port); - - while(true){ // Can only be terminated by closing the socket - byte[] header = socket.recv(); - byte[] content = null; - if (socket.hasReceiveMore()) { - content = socket.recv(); - } - - readHeader(header); - readContent(content); + + if (img != null) { + img.close(); + img = null; + } + + context = ZMQ.context(); + if (method.equals("PULL")) { + socket = context.socket(ZMQ.PULL); + } + else if (method.equals("SUB")) { + socket = context.socket(ZMQ.SUB); + socket.subscribe(""); // Subscribe to all topics + } + else { + logger.severe("Method not supported"); + // Terminate context and return + context.term(); + return; + } + socket.setHWM(HIGH_WATER_MARK); + socket.connect("tcp://" + hostname + ":" + port); + + logger.info("Connected to: tcp://" + hostname + ":" + port); + + while (true) { // Can only be terminated by closing the socket + byte[] header = socket.recv(); + byte[] content = null; + if (socket.hasReceiveMore()) { + content = socket.recv(); } + + readHeader(header); + readContent(content); + } } - public void terminate(){ + public void terminate() { logger.info("Terminate Collector"); - - try{ - + + try { + socket.close(); context.term(); - } - catch(Exception ex){ // This exception can savely be ignored (somewhat most of the time an exception is expected) + } catch (Exception ex) { // This exception can savely be ignored + // (somewhat most of the time an exception + // is expected) ex.printStackTrace(); } - - if(img!=null){ + + if (img != null) { img.close(); } - + logger.info("Collector terminated"); } - + @SuppressWarnings("unchecked") - private void readHeader(byte[] h){ - try{ + private void readHeader(byte[] h) { + try { String header = new String(h); -// hinfo.setHeader(header); - Map m = mapper.readValue(header, new TypeReference>(){}); - if(((List) m.get("htype")).contains("array-1.0")){ // currently we only support array-1.0 message types + // hinfo.setHeader(header); + Map m = mapper.readValue(header, new TypeReference>() { + }); + if (((List) m.get("htype")).contains("array-1.0")) { // currently + // we + // only + // support + // array-1.0 + // message + // types + htype = HType.ARRAY_1_0; List shape = (List) m.get("shape"); int nImageSizeX = shape.get(1); int nImageSizeY = shape.get(0); - if(imageSizeX!=nImageSizeX || imageSizeY!=nImageSizeY){ + if (imageSizeX != nImageSizeX || imageSizeY != nImageSizeY) { imageSizeX = nImageSizeX; imageSizeY = nImageSizeY; - + img.close(); - img=null; + img = null; } - + if (img == null) { // TODO eventually use ByteProcessor or BinaryProcessor // BinaryProcessor p = new ij.process.BinaryProcessor(new @@ -164,64 +191,143 @@ public class Collector implements Runnable{ } img.setTitle(header); hinfo.setText(m); - + } - else{ + else if (((List) m.get("htype")).contains("pilatus-1.0")) { // pilatus + // 1.0 + // message + // support + htype = HType.PILATUS_1_0; + } + else { logger.info("Header type is not supported ..."); - if(img!=null){ + if (img != null) { img.close(); - img=null; + img = null; } } - } - catch(IOException e){ + } catch (IOException e) { logger.log(Level.SEVERE, "Unable to parse header", e); } - -// logger.info(sheader); + + // logger.info(sheader); } - + private void readContent(byte[] content) { try { - if(content!=null && img!=null){ - // TODO Check whether this is needed - short[] shorts = new short[content.length / 2]; - ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts); - - ImageProcessor ip = img.getProcessor(); - ip.setPixels(shorts); - if(flipX){ - ip.flipHorizontal(); + if (htype == HType.ARRAY_1_0) { + if (content != null && img != null) { + // TODO Check whether this is needed + short[] shorts = new short[content.length / 2]; + ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts); + + ImageProcessor ip = img.getProcessor(); + ip.setPixels(shorts); + if (flipX) { + ip.flipHorizontal(); + } + if (flipY) { + ip.flipVertical(); + } + + img.updateAndDraw(); + numImageUpdates++; } - if(flipY){ - ip.flipVertical(); + } + else if (htype == HType.PILATUS_1_0) { + CbfImageReaderSpi spi = new CbfImageReaderSpi(); + ImageInputStream stream = ImageIO.createImageInputStream(new ByteArrayInputStream(content)); + CbfImageReader reader = new CbfImageReader(spi); + reader.setInput(stream); + ImageReadParam param = reader.getDefaultReadParam(); + int width = reader.getWidth(0); + int height = reader.getHeight(0); + BufferedImage image = reader.read(0, param); + DataBuffer buffer = image.getData().getDataBuffer(); + int bufferType = buffer.getDataType(); + float[] pixels; + ImageProcessor ip = null; + if (buffer instanceof DataBufferFloat) { + pixels = ((DataBufferFloat) buffer).getData(); + ip = new FloatProcessor(width, height, pixels, null); + image.flush(); + reader.dispose(); + // TODO + // return new ImagePlus(cbfFile.getName(), ip); + } else if (bufferType == DataBuffer.TYPE_INT || bufferType == DataBuffer.TYPE_FLOAT) { + pixels = new float[width * height]; + for (int i = 0; i < pixels.length; i++) + pixels[i] = buffer.getElemFloat(i); + ip = new FloatProcessor(width, height, pixels, null); + image.flush(); + reader.dispose(); + // TODO + // return new ImagePlus(cbfFile.getName(), ip); + } else if (bufferType == DataBuffer.TYPE_DOUBLE) { + pixels = new float[width * height]; + boolean clipped = false; + for (int i = 0; i < pixels.length; i++) { + double doubleValue = buffer.getElemDouble(i); + float floatValue; + if (doubleValue < Float.MIN_VALUE) { + floatValue = Float.MIN_VALUE; + clipped = true; + } else if (doubleValue > Float.MAX_VALUE) { + floatValue = Float.MAX_VALUE; + clipped = true; + } else { + floatValue = (float) doubleValue; + } + pixels[i] = floatValue; + } + if (clipped) { + IJ.log("Warning: pixel value(s) clipped to fit in type float"); + } + ip = new FloatProcessor(width, height, pixels, null); + image.flush(); + reader.dispose(); + // TODO + // return new ImagePlus(cbfFile.getName(), ip); + } else { + reader.dispose(); + // TODO + // return new ImagePlus(cbfFile.getName(), image); + } + if(img==null){ + img = new ImagePlus("", ip); + img.show(); + } + else{ + img.setProcessor(ip); + img.updateAndDraw(); } - img.updateAndDraw(); - numImageUpdates++; } } catch (Exception ex) { logger.log(Level.SEVERE, "UpdateImage got exception", ex); } } - - public boolean isFlipX() { return flipX; } + public void setFlipX(boolean flipX) { this.flipX = flipX; } + public boolean isFlipY() { return flipY; } + public void setFlipY(boolean flipY) { this.flipY = flipY; } + public int getNumImageUpdates() { return numImageUpdates; } + public void setNumImageUpdates(int numImageUpdates) { this.numImageUpdates = numImageUpdates; }