From 0fb85d785b3fdca47024793eef2f65e3eff6ee72 Mon Sep 17 00:00:00 2001 From: appel_c Date: Mon, 26 Jan 2026 12:31:27 +0100 Subject: [PATCH] fix: Cleanup after tests at the beamline --- csaxs_bec/devices/jungfraujoch/eiger.py | 41 ++++-- .../jungfraujoch/jungfrau_joch_client.py | 48 ++++--- .../jungfraujoch/jungfraujoch_preview.py | 122 ++++++++++++++---- 3 files changed, 159 insertions(+), 52 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index 09bb20c..15294fa 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -28,7 +28,7 @@ Some extra notes for setting up the detector: Hardware related notes: - If there is an HW issue with the detector, power cycling may help. -- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba +- The sls_detector package is available on console on /sls/x12sa/applications/erik/micromamba - Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be used during operation - Useful commands: @@ -120,8 +120,9 @@ class Eiger(PSIDeviceBase): super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs) self._host = f"{host}:{port}" self.jfj_client = JungfrauJochClient(host=self._host, parent=self) + # NOTE: fetch this information from JungfrauJochClient during on_connected! 
self.jfj_preview_client = JungfrauJochPreview( - url="tcp://129.129.95.114:5400", cb=self.preview_image.put + url="tcp://129.129.95.114:5400", cb=self._preview_callback ) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream self.device_manager = device_manager self.detector_name = detector_name @@ -132,6 +133,15 @@ class Eiger(PSIDeviceBase): if self.device_manager is not None: self.device_manager: DeviceManagerDS + def _preview_callback(self, message: dict) -> None: + if message.get("type", "") == "image": + data = message.get("data", {}).get("default", None) + if data is None: + logger.error(f"Received image message on device {self.name} without data.") + return + logger.info(f"Received preview image on device {self.name}") + self.preview_image.put(data) + @property def detector_distance(self) -> float: """The detector distance in mm.""" @@ -173,7 +183,7 @@ class Eiger(PSIDeviceBase): # Get available detectors available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5) - logger.info(f"Available detectors {available_detectors}") + logger.debug(f"Available detectors {available_detectors}") # Get current detector current_detector_name = "" if available_detectors.current_id is not None: @@ -191,6 +201,7 @@ class Eiger(PSIDeviceBase): raise RuntimeError( f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}." 
) + # TODO - check again once Eiger should be initialized automatically, currently human initialization is expected # # Once the automation should be enabled, we may use here # detector_selection = [ @@ -266,11 +277,12 @@ class Eiger(PSIDeviceBase): incident_energy_ke_v=incident_energy, ) logger.info(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") - prep_time = start_time - time.time() - logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s") + prep_time = time.time() self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state self.jfj_client.start(settings=data_settings) # Takes around ~0.6s - logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s") + logger.info( + f"On stage done for device {self.name} after {time.time()-start_time:.2f}s, with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch." + ) def on_unstage(self) -> DeviceStatus: """Called while unstaging the device.""" @@ -283,7 +295,9 @@ class Eiger(PSIDeviceBase): def _file_event_callback(self, status: DeviceStatus) -> None: """Callback to update the file_event signal when the acquisition is done.""" - logger.info(f"Acquisition done callback called for {self.name} for status {status.success}") + logger.debug( + f"Acquisition done callback called for {self.name} for status {status.success}" + ) self.file_event.put( file_path=self._full_path, done=status.done, @@ -297,15 +311,20 @@ class Eiger(PSIDeviceBase): def wait_for_complete(): start_time = time.time() timeout = 20 - for ii in range(timeout): - logger.info(f"Running loop with timeout {time.time() - start_time}.") - if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10): + for _ in range(timeout): + if self.jfj_client.wait_for_idle( + timeout=1, request_timeout=10, raise_on_timeout=False + ): + logger.info(f"Device {self.name} completed acquisition.") return + logger.info( + f"Device {self.name} 
running loop to wait for complete, time elapsed: {time.time() - start_time}." + ) statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( _request_timeout=5 ) raise TimeoutError( - f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}" + f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}" ) status = self.task_handler.submit_task(wait_for_complete, run=True) diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index 1db6b15..bddc4eb 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -3,6 +3,7 @@ from __future__ import annotations import enum +import threading import time import traceback from typing import TYPE_CHECKING @@ -152,20 +153,29 @@ class JungfrauJochClient: def stop(self, request_timeout: float = 0.5) -> None: """Stop the acquisition, this only logs errors and is not raising.""" - try: - self.api.cancel_post_with_http_info(_request_timeout=request_timeout) - except requests.exceptions.Timeout: - content = traceback.format_exc() - logger.error( - f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}" - ) - except Exception: - content = traceback.format_exc() - logger.error( - f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}" - ) - def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool: + def _stop_call(self): + try: + self.api.cancel_post_with_http_info() # (_request_timeout=request_timeout) + except requests.exceptions.Timeout: + content = traceback.format_exc() + logger.error( + f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: 
{content}" + ) + except Exception: + content = traceback.format_exc() + logger.error( + f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}" + ) + + thread = threading.Thread( + target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread" + ) + thread.start() + + def wait_for_idle( + self, timeout: int = 10, request_timeout: float | None = None, raise_on_timeout: bool = True + ) -> bool: """Wait for JungfrauJoch to be in Idle state. Blocking call with timeout. Args: @@ -178,9 +188,17 @@ class JungfrauJochClient: try: self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout) except requests.exceptions.Timeout: - raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}") - except Exception: + if raise_on_timeout: + raise TimeoutError( + f"HTTP request timeout in wait_for_idle for {self._parent_name}." + ) + return False + except Exception as exc: content = traceback.format_exc() logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}") + if raise_on_timeout: + raise JungfrauJochClientError( + f"Error in wait_for_idle for {self._parent_name}: {content}" + ) from exc return False return True diff --git a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py index ed439ed..3aeb6f4 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py +++ b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py @@ -1,26 +1,39 @@ -"""Module for the Eiger preview ZMQ stream.""" +""" +Module for the JungfrauJoch preview ZMQ stream for the Eiger detector at cSAXS. +The Preview client is implemented for the JungfrauJoch ZMQ PUB-SUB interface, and +should be independent of the EIGER detector type. + +The client connects to the ZMQ PUB-SUB preview stream and calls a user provided callback +function with the decompressed messages received from the stream. 
The callback needs to be +able to deal with the different message types sent by the JungfrauJoch server ("start", +"image", "end") as described in the JungfrauJoch ZEROMQ preview stream documentation. +(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). +""" from __future__ import annotations -import json import threading import time from typing import Callable +import cbor2 import numpy as np import zmq from bec_lib.logger import bec_logger +from dectris.compression import decompress logger = bec_logger.logger -ZMQ_TOPIC_FILTER = b"" - -import cbor2 -import numpy as np -from dectris.compression import decompress +############################### +###### CBOR TAG DECODERS ###### +############################### +# Dectris specific CBOR tags and decoders for Jungfrau data +# Reference: +# https://github.com/dectris/documentation/blob/main/stream_v2/examples/client.py -def decode_multi_dim_array(tag, column_major): +def decode_multi_dim_array(tag: cbor2.CBORTag, column_major: bool = False): + """Decode a multi-dimensional array from a CBOR tag.""" dimensions, contents = tag.value if isinstance(contents, list): array = np.empty((len(contents),), dtype=object) @@ -32,17 +45,23 @@ def decode_multi_dim_array(tag, column_major): return array.reshape(dimensions, order="F" if column_major else "C") -def decode_typed_array(tag, dtype): +def decode_typed_array(tag: cbor2.CBORTag, dtype: str): + """Decode a typed array from a CBOR tag.""" if not isinstance(tag.value, bytes): raise cbor2.CBORDecodeValueError("expected byte string in typed array") return np.frombuffer(tag.value, dtype=dtype) -def decode_dectris_compression(tag): +def decode_dectris_compression(tag: cbor2.CBORTag): + """Decode a Dectris compressed array from a CBOR tag.""" algorithm, elem_size, encoded = tag.value return decompress(encoded, algorithm, elem_size=elem_size) +######################################### +#### Dectris CBOR TAG Extensions ######## 
+######################################### + tag_decoders = { 40: lambda tag: decode_multi_dim_array(tag, column_major=False), 64: lambda tag: decode_typed_array(tag, dtype="u1"), @@ -73,12 +92,42 @@ tag_decoders = { } -def tag_hook(decoder, tag): +def tag_hook(decoder: cbor2.CBORDecoder, tag: cbor2.CBORTag): + """cbor2 tag_hook(decoder, tag): decode Dectris CBOR tags, pass unknown tags through.""" tag_decoder = tag_decoders.get(tag.tag) return tag_decoder(tag) if tag_decoder else tag +###################### +#### ZMQ Settings #### +###################### + +ZMQ_TOPIC_FILTER = b"" # Subscribe to all topics +ZMQ_CONFLATE_SETTING = 1 # Keep only the most recent message +ZMQ_RCVHWM_SETTING = 1 # Set high water mark to 1, this configures the max number of queue messages + + +################################# +#### Jungfrau Preview Client #### +################################# + + class JungfrauJochPreview: + """ + Preview client for the JungfrauJoch ZMQ preview stream. The client is started with + a URL to receive the data from the JungfrauJoch PUB-SUB preview interface, and a + callback function that is called with messages received from the preview stream. + The callback needs to be able to deal with the different message types sent + by the JungfrauJoch server ("start", "image", "end") as described in the + JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps. + (https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). + + Args: + url (str): ZMQ PUB-SUB preview stream URL. + cb (Callable): Callback function called with messages received from the stream. + + """ + USER_ACCESS = ["start", "stop"] def __init__(self, url: str, cb: Callable): @@ -89,16 +138,18 @@ class JungfrauJochPreview: self._on_update_callback = cb def connect(self): - """Connect to the JungfrauJoch PUB-SUB streaming interface - - JungfrauJoch may reject connection for a few seconds when it restarts, - so if it fails, wait a bit and try to connect again. 
+ """ + Connect to the JungfrauJoch PUB-SUB streaming interface. If the connection is refused + it will reattempt a second time after a one second delay. """ # pylint: disable=no-member context = zmq.Context() self._socket = context.socket(zmq.SUB) + self._socket.setsockopt(zmq.CONFLATE, ZMQ_CONFLATE_SETTING) self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER) + self._socket.setsockopt(zmq.RCVHWM, ZMQ_RCVHWM_SETTING) + try: self._socket.connect(self.url) except ConnectionRefusedError: @@ -106,17 +157,20 @@ class JungfrauJochPreview: self._socket.connect(self.url) def start(self): + """Start the ZMQ update loop in a background thread.""" self._zmq_thread = threading.Thread( target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview" ) self._zmq_thread.start() def stop(self): + """Stop the ZMQ update loop and wait for the thread to finish.""" self._shutdown_event.set() if self._zmq_thread: self._zmq_thread.join() def _zmq_update_loop(self): + """Zmq update loop with polling for new data. The loop runs at maximum 10 Hz.""" while not self._shutdown_event.is_set(): if self._socket is None: self.connect() @@ -127,17 +181,22 @@ class JungfrauJochPreview: pass except zmq.error.Again: # Happens when receive queue is empty - time.sleep(0.1) + time.sleep(0.1) # NOTE: Change sleep time to control polling rate def _poll(self): """ - Poll the ZMQ socket for new data. It will throttle the data update and - only subscribe to the topic for a single update. This is not very nice - but it seems like there is currently no option to set the update rate on - the backend. + Poll the ZMQ socket for new data. We are currently subscribing and unsubscribing + for each poll loop to avoid receiving too many messages. The zmq update loop is + running with a maximum of 10 Hz, and we wait an additional 0.2 seconds here to + avoid busy waiting. Therefore, the maximum data rate is around 3 Hz. 
+ + NOTE: + This can be optimized in the future as JungfrauJoch supports controlling the update + rate from the server side. We still safeguard here to avoid that the device-server + becomes overloaded with messages. """ - if self._shutdown_event.wait(0.2): + if self._shutdown_event.wait(0.2): # NOTE: Change to adjust throttle rate in polling. return try: @@ -152,8 +211,19 @@ class JungfrauJochPreview: # Unsubscribe from the topic self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER) - def _parse_data(self, data): - message = cbor2.loads(data, tag_hook=tag_hook) - # Parse message to data, call callback with data - for cb in self._on_update_callback: - cb(message) + def _parse_data(self, bytes_list: list[bytes]): + """ + Parse the received ZMQ data from the JungfrauJoch preview stream. + We will call the _on_update_callback with the decompressed messages. + + The callback needs to be able to deal with the different message types sent + by the JungfrauJoch server ("start", "image", "end") as described in the + JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps. + (https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). + + Args: + bytes_list (list[bytes]): List of byte messages received from ZMQ recv_multipart. + """ + for byte_msg in bytes_list: + msg = cbor2.loads(byte_msg, tag_hook=tag_hook) + self._on_update_callback(msg)