diff --git a/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoopStream.java b/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoopStream.java index 5949b0b..a2a8194 100644 --- a/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoopStream.java +++ b/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoopStream.java @@ -33,6 +33,7 @@ import java.util.logging.Logger; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMQException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.eventbus.EventBus; @@ -40,6 +41,7 @@ import com.google.common.eventbus.EventBus; import ch.psi.fda.core.Action; import ch.psi.fda.core.ActionLoop; import ch.psi.fda.messages.DataMessage; +import ch.psi.fda.messages.EndOfStreamMessage; import ch.psi.fda.messages.Metadata; import ch.psi.jcae.ChannelException; import ch.psi.jcae.ChannelService; @@ -165,7 +167,17 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop { mainHeader = mapper.readValue(socket.recv(), MainHeader.class); } catch (IOException e) { throw new RuntimeException("Unable to decode main header", e); +// } catch(ZMQException e){ +// if(!stopReadoutThread){ +// // re-throw exception unless thread is stopped +// // Closing the socket at the end of a scan causes an ZMQException +// throw e; +// } +// else{ +// return; +// } } + if(!socket.hasReceiveMore()){ throw new RuntimeException("There is no data submessage"); } @@ -641,10 +653,11 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop { } public void close() { - // Interrupt the readout thread to release the read block - readoutThread.interrupt(); - logger.info("Closing stream from IOC "+ioc); + + // Send end of stream message + eventbus.post(new EndOfStreamMessage(dataGroup)); + socket.close(); context.close(); socket = null; @@ -663,8 +676,13 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop { @Override public void cleanup() { + + + System.out.println("cleanup"); stopReadoutThread = true; - readoutThread.interrupt(); +// readoutThread.interrupt(); + + close(); try { cservice.destroyAnnotatedChannels(template);