Livestream with access mutex
This commit is contained in:
@@ -16,6 +16,7 @@ class StdDaqPreview:
|
|||||||
USER_ACCESS = ["start", "stop", "image"]
|
USER_ACCESS = ["start", "stop", "image"]
|
||||||
_socket = None
|
_socket = None
|
||||||
_zmq_thread = None
|
_zmq_thread = None
|
||||||
|
_monitor_mutex = threading.Lock()
|
||||||
_shutdown_event = threading.Event()
|
_shutdown_event = threading.Event()
|
||||||
_throttle = 0.2
|
_throttle = 0.2
|
||||||
image = None
|
image = None
|
||||||
@@ -48,7 +49,7 @@ class StdDaqPreview:
|
|||||||
|
|
||||||
self._shutdown_event.clear()
|
self._shutdown_event.clear()
|
||||||
self._zmq_thread = threading.Thread(
|
self._zmq_thread = threading.Thread(
|
||||||
target=self._zmq_update_loop, daemon=True, name="StdDaq_live_preview"
|
target=self._zmq_monitor, daemon=True, name="StdDaq_live_preview"
|
||||||
)
|
)
|
||||||
self._zmq_thread.start()
|
self._zmq_thread.start()
|
||||||
|
|
||||||
@@ -58,27 +59,40 @@ class StdDaqPreview:
|
|||||||
self._zmq_thread.join()
|
self._zmq_thread.join()
|
||||||
self._zmq_thread = None
|
self._zmq_thread = None
|
||||||
|
|
||||||
def _zmq_update_loop(self):
|
def _zmq_monitor(self):
|
||||||
if self._socket is None:
|
"""ZMQ stream monitor"""
|
||||||
self.connect()
|
|
||||||
|
# Exit if another monitor is running
|
||||||
|
if self._monitor_mutex.locked():
|
||||||
|
return
|
||||||
|
|
||||||
|
with self._monitor_mutex:
|
||||||
|
# Open a new connection
|
||||||
|
if self._socket is None:
|
||||||
|
self.connect()
|
||||||
|
|
||||||
t_last = time.time()
|
|
||||||
while not self._shutdown_event.is_set():
|
|
||||||
try:
|
try:
|
||||||
# pylint: disable=no-member
|
# Run the monitor loop
|
||||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
t_last = time.time()
|
||||||
|
while not self._shutdown_event.is_set():
|
||||||
|
try:
|
||||||
|
# pylint: disable=no-member
|
||||||
|
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||||
|
|
||||||
# Throttle parsing and callbacks
|
# Throttle parsing and callbacks
|
||||||
t_curr = time.time()
|
t_curr = time.time()
|
||||||
if t_curr - t_last > self._throttle:
|
if t_curr - t_last > self._throttle:
|
||||||
self._parse_data(r)
|
self._parse_data(r)
|
||||||
t_last = t_curr
|
t_last = t_curr
|
||||||
except ValueError:
|
except ValueError:
|
||||||
# Happens when ZMQ partially delivers the multipart message
|
# Happens when ZMQ partially delivers the multipart message
|
||||||
pass
|
pass
|
||||||
except zmq.error.Again:
|
except zmq.error.Again:
|
||||||
# Happens when receive queue is empty
|
# Happens when receive queue is empty
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
finally:
|
||||||
|
# Stop receiving incoming data
|
||||||
|
self._socket.close()
|
||||||
|
|
||||||
def _parse_data(self, data):
|
def _parse_data(self, data):
|
||||||
# Length and throtling checks
|
# Length and throtling checks
|
||||||
|
|||||||
Reference in New Issue
Block a user