Livestream with access mutex

This commit is contained in:
gac-x05la
2025-03-18 11:18:06 +01:00
parent 9051e1a9ee
commit c9a2ce0dc5

View File

@@ -16,6 +16,7 @@ class StdDaqPreview:
USER_ACCESS = ["start", "stop", "image"] USER_ACCESS = ["start", "stop", "image"]
_socket = None _socket = None
_zmq_thread = None _zmq_thread = None
_monitor_mutex = threading.Lock()
_shutdown_event = threading.Event() _shutdown_event = threading.Event()
_throttle = 0.2 _throttle = 0.2
image = None image = None
@@ -48,7 +49,7 @@ class StdDaqPreview:
self._shutdown_event.clear() self._shutdown_event.clear()
self._zmq_thread = threading.Thread( self._zmq_thread = threading.Thread(
target=self._zmq_update_loop, daemon=True, name="StdDaq_live_preview" target=self._zmq_monitor, daemon=True, name="StdDaq_live_preview"
) )
self._zmq_thread.start() self._zmq_thread.start()
@@ -58,27 +59,40 @@ class StdDaqPreview:
self._zmq_thread.join() self._zmq_thread.join()
self._zmq_thread = None self._zmq_thread = None
def _zmq_update_loop(self): def _zmq_monitor(self):
if self._socket is None: """ZMQ stream monitor"""
self.connect()
# Exit if another monitor is running
if self._monitor_mutex.locked():
return
with self._monitor_mutex:
# Open a new connection
if self._socket is None:
self.connect()
t_last = time.time()
while not self._shutdown_event.is_set():
try: try:
# pylint: disable=no-member # Run the monitor loop
r = self._socket.recv_multipart(flags=zmq.NOBLOCK) t_last = time.time()
while not self._shutdown_event.is_set():
try:
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
# Throttle parsing and callbacks # Throttle parsing and callbacks
t_curr = time.time() t_curr = time.time()
if t_curr - t_last > self._throttle: if t_curr - t_last > self._throttle:
self._parse_data(r) self._parse_data(r)
t_last = t_curr t_last = t_curr
except ValueError: except ValueError:
# Happens when ZMQ partially delivers the multipart message # Happens when ZMQ partially delivers the multipart message
pass pass
except zmq.error.Again: except zmq.error.Again:
# Happens when receive queue is empty # Happens when receive queue is empty
time.sleep(0.1) time.sleep(0.1)
finally:
# Stop receiving incoming data
self._socket.close()
def _parse_data(self, data): def _parse_data(self, data):
# Length and throtling checks # Length and throtling checks