reformatting

added cbf support
This commit is contained in:
ebner 2014-04-30 06:58:24 +02:00
parent 9891e11979
commit 0b3ce96675
2 changed files with 204 additions and 93 deletions

View File

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ch.psi.zmq</groupId>
<artifactId>ch.psi.zmq.imagej</artifactId>
<version>0.2.0</version>
<version>0.4.0</version>
<dependencies>
<dependency>
@ -11,6 +11,11 @@
<artifactId>jeromq</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>ch.psi.imagej</groupId>
<artifactId>ch.psi.imagej.cbf</artifactId>
<version>0.0.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>

View File

@ -18,10 +18,16 @@
*/
package ch.psi.zmq.imagej;
import ij.IJ;
import ij.ImagePlus;
import ij.process.FloatProcessor;
import ij.process.ImageProcessor;
import ij.process.ShortProcessor;
import java.awt.image.BufferedImage;
import java.awt.image.DataBuffer;
import java.awt.image.DataBufferFloat;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -31,130 +37,151 @@ import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.imageio.ImageIO;
import javax.imageio.ImageReadParam;
import javax.imageio.stream.ImageInputStream;
import org.jeromq.ZMQ;
import ch.psi.imagej.cbf.CbfImageReader;
import ch.psi.imagej.cbf.CbfImageReaderSpi;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author ebner
*
*
*/
public class Collector implements Runnable{
public class Collector implements Runnable {
private static final Logger logger = Logger.getLogger(Collector.class.getName());
private static final int HIGH_WATER_MARK = 10;
private enum HType {
ARRAY_1_0, PILATUS_1_0
};
private HType htype = HType.ARRAY_1_0;
private boolean flipX = false;
private boolean flipY = false;
private int imageSizeX = 2560;
private int imageSizeY = 2160;
private ZMQ.Context context;
private ZMQ.Socket socket;
private ImagePlus img;
private ObjectMapper mapper = new ObjectMapper(new JsonFactory());
private int numImageUpdates;
private HeaderInfo hinfo;
private String hostname;
private int port;
private String method;
public Collector(HeaderInfo hinfo, String hostname, int port, String method){
public Collector(HeaderInfo hinfo, String hostname, int port, String method) {
this.hinfo = hinfo;
this.hostname = hostname;
this.port = port;
this.method = method;
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
numImageUpdates = 0; // Start counting at 0
if(img!=null){
img.close();
img=null;
}
context = ZMQ.context();
if(method.equals("PULL")){
socket = context.socket(ZMQ.PULL);
}
else if(method.equals("SUB")){
socket = context.socket(ZMQ.SUB);
socket.subscribe(""); // Subscribe to all topics
}
else{
logger.severe("Method not supported");
// Terminate context and return
context.term();
return;
}
socket.setHWM(HIGH_WATER_MARK);
socket.connect("tcp://"+hostname+":"+port);
logger.info("Connected to: tcp://"+hostname+":"+port);
while(true){ // Can only be terminated by closing the socket
byte[] header = socket.recv();
byte[] content = null;
if (socket.hasReceiveMore()) {
content = socket.recv();
}
readHeader(header);
readContent(content);
if (img != null) {
img.close();
img = null;
}
context = ZMQ.context();
if (method.equals("PULL")) {
socket = context.socket(ZMQ.PULL);
}
else if (method.equals("SUB")) {
socket = context.socket(ZMQ.SUB);
socket.subscribe(""); // Subscribe to all topics
}
else {
logger.severe("Method not supported");
// Terminate context and return
context.term();
return;
}
socket.setHWM(HIGH_WATER_MARK);
socket.connect("tcp://" + hostname + ":" + port);
logger.info("Connected to: tcp://" + hostname + ":" + port);
while (true) { // Can only be terminated by closing the socket
byte[] header = socket.recv();
byte[] content = null;
if (socket.hasReceiveMore()) {
content = socket.recv();
}
readHeader(header);
readContent(content);
}
}
public void terminate(){
public void terminate() {
logger.info("Terminate Collector");
try{
try {
socket.close();
context.term();
}
catch(Exception ex){ // This exception can savely be ignored (somewhat most of the time an exception is expected)
} catch (Exception ex) { // This exception can savely be ignored
// (somewhat most of the time an exception
// is expected)
ex.printStackTrace();
}
if(img!=null){
if (img != null) {
img.close();
}
logger.info("Collector terminated");
}
@SuppressWarnings("unchecked")
private void readHeader(byte[] h){
try{
private void readHeader(byte[] h) {
try {
String header = new String(h);
// hinfo.setHeader(header);
Map<String,Object> m = mapper.readValue(header, new TypeReference<HashMap<String,Object>>(){});
if(((List<String>) m.get("htype")).contains("array-1.0")){ // currently we only support array-1.0 message types
// hinfo.setHeader(header);
Map<String, Object> m = mapper.readValue(header, new TypeReference<HashMap<String, Object>>() {
});
if (((List<String>) m.get("htype")).contains("array-1.0")) { // currently
// we
// only
// support
// array-1.0
// message
// types
htype = HType.ARRAY_1_0;
List<Integer> shape = (List<Integer>) m.get("shape");
int nImageSizeX = shape.get(1);
int nImageSizeY = shape.get(0);
if(imageSizeX!=nImageSizeX || imageSizeY!=nImageSizeY){
if (imageSizeX != nImageSizeX || imageSizeY != nImageSizeY) {
imageSizeX = nImageSizeX;
imageSizeY = nImageSizeY;
img.close();
img=null;
img = null;
}
if (img == null) {
// TODO eventually use ByteProcessor or BinaryProcessor
// BinaryProcessor p = new ij.process.BinaryProcessor(new
@ -164,64 +191,143 @@ public class Collector implements Runnable{
}
img.setTitle(header);
hinfo.setText(m);
}
else{
else if (((List<String>) m.get("htype")).contains("pilatus-1.0")) { // pilatus
// 1.0
// message
// support
htype = HType.PILATUS_1_0;
}
else {
logger.info("Header type is not supported ...");
if(img!=null){
if (img != null) {
img.close();
img=null;
img = null;
}
}
}
catch(IOException e){
} catch (IOException e) {
logger.log(Level.SEVERE, "Unable to parse header", e);
}
// logger.info(sheader);
// logger.info(sheader);
}
private void readContent(byte[] content) {
try {
if(content!=null && img!=null){
// TODO Check whether this is needed
short[] shorts = new short[content.length / 2];
ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts);
ImageProcessor ip = img.getProcessor();
ip.setPixels(shorts);
if(flipX){
ip.flipHorizontal();
if (htype == HType.ARRAY_1_0) {
if (content != null && img != null) {
// TODO Check whether this is needed
short[] shorts = new short[content.length / 2];
ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shorts);
ImageProcessor ip = img.getProcessor();
ip.setPixels(shorts);
if (flipX) {
ip.flipHorizontal();
}
if (flipY) {
ip.flipVertical();
}
img.updateAndDraw();
numImageUpdates++;
}
if(flipY){
ip.flipVertical();
}
else if (htype == HType.PILATUS_1_0) {
CbfImageReaderSpi spi = new CbfImageReaderSpi();
ImageInputStream stream = ImageIO.createImageInputStream(new ByteArrayInputStream(content));
CbfImageReader reader = new CbfImageReader(spi);
reader.setInput(stream);
ImageReadParam param = reader.getDefaultReadParam();
int width = reader.getWidth(0);
int height = reader.getHeight(0);
BufferedImage image = reader.read(0, param);
DataBuffer buffer = image.getData().getDataBuffer();
int bufferType = buffer.getDataType();
float[] pixels;
ImageProcessor ip = null;
if (buffer instanceof DataBufferFloat) {
pixels = ((DataBufferFloat) buffer).getData();
ip = new FloatProcessor(width, height, pixels, null);
image.flush();
reader.dispose();
// TODO
// return new ImagePlus(cbfFile.getName(), ip);
} else if (bufferType == DataBuffer.TYPE_INT || bufferType == DataBuffer.TYPE_FLOAT) {
pixels = new float[width * height];
for (int i = 0; i < pixels.length; i++)
pixels[i] = buffer.getElemFloat(i);
ip = new FloatProcessor(width, height, pixels, null);
image.flush();
reader.dispose();
// TODO
// return new ImagePlus(cbfFile.getName(), ip);
} else if (bufferType == DataBuffer.TYPE_DOUBLE) {
pixels = new float[width * height];
boolean clipped = false;
for (int i = 0; i < pixels.length; i++) {
double doubleValue = buffer.getElemDouble(i);
float floatValue;
if (doubleValue < Float.MIN_VALUE) {
floatValue = Float.MIN_VALUE;
clipped = true;
} else if (doubleValue > Float.MAX_VALUE) {
floatValue = Float.MAX_VALUE;
clipped = true;
} else {
floatValue = (float) doubleValue;
}
pixels[i] = floatValue;
}
if (clipped) {
IJ.log("Warning: pixel value(s) clipped to fit in type float");
}
ip = new FloatProcessor(width, height, pixels, null);
image.flush();
reader.dispose();
// TODO
// return new ImagePlus(cbfFile.getName(), ip);
} else {
reader.dispose();
// TODO
// return new ImagePlus(cbfFile.getName(), image);
}
if(img==null){
img = new ImagePlus("", ip);
img.show();
}
else{
img.setProcessor(ip);
img.updateAndDraw();
}
img.updateAndDraw();
numImageUpdates++;
}
} catch (Exception ex) {
logger.log(Level.SEVERE, "UpdateImage got exception", ex);
}
}
public boolean isFlipX() {
return flipX;
}
public void setFlipX(boolean flipX) {
this.flipX = flipX;
}
public boolean isFlipY() {
return flipY;
}
public void setFlipY(boolean flipY) {
this.flipY = flipY;
}
public int getNumImageUpdates() {
return numImageUpdates;
}
public void setNumImageUpdates(int numImageUpdates) {
this.numImageUpdates = numImageUpdates;
}