diff --git a/tomcat_bec/devices/gigafrost/std_daq_preview.py b/tomcat_bec/devices/gigafrost/std_daq_preview.py index c519785..6ea8a80 100644 --- a/tomcat_bec/devices/gigafrost/std_daq_preview.py +++ b/tomcat_bec/devices/gigafrost/std_daq_preview.py @@ -16,6 +16,7 @@ class StdDaqPreview: USER_ACCESS = ["start", "stop", "image"] _socket = None _zmq_thread = None + _monitor_mutex = threading.Lock() _shutdown_event = threading.Event() _throttle = 0.2 image = None @@ -48,7 +49,7 @@ class StdDaqPreview: self._shutdown_event.clear() self._zmq_thread = threading.Thread( - target=self._zmq_update_loop, daemon=True, name="StdDaq_live_preview" + target=self._zmq_monitor, daemon=True, name="StdDaq_live_preview" ) self._zmq_thread.start() @@ -58,27 +59,40 @@ class StdDaqPreview: self._zmq_thread.join() self._zmq_thread = None - def _zmq_update_loop(self): - if self._socket is None: - self.connect() + def _zmq_monitor(self): + """ZMQ stream monitor""" + + # Exit if another monitor is running + if self._monitor_mutex.locked(): + return + + with self._monitor_mutex: + # Open a new connection + if self._socket is None: + self.connect() - t_last = time.time() - while not self._shutdown_event.is_set(): try: - # pylint: disable=no-member - r = self._socket.recv_multipart(flags=zmq.NOBLOCK) + # Run the monitor loop + t_last = time.time() + while not self._shutdown_event.is_set(): + try: + # pylint: disable=no-member + r = self._socket.recv_multipart(flags=zmq.NOBLOCK) - # Throttle parsing and callbacks - t_curr = time.time() - if t_curr - t_last > self._throttle: - self._parse_data(r) - t_last = t_curr - except ValueError: - # Happens when ZMQ partially delivers the multipart message - pass - except zmq.error.Again: - # Happens when receive queue is empty - time.sleep(0.1) + # Throttle parsing and callbacks + t_curr = time.time() + if t_curr - t_last > self._throttle: + self._parse_data(r) + t_last = t_curr + except ValueError: + # Happens when ZMQ partially delivers the multipart message + pass + except zmq.error.Again: + # Happens when receive queue is empty + time.sleep(0.1) + finally: + # Stop receiving incoming data + self._socket.close() def _parse_data(self, data): # Length and throtling checks