refactor(jungfraujoch-preview): Improve Jungfraujoch Preview module

This commit is contained in:
2026-01-29 09:06:28 +01:00
parent 8a69c7aa36
commit efd51462fc

View File

@@ -62,6 +62,7 @@ def decode_dectris_compression(tag: cbor2.CBORTag):
#### Dectris CBOR TAG Extensions ########
#########################################
# Mapping of various additional CBOR tags from Dectris to decoder functions
tag_decoders = {
40: lambda tag: decode_multi_dim_array(tag, column_major=False),
64: lambda tag: decode_typed_array(tag, dtype="u1"),
@@ -88,12 +89,15 @@ tag_decoders = {
86: lambda tag: decode_typed_array(tag, dtype="<f8"),
87: lambda tag: decode_typed_array(tag, dtype="<f16"),
1040: lambda tag: decode_multi_dim_array(tag, column_major=True),
56500: lambda tag: decode_dectris_compression(tag),
56500: lambda tag: decode_dectris_compression(tag), # pylint: disable=unnecessary-lambda
}
def tag_hook(decoder, tag: int):
"""Get the decoder for Dectris specific CBOR tags. tag must be in tag_decoders."""
"""
Tag hook for the cbor2.loads method. Both arguments "decoder" and "tag" mus be present.
We use the tag to choose the respective decoder from the tag_decoders registry if available.
"""
tag_decoder = tag_decoders.get(tag.tag)
return tag_decoder(tag) if tag_decoder else tag
@@ -125,7 +129,6 @@ class JungfrauJochPreview:
Args:
url (str): ZMQ PUB-SUB preview stream URL.
cb (Callable): Callback function called with messages received from the stream.
"""
USER_ACCESS = ["start", "stop"]
@@ -169,8 +172,14 @@ class JungfrauJochPreview:
if self._zmq_thread:
self._zmq_thread.join(timeout=1.0)
def _zmq_update_loop(self):
"""Zmq update loop with polling for new data. The loop runs at maximum 10 Hz."""
def _zmq_update_loop(self, poll_interval: float = 0.2):
"""
ZMQ update loop running in a background thread. The polling is throttled by
the poll_interval parameter.
Args:
poll_interval (float): Time in seconds to wait between polling attempts.
"""
while not self._shutdown_event.is_set():
if self._socket is None:
self.connect()
@@ -183,23 +192,18 @@ class JungfrauJochPreview:
logger.debug(
f"ZMQ Again exception, receive queue is empty for JFJ preview at {self.url}."
)
# Happens when receive queue is empty
time.sleep(0.1) # NOTE: Change sleep time to control polling rate
finally:
# We throttle the polling to avoid heavy load on the device server
time.sleep(poll_interval)
def _poll(self):
"""
Poll the ZMQ socket for new data. We are currently subscribing and unsubscribing
for each poll loop to avoid receiving too many messages. The zmq update loop is
running with a maximum of 10 Hz, and we wait an additional 0.2 seconds here to
avoid busy waiting. Therefore, the maximum data rate is around 3 Hz.
NOTE:
This can be optimized in the future as JungfrauJoch supports controlling the update
rate from the server side. We still safeguard here to avoid that the device-server
becomes overloaded with messages.
for each poll loop to avoid receiving too many messages. Throttling of the update
loop is handled in the _zmq_update_loop method.
"""
if self._shutdown_event.wait(0.2): # NOTE: Change to adjust throttle rate in polling.
if self._shutdown_event.is_set():
return
try:
@@ -217,7 +221,7 @@ class JungfrauJochPreview:
def _parse_data(self, bytes_list: list[bytes]):
"""
Parse the received ZMQ data from the JungfrauJoch preview stream.
We will call the _on_update_callback with the decompressed messages.
We will call the _on_update_callback with the decompressed messages as a dictionary.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the