Livestream with access mutex
This commit is contained in:
@@ -16,6 +16,7 @@ class StdDaqPreview:
|
||||
USER_ACCESS = ["start", "stop", "image"]
|
||||
_socket = None
|
||||
_zmq_thread = None
|
||||
_monitor_mutex = threading.Lock()
|
||||
_shutdown_event = threading.Event()
|
||||
_throttle = 0.2
|
||||
image = None
|
||||
@@ -48,7 +49,7 @@ class StdDaqPreview:
|
||||
|
||||
self._shutdown_event.clear()
|
||||
self._zmq_thread = threading.Thread(
|
||||
target=self._zmq_update_loop, daemon=True, name="StdDaq_live_preview"
|
||||
target=self._zmq_monitor, daemon=True, name="StdDaq_live_preview"
|
||||
)
|
||||
self._zmq_thread.start()
|
||||
|
||||
@@ -58,27 +59,40 @@ class StdDaqPreview:
|
||||
self._zmq_thread.join()
|
||||
self._zmq_thread = None
|
||||
|
||||
def _zmq_update_loop(self):
|
||||
if self._socket is None:
|
||||
self.connect()
|
||||
def _zmq_monitor(self):
|
||||
"""ZMQ stream monitor"""
|
||||
|
||||
# Exit if another monitor is running
|
||||
if self._monitor_mutex.locked():
|
||||
return
|
||||
|
||||
with self._monitor_mutex:
|
||||
# Open a new connection
|
||||
if self._socket is None:
|
||||
self.connect()
|
||||
|
||||
t_last = time.time()
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
# Run the monitor loop
|
||||
t_last = time.time()
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
|
||||
# Throttle parsing and callbacks
|
||||
t_curr = time.time()
|
||||
if t_curr - t_last > self._throttle:
|
||||
self._parse_data(r)
|
||||
t_last = t_curr
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
time.sleep(0.1)
|
||||
# Throttle parsing and callbacks
|
||||
t_curr = time.time()
|
||||
if t_curr - t_last > self._throttle:
|
||||
self._parse_data(r)
|
||||
t_last = t_curr
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
time.sleep(0.1)
|
||||
finally:
|
||||
# Stop receiving incoming data
|
||||
self._socket.close()
|
||||
|
||||
def _parse_data(self, data):
|
||||
# Length and throtling checks
|
||||
|
||||
Reference in New Issue
Block a user