From cb968abe73f3a3e1283a86362e02f5f772119576 Mon Sep 17 00:00:00 2001 From: x12sa Date: Thu, 27 Nov 2025 10:46:20 +0100 Subject: [PATCH 01/10] minor changes from the beamline --- csaxs_bec/devices/jungfraujoch/eiger.py | 10 ++++++---- csaxs_bec/devices/jungfraujoch/eiger_9m.py | 6 +++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index 70ee01f..b8319d7 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -173,9 +173,10 @@ class Eiger(PSIDeviceBase): # Get available detectors available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5) + logger.info(f"Available detectors {available_detectors}") # Get current detector current_detector_name = "" - if available_detectors.current_id: + if available_detectors.current_id is not None: detector_selection = [ det.description for det in available_detectors.detectors @@ -260,7 +261,7 @@ class Eiger(PSIDeviceBase): detector_distance_mm=self.detector_distance, incident_energy_ke_v=incident_energy, ) - logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") + logger.info(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") prep_time = start_time - time.time() logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s") self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state @@ -291,8 +292,9 @@ class Eiger(PSIDeviceBase): def wait_for_complete(): start_time = time.time() - timeout = 10 - for _ in range(timeout): + timeout = 20 + for ii in range(timeout): + logger.info(f"Running loop with timeout {time.time() - start_time}.") if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10): return statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( diff --git a/csaxs_bec/devices/jungfraujoch/eiger_9m.py b/csaxs_bec/devices/jungfraujoch/eiger_9m.py index f44ca1d..f206103 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger_9m.py +++ b/csaxs_bec/devices/jungfraujoch/eiger_9m.py @@ -21,18 +21,18 @@ if TYPE_CHECKING: # pragma no cover from bec_server.device_server.device_server import DeviceManagerDS EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s -DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M"" +DETECTOR_NAME = "EIGER 9M" # "EIGER 9M"" # pylint:disable=invalid-name class Eiger9M(Eiger): """ - Eiger 1.5M specific integration for the in-vaccum Eiger. + EIGER 9M specific integration for the in-vaccum Eiger. The logic implemented here is coupled to the DelayGenerator integration, repsonsible for the global triggering of all devices through a single Trigger logic. Please check the eiger.py class for more details about the integration of relevant backend - services. The detector_name must be set to "EIGER 1.5M: + services. The detector_name must be set to "EIGER 9M": """ USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here. -- 2.49.1 From 9f5799385c124a17fc66ad9804fc3667089c4292 Mon Sep 17 00:00:00 2001 From: appel_c Date: Mon, 8 Dec 2025 18:11:11 +0100 Subject: [PATCH 02/10] fix(eiger): add dectris-decompression and dependencies. --- csaxs_bec/devices/jungfraujoch/eiger.py | 8 ++- .../jungfraujoch/jungfraujoch_preview.py | 69 ++++++++++++++++++- pyproject.toml | 2 + 3 files changed, 74 insertions(+), 5 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index b8319d7..09bb20c 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -84,7 +84,7 @@ class EigerError(Exception): class Eiger(PSIDeviceBase): """ - Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant + Base integration of the Eiger1.5M and Eiger9M at cSAXS. """ USER_ACCESS = ["detector_distance", "beam_center"] @@ -208,12 +208,13 @@ class Eiger(PSIDeviceBase): # Setup Detector settings, here we may also set the energy already as this might be time consuming settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER) - self.jfj_client.set_detector_settings(settings, timeout=10) + self.jfj_client.set_detector_settings(settings, timeout=5) # Set the file writer to the appropriate output for the HDF5 file file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS) logger.debug( f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}" ) + # Setup the file writer settings self.jfj_client.api.config_file_writer_put( file_writer_settings=file_writer_settings, _request_timeout=10 ) @@ -230,6 +231,7 @@ class Eiger(PSIDeviceBase): """ start_time = time.time() scan_msg = self.scan_info.msg + # Set acquisition parameter # TODO add check of mono energy, this can then also be passed to DatasetSettings incident_energy = 12.0 @@ -250,8 +252,10 @@ class Eiger(PSIDeviceBase): successful=False, hinted_h5_entries={"data": "entry/data/data"}, ) + # JFJ adds _master.h5 automatically path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5") + data_settings = DatasetSettings( image_time_us=int(frame_time_us * 1e6), # This is currently ignored ntrigger=ntrigger, diff --git a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py index bfda46d..ed439ed 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py +++ b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py @@ -15,6 +15,68 @@ logger = bec_logger.logger ZMQ_TOPIC_FILTER = b"" +import cbor2 +import numpy as np +from dectris.compression import decompress + + +def decode_multi_dim_array(tag, column_major): + dimensions, contents = tag.value + if isinstance(contents, list): + array = np.empty((len(contents),), dtype=object) + array[:] = contents + elif isinstance(contents, (np.ndarray, np.generic)): + array = contents + else: + raise cbor2.CBORDecodeValueError("expected array or typed array") + return array.reshape(dimensions, order="F" if column_major else "C") + + +def decode_typed_array(tag, dtype): + if not isinstance(tag.value, bytes): + raise cbor2.CBORDecodeValueError("expected byte string in typed array") + return np.frombuffer(tag.value, dtype=dtype) + + +def decode_dectris_compression(tag): + algorithm, elem_size, encoded = tag.value + return decompress(encoded, algorithm, elem_size=elem_size) + + +tag_decoders = { + 40: lambda tag: decode_multi_dim_array(tag, column_major=False), + 64: lambda tag: decode_typed_array(tag, dtype="u1"), + 65: lambda tag: decode_typed_array(tag, dtype=">u2"), + 66: lambda tag: decode_typed_array(tag, dtype=">u4"), + 67: lambda tag: decode_typed_array(tag, dtype=">u8"), + 68: lambda tag: decode_typed_array(tag, dtype="u1"), + 69: lambda tag: decode_typed_array(tag, dtype=" Date: Mon, 26 Jan 2026 12:31:27 +0100 Subject: [PATCH 03/10] fix: Cleanup after tests at the beamline --- csaxs_bec/devices/jungfraujoch/eiger.py | 41 ++++-- .../jungfraujoch/jungfrau_joch_client.py | 48 ++++--- .../jungfraujoch/jungfraujoch_preview.py | 122 ++++++++++++++---- 3 files changed, 159 insertions(+), 52 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index 09bb20c..15294fa 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -28,7 +28,7 @@ Some extra notes for setting up the detector: Hardware related notes: - If there is an HW issue with the detector, power cycling may help. -- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba +- The sls_detector package is available on console on /sls/x12sa/applications/erik/micromamba - Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be used during operation - Useful commands: @@ -120,8 +120,9 @@ class Eiger(PSIDeviceBase): super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs) self._host = f"{host}:{port}" self.jfj_client = JungfrauJochClient(host=self._host, parent=self) + # NOTE: fetch this information from JungfrauJochClient during on_connected! self.jfj_preview_client = JungfrauJochPreview( - url="tcp://129.129.95.114:5400", cb=self.preview_image.put + url="tcp://129.129.95.114:5400", cb=self._preview_callback ) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream self.device_manager = device_manager self.detector_name = detector_name @@ -132,6 +133,15 @@ class Eiger(PSIDeviceBase): if self.device_manager is not None: self.device_manager: DeviceManagerDS + def _preview_callback(self, message: dict) -> None: + if message.get("type", "") == "image": + data = message.get("data", {}).get("default", None) + if data is None: + logger.error(f"Received image message on device {self.name} without data.") + return + logger.info(f"Received preview image on device {self.name}") + self.preview_image.put(data) + @property def detector_distance(self) -> float: """The detector distance in mm.""" @@ -173,7 +183,7 @@ class Eiger(PSIDeviceBase): # Get available detectors available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5) - logger.info(f"Available detectors {available_detectors}") + logger.debug(f"Available detectors {available_detectors}") # Get current detector current_detector_name = "" if available_detectors.current_id is not None: @@ -191,6 +201,7 @@ class Eiger(PSIDeviceBase): raise RuntimeError( f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}." ) + # TODO - check again once Eiger should be initialized automatically, currently human initialization is expected # # Once the automation should be enabled, we may use here # detector_selection = [ @@ -266,11 +277,12 @@ class Eiger(PSIDeviceBase): incident_energy_ke_v=incident_energy, ) logger.info(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") - prep_time = start_time - time.time() - logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s") + prep_time = time.time() self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state self.jfj_client.start(settings=data_settings) # Takes around ~0.6s - logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s") + logger.info( + f"On stage done for device {self.name} after {time.time()-start_time:.2f}s, with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch." + ) def on_unstage(self) -> DeviceStatus: """Called while unstaging the device.""" @@ -283,7 +295,9 @@ class Eiger(PSIDeviceBase): def _file_event_callback(self, status: DeviceStatus) -> None: """Callback to update the file_event signal when the acquisition is done.""" - logger.info(f"Acquisition done callback called for {self.name} for status {status.success}") + logger.debug( + f"Acquisition done callback called for {self.name} for status {status.success}" + ) self.file_event.put( file_path=self._full_path, done=status.done, @@ -297,15 +311,20 @@ class Eiger(PSIDeviceBase): def wait_for_complete(): start_time = time.time() timeout = 20 - for ii in range(timeout): - logger.info(f"Running loop with timeout {time.time() - start_time}.") - if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10): + for _ in range(timeout): + if self.jfj_client.wait_for_idle( + timeout=1, request_timeout=10, raise_on_timeout=False + ): + logger.info(f"Device {self.name} completed acquisition.") return + logger.info( + f"Device {self.name} running loop to wait for complete, time elapsed: {time.time() - start_time}." + ) statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( _request_timeout=5 ) raise TimeoutError( - f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}" + f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}" ) status = self.task_handler.submit_task(wait_for_complete, run=True) diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index 1db6b15..bddc4eb 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -3,6 +3,7 @@ from __future__ import annotations import enum +import threading import time import traceback from typing import TYPE_CHECKING @@ -152,20 +153,29 @@ class JungfrauJochClient: def stop(self, request_timeout: float = 0.5) -> None: """Stop the acquisition, this only logs errors and is not raising.""" - try: - self.api.cancel_post_with_http_info(_request_timeout=request_timeout) - except requests.exceptions.Timeout: - content = traceback.format_exc() - logger.error( - f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}" - ) - except Exception: - content = traceback.format_exc() - logger.error( - f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}" - ) - def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool: + def _stop_call(self): + try: + self.api.cancel_post_with_http_info() # (_request_timeout=request_timeout) + except requests.exceptions.Timeout: + content = traceback.format_exc() + logger.error( + f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}" + ) + except Exception: + content = traceback.format_exc() + logger.error( + f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}" + ) + + thread = threading.Thread( + target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread" + ) + thread.start() + + def wait_for_idle( + self, timeout: int = 10, request_timeout: float | None = None, raise_on_timeout: bool = True + ) -> bool: """Wait for JungfrauJoch to be in Idle state. Blocking call with timeout. Args: @@ -178,9 +188,17 @@ class JungfrauJochClient: try: self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout) except requests.exceptions.Timeout: - raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}") - except Exception: + if raise_on_timeout: + raise TimeoutError( + f"HTTP request timeout in wait_for_idle for {self._parent_name}." + ) + return False + except Exception as exc: content = traceback.format_exc() logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}") + if raise_on_timeout: + raise JungfrauJochClientError( + f"Error in wait_for_idle for {self._parent_name}: {content}" + ) from exc return False return True diff --git a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py index ed439ed..3aeb6f4 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py +++ b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py @@ -1,26 +1,39 @@ -"""Module for the Eiger preview ZMQ stream.""" +""" +Module for the JungfrauJoch preview ZMQ stream for the Eiger detector at cSAXS. +The Preview client is implemented for the JungfrauJoch ZMQ PUB-SUB interface, and +should be independent of the EIGER detector type. + +The client connects to the ZMQ PUB-SUB preview stream and calls a user provided callback +function with the decompressed messages received from the stream. The callback needs to be +able to deal with the different message types sent by the JungfrauJoch server ("start", +"image", "end") as described in the JungfrauJoch ZEROMQ preview stream documentation. +(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). +""" from __future__ import annotations -import json import threading import time from typing import Callable +import cbor2 import numpy as np import zmq from bec_lib.logger import bec_logger +from dectris.compression import decompress logger = bec_logger.logger -ZMQ_TOPIC_FILTER = b"" - -import cbor2 -import numpy as np -from dectris.compression import decompress +############################### +###### CBOR TAG DECODERS ###### +############################### +# Dectris specific CBOR tags and decoders for Jungfrau data +# Reference: +# https://github.com/dectris/documentation/blob/main/stream_v2/examples/client.py -def decode_multi_dim_array(tag, column_major): +def decode_multi_dim_array(tag: cbor2.CBORTag, column_major: bool = False): + """Decode a multi-dimensional array from a CBOR tag.""" dimensions, contents = tag.value if isinstance(contents, list): array = np.empty((len(contents),), dtype=object) @@ -32,17 +45,23 @@ def decode_multi_dim_array(tag, column_major): return array.reshape(dimensions, order="F" if column_major else "C") -def decode_typed_array(tag, dtype): +def decode_typed_array(tag: cbor2.CBORTag, dtype: str): + """Decode a typed array from a CBOR tag.""" if not isinstance(tag.value, bytes): raise cbor2.CBORDecodeValueError("expected byte string in typed array") return np.frombuffer(tag.value, dtype=dtype) -def decode_dectris_compression(tag): +def decode_dectris_compression(tag: cbor2.CBORTag): + """Decode a Dectris compressed array from a CBOR tag.""" algorithm, elem_size, encoded = tag.value return decompress(encoded, algorithm, elem_size=elem_size) +######################################### +#### Dectris CBOR TAG Extensions ######## +######################################### + tag_decoders = { 40: lambda tag: decode_multi_dim_array(tag, column_major=False), 64: lambda tag: decode_typed_array(tag, dtype="u1"), @@ -73,12 +92,42 @@ tag_decoders = { } -def tag_hook(decoder, tag): +def tag_hook(tag: int): + """Get the decoder for Dectris specific CBOR tags. tag must be in tag_decoders.""" tag_decoder = tag_decoders.get(tag.tag) return tag_decoder(tag) if tag_decoder else tag +###################### +#### ZMQ Settings #### +###################### + +ZMQ_TOPIC_FILTER = b"" # Subscribe to all topics +ZMQ_CONFLATE_SETTING = 1 # Keep only the most recent message +ZMQ_RCVHWM_SETTING = 1 # Set high water mark to 1, this configures the max number of queue messages + + +################################# +#### Jungfrau Preview Client #### +################################# + + class JungfrauJochPreview: + """ + Preview client for the JungfrauJoch ZMQ preview stream. The client is started with + a URL to receive the data from the JungfrauJoch PUB-SUB preview interface, and a + callback function that is called with messages received from the preview stream. + The callback needs to be able to deal with the different message types sent + by the JungfrauJoch server ("start", "image", "end") as described in the + JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps. + (https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). + + Args: + url (str): ZMQ PUB-SUB preview stream URL. + cb (Callable): Callback function called with messages received from the stream. + + """ + USER_ACCESS = ["start", "stop"] def __init__(self, url: str, cb: Callable): @@ -89,16 +138,18 @@ class JungfrauJochPreview: self._on_update_callback = cb def connect(self): - """Connect to the JungfrauJoch PUB-SUB streaming interface - - JungfrauJoch may reject connection for a few seconds when it restarts, - so if it fails, wait a bit and try to connect again. + """ + Connect to the JungfrauJoch PUB-SUB streaming interface. If the connection is refused + it will reattempt a second time after a one second delay. """ # pylint: disable=no-member context = zmq.Context() self._socket = context.socket(zmq.SUB) + self._socket.setsockopt(zmq.CONFLATE, ZMQ_CONFLATE_SETTING) self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER) + self._socket.setsockopt(zmq.RCVHWM, ZMQ_RCVHWM_SETTING) + try: self._socket.connect(self.url) except ConnectionRefusedError: @@ -106,17 +157,20 @@ class JungfrauJochPreview: self._socket.connect(self.url) def start(self): + """Start the ZMQ update loop in a background thread.""" self._zmq_thread = threading.Thread( target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview" ) self._zmq_thread.start() def stop(self): + """Stop the ZMQ update loop and wait for the thread to finish.""" self._shutdown_event.set() if self._zmq_thread: self._zmq_thread.join() def _zmq_update_loop(self): + """Zmq update loop with polling for new data. The loop runs at maximum 10 Hz.""" while not self._shutdown_event.is_set(): if self._socket is None: self.connect() @@ -127,17 +181,22 @@ class JungfrauJochPreview: pass except zmq.error.Again: # Happens when receive queue is empty - time.sleep(0.1) + time.sleep(0.1) # NOTE: Change sleep time to control polling rate def _poll(self): """ - Poll the ZMQ socket for new data. It will throttle the data update and - only subscribe to the topic for a single update. This is not very nice - but it seems like there is currently no option to set the update rate on - the backend. + Poll the ZMQ socket for new data. We are currently subscribing and unsubscribing + for each poll loop to avoid receiving too many messages. The zmq update loop is + running with a maximum of 10 Hz, and we wait an additional 0.2 seconds here to + avoid busy waiting. Therefore, the maximum data rate is around 3 Hz. + + NOTE: + This can be optimized in the future as JungfrauJoch supports controlling the update + rate from the server side. We still safeguard here to avoid that the device-server + becomes overloaded with messages. """ - if self._shutdown_event.wait(0.2): + if self._shutdown_event.wait(0.2): # NOTE: Change to adjust throttle rate in polling. return try: @@ -152,8 +211,19 @@ class JungfrauJochPreview: # Unsubscribe from the topic self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER) - def _parse_data(self, data): - message = cbor2.loads(data, tag_hook=tag_hook) - # Parse message to data, call callback with data - for cb in self._on_update_callback: - cb(message) + def _parse_data(self, bytes_list: list[bytes]): + """ + Parse the received ZMQ data from the JungfrauJoch preview stream. + We will call the _on_update_callback with the decompressed messages. + + The callback needs to be able to deal with the different message types sent + by the JungfrauJoch server ("start", "image", "end") as described in the + JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps. + (https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). + + Args: + bytes_list (list[bytes]): List of byte messages received from ZMQ recv_multipart. + """ + for byte_msg in bytes_list: + msg = cbor2.loads(byte_msg, tag_hook=tag_hook) + self._on_update_callback(msg) -- 2.49.1 From b818181da21e3cbdddc7cc3925f0eb4957a0a1b3 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 27 Jan 2026 09:40:59 +0100 Subject: [PATCH 04/10] fix(mcs): wrap _progress_update callback in try except. --- csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index 9ceaf98..ea2c73c 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -317,10 +317,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): old_value: Previous value of the signal. value: New value of the signal. """ - scan_done = bool(value == self._num_total_triggers) - self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done) - if scan_done: - self._scan_done_event.set() + try: + scan_done = bool(value == self._num_total_triggers) + self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done) + if scan_done: + self._scan_done_event.set() + except Exception: + content = traceback.format_exc() + logger.info(f"Device {self.name} error: {content}") def on_stage(self) -> None: """ -- 2.49.1 From b19bfb7ca412bdbb4bc4295e3ff68461fcfa907a Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 27 Jan 2026 09:59:40 +0100 Subject: [PATCH 05/10] fix: improve integration with feedback from the beamline --- csaxs_bec/devices/jungfraujoch/README.MD | 38 ++++++++++++ csaxs_bec/devices/jungfraujoch/eiger.py | 55 +++++++++-------- .../jungfraujoch/jungfrau_joch_client.py | 60 +++++++++++++------ .../jungfraujoch/jungfraujoch_preview.py | 7 ++- 4 files changed, 113 insertions(+), 47 deletions(-) create mode 100644 csaxs_bec/devices/jungfraujoch/README.MD diff --git a/csaxs_bec/devices/jungfraujoch/README.MD b/csaxs_bec/devices/jungfraujoch/README.MD new file mode 100644 index 0000000..e03961c --- /dev/null +++ b/csaxs_bec/devices/jungfraujoch/README.MD @@ -0,0 +1,38 @@ +# Overview +Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend. +There are currently two supported Eiger detectors: +- EIGER 1.5M +- EIGER 9M + +This module provides a base integration for both detectors. A short list of useful +information is also provided below. + +## JungfrauJoch Service +The JungfrauJoch WEB UI is available on http://sls-jfjoch-001:8080. This is an interface +to the broker which runs on sls-jfjoch-001.psi.ch. The writer service runs on +xbl-daq-34.psi.ch. Permissions to get access to these machines and run systemctl or +journalctl commands can be requested with the Infrastructure and Services group in AWI. +Beamline scientists need to check if they have the necessary permissions to connect +to these machines and run the commands below. + +Useful commands for the broker service on sls-jfjoch-001.psi.ch: +- sudo systemctl status jfjoch_broker # Check status +- sudo systemctl start jfjoch_broker # Start service +- sudo systemctl stop jfjoch_broker # Stop service +- sudo systemctl restart jfjoch_broker # Restart service + +For the writer service on xbl-daq-34.psi.ch: +- sudo journalctl -u jfjoch_writer -f # streams live logs +- sudo systemctl status jfjoch_writer # Check status +- sudo systemctl start jfjoch_writer # Start service +- sudo systemctl stop jfjoch_writer # Stop service +- sudo systemctl restart jfjoch_writer # Restart service + +More information about the JungfrauJoch and API client can be found at: (https://jungfraujoch.readthedocs.io/en/latest/index.html) + +### JungfrauJoch API Client +A thin wrapper for the JungfrauJoch API client is provided in the [jungfrau_joch_client](./jungfrau_joch_client.py). +Details about the specific integration are provided in the code. + +## Eiger debugging +For debugging the Eiger hardware, please contact the detector group for support. \ No newline at end of file diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index 15294fa..f8273de 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -1,28 +1,17 @@ """ -Generic integration of JungfrauJoch backend with Eiger detectors -for the cSAXS beamline at the Swiss Light Source. -The WEB UI is available on http://sls-jfjoch-001:8080 +Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend. -NOTE: this may not be the best place to store this information. It should be migrated to -beamline documentation for debugging of Eiger & JungfrauJoch. +A few notes on setup and operation of the Eiger detectors through the JungfrauJoch broker: -The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch -User with sufficient rights may use: -- sudo systemctl restart jfjoch_broker -- sudo systemctl status jfjoch_broker -to check and/or restart the broker for the JungfrauJoch server. - -Some extra notes for setting up the detector: - If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored - Changes in energy may take time, good to implement logic that only resets energy if needed. - For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in the DatasetSettings is relevant - The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure - that subsequent triggers properly - consider the readout_time of the boards. For Jungfrau detectors, the difference between - count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs - to be taken into account during the integration. + that subsequent triggers properly consider the readout_time of the boards. For the Eiger detectors + at cSAXS, a readout time of 20us is configured through the JungfrauJoch deployment config. This + setting is sufficiently large for the detectors if they run in parallel mode. - beam_center and detector settings are required input arguments, thus, they may be set to wrong values for acquisitions to start. Please keep this in mind. @@ -39,9 +28,6 @@ Hardware related notes: - cd power_control_user/ - ./on - ./off - -Further information that may be relevant for debugging: -JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001) """ from __future__ import annotations @@ -266,6 +252,7 @@ class Eiger(PSIDeviceBase): # JFJ adds _master.h5 automatically path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5") + # path = os.path.relpath(self._full_path, start="/sls/x12sa/data") data_settings = DatasetSettings( image_time_us=int(frame_time_us * 1e6), # This is currently ignored @@ -311,11 +298,20 @@ class Eiger(PSIDeviceBase): def wait_for_complete(): start_time = time.time() timeout = 20 - for _ in range(timeout): + while time.time() - start_time < timeout: if self.jfj_client.wait_for_idle( - timeout=1, request_timeout=10, raise_on_timeout=False + timeout=1, request_timeout=1, raise_on_timeout=False ): - logger.info(f"Device {self.name} completed acquisition.") + # TODO add check if data acquisition finished in success + statistics: MeasurementStatistics = ( + self.jfj_client.api.statistics_data_collection_get(_request_timeout=5) + ) + broker_status = self.jfj_client.jjf_state + logger.info( + f"Device {self.name} completed acquisition. \n \n" + f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n" + f"statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}" + ) return logger.info( f"Device {self.name} running loop to wait for complete, time elapsed: {time.time() - start_time}." @@ -323,8 +319,11 @@ class Eiger(PSIDeviceBase): statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( _request_timeout=5 ) + broker_status = self.jfj_client.jjf_state raise TimeoutError( - f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}" + f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s \n \n" + f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n" + f"Measurement statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}" ) status = self.task_handler.submit_task(wait_for_complete, run=True) @@ -337,7 +336,11 @@ class Eiger(PSIDeviceBase): def on_stop(self) -> None: """Called when the device is stopped.""" - self.jfj_client.stop( - request_timeout=0.5 - ) # Call should not block more than 0.5 seconds to stop all devices... + self.jfj_client.stop(request_timeout=0.5) self.task_handler.shutdown() + + def on_destroy(self): + """Called when the device is destroyed.""" + self.jfj_preview_client.stop() + self.on_stop() + return super().on_destroy() diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index bddc4eb..da15049 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -1,4 +1,4 @@ -"""Module with client interface for the Jungfrau Joch detector API""" +"""Module with a thin client wrapper around the Jungfrau Joch detector API""" from __future__ import annotations @@ -29,8 +29,14 @@ class JungfrauJochClientError(Exception): """Base class for exceptions in this module.""" +"Inactive", "Idle", "Busy", "Measuring", "Pedestal", "Error" + + class DetectorState(str, enum.Enum): - """Possible Detector states for Jungfrau Joch detector""" + """ + Enum states of the BrokerStatus state. The pydantic model validates in runtime, + thus we keep the possible states here for a convenient overview and access. + """ INACTIVE = "Inactive" IDLE = "Idle" @@ -41,13 +47,13 @@ class DetectorState(str, enum.Enum): class JungfrauJochClient: - """Thin wrapper around the Jungfrau Joch API client. + """ + Jungfrau Joch API client wrapper. It provides a few thin wrappers around the API client, + that allow to connect, initialise, wait for state changes, set settings, start and stop acquisitions. - sudo systemctl restart jfjoch_broker - sudo systemctl status jfjoch_broker - - It looks as if the detector is not being stopped properly. - One module remains running, how can we restart the detector? + Args: + host (str): Hostname of the Jungfrau Joch broker service. Default is "http://sls-jfjoch-001:8080" + parent (Device, optional): Parent ophyd device, used for logging purposes. """ def __init__( @@ -61,33 +67,35 @@ class JungfrauJochClient: @property def jjf_state(self) -> BrokerStatus: - """Get the status of JungfrauJoch""" + """Broker status of JungfrauJoch.""" response = self.api.status_get() return BrokerStatus(**response.to_dict()) @property def initialised(self) -> bool: - """Check if jfj is connected and ready to receive commands""" return self._initialised @initialised.setter def initialised(self, value: bool) -> None: - """Set the connected status""" self._initialised = value - # TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync... - # REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here.. @property def detector_state(self) -> DetectorState: - """Get the status of JungfrauJoch""" + """Detector state of JungfrauJoch.""" return DetectorState(self.jjf_state.state) def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None: - """Check if JungfrauJoch is connected and ready to receive commands""" + """ + Connect and initialise the JungfrauJoch detector. The detector must be in + IDLE state to become initialised. This is a blocking call. + + Args: + timeout (int): Timeout in seconds for the initialisation and waiting for IDLE state. + """ status = self.detector_state if status != DetectorState.IDLE: - self.api.initialize_post() # This is a blocking call.... - self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call + self.api.initialize_post() + self.wait_for_idle(timeout, request_timeout=timeout) self.initialised = True def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None: @@ -114,9 +122,12 @@ class JungfrauJochClient: raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}") except Exception: content = traceback.format_exc() - raise JungfrauJochClientError( + logger.error( f"Error while setting detector settings for {self._parent_name}: {content}" ) + raise JungfrauJochClientError( + f"Error while setting detector settings for parent device {self._parent_name}." + ) def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None: """Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state. @@ -142,14 +153,21 @@ class JungfrauJochClient: dataset_settings=settings, _request_timeout=request_timeout ) except requests.exceptions.Timeout: + content = traceback.format_exc() + logger.error( + f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}" + ) raise TimeoutError( f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call" ) except Exception: content = traceback.format_exc() - raise JungfrauJochClientError( + logger.error( f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}" ) + raise JungfrauJochClientError( + f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' post." + ) def stop(self, request_timeout: float = 0.5) -> None: """Stop the acquisition, this only logs errors and is not raising.""" @@ -188,6 +206,10 @@ class JungfrauJochClient: try: self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout) except requests.exceptions.Timeout: + content = traceback.format_exc() + logger.debug( + f"HTTP request timeout in wait_for_idle for {self._parent_name}: {content}" + ) if raise_on_timeout: raise TimeoutError( f"HTTP request timeout in wait_for_idle for {self._parent_name}." diff --git a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py index 3aeb6f4..7f60950 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py +++ b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py @@ -92,7 +92,7 @@ tag_decoders = { } -def tag_hook(tag: int): +def tag_hook(decoder, tag: int): """Get the decoder for Dectris specific CBOR tags. tag must be in tag_decoders.""" tag_decoder = tag_decoders.get(tag.tag) return tag_decoder(tag) if tag_decoder else tag @@ -167,7 +167,7 @@ class JungfrauJochPreview: """Stop the ZMQ update loop and wait for the thread to finish.""" self._shutdown_event.set() if self._zmq_thread: - self._zmq_thread.join() + self._zmq_thread.join(timeout=1.0) def _zmq_update_loop(self): """Zmq update loop with polling for new data. The loop runs at maximum 10 Hz.""" @@ -180,6 +180,9 @@ class JungfrauJochPreview: # Happens when ZMQ partially delivers the multipart message pass except zmq.error.Again: + logger.debug( + f"ZMQ Again exception, receive queue is empty for JFJ preview at {self.url}." + ) # Happens when receive queue is empty time.sleep(0.1) # NOTE: Change sleep time to control polling rate -- 2.49.1 From 8a69c7aa36a6784ff2e9a913ac4284a69224ed7c Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 29 Jan 2026 08:52:01 +0100 Subject: [PATCH 06/10] fix(config): add eiger 9m to bl_detectors --- csaxs_bec/device_configs/bl_detectors.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/csaxs_bec/device_configs/bl_detectors.yaml b/csaxs_bec/device_configs/bl_detectors.yaml index 934d6a6..d9a889a 100644 --- a/csaxs_bec/device_configs/bl_detectors.yaml +++ b/csaxs_bec/device_configs/bl_detectors.yaml @@ -9,6 +9,17 @@ eiger_1_5: readoutPriority: async softwareTrigger: False +eiger_9: + description: Eiger 9M in-vacuum detector + deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M + deviceConfig: + detector_distance: 100 + beam_center: [0, 0] + onFailure: raise + enabled: true + readoutPriority: async + softwareTrigger: False + ids_cam: description: IDS camera for live image acquisition deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera -- 2.49.1 From efd51462fc6fe5a26131b511093aaaccada6a4b4 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 29 Jan 2026 09:06:28 +0100 Subject: [PATCH 07/10] refactor(jungfraujoch-preview): Improve Jungfraujoch Preview module --- .../jungfraujoch/jungfraujoch_preview.py | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py index 7f60950..a6cf358 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py +++ b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py @@ -62,6 +62,7 @@ def decode_dectris_compression(tag: cbor2.CBORTag): #### Dectris CBOR TAG Extensions ######## ######################################### +# Mapping of various additional CBOR tags from Dectris to decoder functions tag_decoders = { 40: lambda tag: decode_multi_dim_array(tag, column_major=False), 64: lambda tag: decode_typed_array(tag, dtype="u1"), @@ -88,12 +89,15 @@ tag_decoders = { 86: lambda tag: decode_typed_array(tag, dtype=" Date: Thu, 29 Jan 2026 10:21:36 +0100 Subject: [PATCH 08/10] fix: Cleanup of jungfrau_joch_client --- .../jungfraujoch/jungfrau_joch_client.py | 112 +++++++++++------- 1 file changed, 67 insertions(+), 45 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index da15049..7b18c5c 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -9,6 +9,7 @@ import traceback from typing import TYPE_CHECKING import requests +import yaml from bec_lib.logger import bec_logger from jfjoch_client.api.default_api import DefaultApi from jfjoch_client.api_client import ApiClient @@ -19,7 +20,7 @@ from jfjoch_client.models.detector_settings import DetectorSettings logger = bec_logger.logger -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from ophyd import Device @@ -29,9 +30,6 @@ class JungfrauJochClientError(Exception): """Base class for exceptions in this module.""" -"Inactive", "Idle", "Busy", "Measuring", "Pedestal", "Error" - - class DetectorState(str, enum.Enum): """ Enum states of the BrokerStatus state. The pydantic model validates in runtime, @@ -48,11 +46,13 @@ class DetectorState(str, enum.Enum): class JungfrauJochClient: """ - Jungfrau Joch API client wrapper. It provides a few thin wrappers around the API client, - that allow to connect, initialise, wait for state changes, set settings, start and stop acquisitions. + Jungfrau Joch API client wrapper. It provides a thin wrapper methods around the API client, + that allow to connect, initialise, wait for state changes, set settings, start and stop + acquisitions. Args: - host (str): Hostname of the Jungfrau Joch broker service. Default is "http://sls-jfjoch-001:8080" + host (str): Hostname of the Jungfrau Joch broker service. + Default is "http://sls-jfjoch-001:8080" parent (Device, optional): Parent ophyd device, used for logging purposes. """ @@ -71,6 +71,7 @@ class JungfrauJochClient: response = self.api.status_get() return BrokerStatus(**response.to_dict()) + # pylint: disable=missing-function-docstring @property def initialised(self) -> bool: return self._initialised @@ -79,39 +80,49 @@ class JungfrauJochClient: def initialised(self, value: bool) -> None: self._initialised = value + # pylint: disable=missing-function-docstring @property def detector_state(self) -> DetectorState: - """Detector state of JungfrauJoch.""" return DetectorState(self.jjf_state.state) - def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None: + def connect_and_initialise(self, timeout: int = 10) -> None: """ Connect and initialise the JungfrauJoch detector. The detector must be in - IDLE state to become initialised. This is a blocking call. + IDLE state to become initialised. This is a blocking call, the timeout parameter + will be passed to the HTTP requests timeout method of the wait_for_idle method. Args: timeout (int): Timeout in seconds for the initialisation and waiting for IDLE state. """ status = self.detector_state + # TODO: #135 Check if the detector has to be in INACTIVE state before initialisation if status != DetectorState.IDLE: self.api.initialize_post() - self.wait_for_idle(timeout, request_timeout=timeout) + self.wait_for_idle(timeout) self.initialised = True def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None: - """Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state. - Note, the full settings have to be provided, otherwise the settings will be overwritten with default values. + """ + Set the detector settings. The state of JungfrauJoch must be in IDLE, + Error or Inactive state. Please note: a full set of setttings has to be provided, + otherwise the settings will be overwritten with default values. Args: settings (dict): dictionary of settings + timeout (int): Timeout in seconds for the HTTP request to set the settings. """ state = self.detector_state if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]: + logger.info( + f"JungfrauJoch backend fo device {self._parent_name} is not in IDLE state," + " waiting 1s before retrying..." + ) time.sleep(1) # Give the detector 1s to become IDLE, retry state = self.detector_state if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]: raise JungfrauJochClientError( - f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}" + f"Error on {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE" + " state to set settings. Current state: {state}" ) if isinstance(settings, dict): @@ -119,31 +130,36 @@ class JungfrauJochClient: try: self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout) except requests.exceptions.Timeout: - raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}") + raise TimeoutError( + f"Timeout on device {self._parent_name} while setting detector settings:\n " + f"{yaml.dump(settings, indent=4)}." + ) except Exception: content = traceback.format_exc() logger.error( - f"Error while setting detector settings for {self._parent_name}: {content}" + f"Error on device {self._parent_name} while setting detector settings:\n " + f"{yaml.dump(settings, indent=4)}. Error traceback: {content}" ) raise JungfrauJochClientError( - f"Error while setting detector settings for parent device {self._parent_name}." + f"Error on device {self._parent_name} while setting detector settings:\n " + f"{yaml.dump(settings, indent=4)}. Full traceback: {content}." ) def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None: - """Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state. - The method call is blocking and JungfrauJoch will be ready to measure after the call resolves. + """ + Start the acquisition with the provided dataset settings. + The detector must be in IDLE state. Settings must always provide a full set of + parameters, missing parameters will be set to default values. Args: - settings (dict): dictionary of settings - - Please check the DataSettings class for the available settings. Minimum required settings are - beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV. - + settings (dict | DatasetSettings): Dataset settings to start the acquisition with. + request_timeout (float): Timeout in sec for the HTTP request to start the acquisition. """ state = self.detector_state if state != DetectorState.IDLE: raise JungfrauJochClientError( - f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}" + f"Error on device {self._parent_name}. " + f"Detector must be in IDLE state to start acquisition. Current state: {state}" ) if isinstance(settings, dict): @@ -155,18 +171,23 @@ class JungfrauJochClient: except requests.exceptions.Timeout: content = traceback.format_exc() logger.error( - f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}" + f"Timeout error after {request_timeout} seconds on device {self._parent_name} " + f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}. \n" + f"Traceback: {content}" ) raise TimeoutError( - f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call" + f"Timeout error after {request_timeout} seconds on device {self._parent_name} " + f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}." ) except Exception: content = traceback.format_exc() logger.error( - f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}" + f"Error on device {self._parent_name} during 'start' post with dataset settings: \n" + f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}" ) raise JungfrauJochClientError( - f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' post." + f"Error on device {self._parent_name} during 'start' post with dataset settings: \n" + f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}." ) def stop(self, request_timeout: float = 0.5) -> None: @@ -178,49 +199,50 @@ class JungfrauJochClient: except requests.exceptions.Timeout: content = traceback.format_exc() logger.error( - f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}" + f"Timeout error after {request_timeout} seconds on device {self._parent_name} " + f"during stop: {content}" ) except Exception: content = traceback.format_exc() - logger.error( - f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}" - ) + logger.error(f"Error on device {self._parent_name} during stop: {content}") thread = threading.Thread( target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread" ) thread.start() - def wait_for_idle( - self, timeout: int = 10, request_timeout: float | None = None, raise_on_timeout: bool = True - ) -> bool: - """Wait for JungfrauJoch to be in Idle state. Blocking call with timeout. + def wait_for_idle(self, timeout: int = 10, raise_on_timeout: bool = True) -> bool: + """ + Method to wait until the detector is in IDLE state. This is a blocking call with a + timeout that can be specified. The additional parameter raise_on_timeout can be used to + raise an exception on timeout instead of returning boolean True/False. Args: timeout (int): timeout in seconds + raise_on_timeout (bool): If True, raises an exception on timeout. Default is True. Returns: bool: True if the detector is in IDLE state, False if timeout occurred """ - if request_timeout is None: - request_timeout = timeout try: - self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout) + self.api.wait_till_done_post(timeout=timeout, _request_timeout=timeout) except requests.exceptions.Timeout: content = traceback.format_exc() - logger.debug( - f"HTTP request timeout in wait_for_idle for {self._parent_name}: {content}" + logger.info( + f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle: {content}" ) if raise_on_timeout: raise TimeoutError( - f"HTTP request timeout in wait_for_idle for {self._parent_name}." + f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle." ) return False except Exception as exc: content = traceback.format_exc() - logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}") + logger.info( + f"Error on device {self._parent_name} in wait_for_idle. Full traceback: {content}" + ) if raise_on_timeout: raise JungfrauJochClientError( - f"Error in wait_for_idle for {self._parent_name}: {content}" + f"Error on device {self._parent_name} in wait_for_idle: {content}" ) from exc return False return True -- 2.49.1 From 705df4b2532eca6a1b29e36aed43eea587dd88c2 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 29 Jan 2026 10:51:47 +0100 Subject: [PATCH 09/10] fix: cleanup of eiger integration --- csaxs_bec/devices/jungfraujoch/eiger.py | 163 ++++++++++++++++-------- 1 file changed, 108 insertions(+), 55 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index f8273de..e175ba4 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -71,9 +71,18 @@ class EigerError(Exception): class Eiger(PSIDeviceBase): """ Base integration of the Eiger1.5M and Eiger9M at cSAXS. + + Args: + name (str) : Name of the device + detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"] + host (str): Hostname of the Jungfrau Joch server. + port (int): Port of the Jungfrau Joch server. + scan_info (ScanInfo): The scan info to use. + device_manager (DeviceManagerDS): The device manager to use. + **kwargs: Additional keyword arguments. """ - USER_ACCESS = ["detector_distance", "beam_center"] + USER_ACCESS = ["set_detector_distance", "set_beam_center"] file_event = Cpt(FileEventSignal, name="file_event") preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2) @@ -91,18 +100,6 @@ class Eiger(PSIDeviceBase): device_manager=None, **kwargs, ): - """ - Initialize the PSI Device Base class. - - Args: - name (str) : Name of the device - detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"] - host (str): Hostname of the Jungfrau Joch server. - port (int): Port of the Jungfrau Joch server. - scan_info (ScanInfo): The scan info to use. - device_manager (DeviceManagerDS): The device manager to use. - **kwargs: Additional keyword arguments. - """ super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs) self._host = f"{host}:{port}" self.jfj_client = JungfrauJochClient(host=self._host, parent=self) @@ -116,10 +113,19 @@ class Eiger(PSIDeviceBase): self._beam_center = beam_center self._readout_time = readout_time self._full_path = "" + self._num_triggers = 0 if self.device_manager is not None: self.device_manager: DeviceManagerDS def _preview_callback(self, message: dict) -> None: + """ + Callback method for handling preview messages as received from the JungfrauJoch preview stream. + These messages are dictionary dumps as described in the JFJ ZMQ preview stream documentation. + (https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). + + Args: + message (dict): The message received from the preview stream. + """ if message.get("type", "") == "image": data = message.get("data", {}).get("default", None) if data is None: @@ -128,41 +134,70 @@ class Eiger(PSIDeviceBase): logger.info(f"Received preview image on device {self.name}") self.preview_image.put(data) + # pylint: disable=missing-function-docstring @property def detector_distance(self) -> float: - """The detector distance in mm.""" return self._detector_distance @detector_distance.setter def detector_distance(self, value: float) -> None: - """Set the detector distance in mm.""" if value <= 0: raise ValueError("Detector distance must be a positive value.") self._detector_distance = value + def set_detector_distance(self, distance: float) -> None: + """ + Set the detector distance in mm. + + Args: + distance (float): The detector distance in mm. + """ + self.detector_distance = distance + + # pylint: disable=missing-function-docstring @property def beam_center(self) -> tuple[float, float]: - """The beam center in pixels. (x,y)""" return self._beam_center @beam_center.setter def beam_center(self, value: tuple[float, float]) -> None: - """Set the beam center in pixels. (x,y)""" + if any(coord < 0 for coord in value): + raise ValueError("Beam center coordinates must be non-negative.") self._beam_center = value - def on_init(self) -> None: + def set_beam_center(self, x: float, y: float) -> None: """ - Called when the device is initialized. + Set the beam center coordinates in pixels. - No siganls are connected at this point, - thus should not be set here but in on_connected instead. + Args: + x (float): The x coordinate of the beam center in pixels. + y (float): The y coordinate of the beam center in pixels. """ + self.beam_center = (x, y) + + def on_init(self) -> None: + """Hook called during device initialization.""" + + # pylint: disable=arguments-differ + def wait_for_connection(self, timeout: float = 10) -> None: + """ + Wait for the device to be connected to the JungfrauJoch backend. + + Args: + timeout (float): Timeout in seconds to wait for the connection. + """ + self.jfj_client.api.status_get(_request_timeout=timeout) # If connected, this responds def on_connected(self) -> None: """ + Hook called after the device is connected to through the device server. + Called after the device is connected and its signals are connected. - Default values for signals should be set here. + Default values for signals should be set here. Currently, the detector needs to be + initialised manually through the WEB UI of JungfrauJoch. Once agreed upon, the automated + initialisation can be re-enabled here (code commented below). """ + start_time = time.time() logger.debug(f"On connected called for {self.name}") self.jfj_client.stop(request_timeout=3) # Check which detector is selected @@ -188,8 +223,8 @@ class Eiger(PSIDeviceBase): f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}." ) - # TODO - check again once Eiger should be initialized automatically, currently human initialization is expected - # # Once the automation should be enabled, we may use here + # TODO - Currently the initialisation of the detector is done manually through the WEB UI. Once adjusted + # this can be automated here again. # detector_selection = [ # det for det in available_detectors.detectors if det.id == self.detector_name # ] @@ -206,42 +241,50 @@ class Eiger(PSIDeviceBase): # Setup Detector settings, here we may also set the energy already as this might be time consuming settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER) self.jfj_client.set_detector_settings(settings, timeout=5) + # Set the file writer to the appropriate output for the HDF5 file file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS) logger.debug( f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}" ) + # Setup the file writer settings self.jfj_client.api.config_file_writer_put( file_writer_settings=file_writer_settings, _request_timeout=10 ) + # Start the preview client self.jfj_preview_client.connect() self.jfj_preview_client.start() - logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}") + logger.info( + f"Device {self.name} initialized after {time.time()-start_time:.2f}s. Preview stream connected on url: {self.jfj_preview_client.url}" + ) def on_stage(self) -> DeviceStatus | None: """ - Called while staging the device. - - Information about the upcoming scan can be accessed from the scan_info object. + Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object. + scan_msg = self.scan_info.msg """ start_time = time.time() scan_msg = self.scan_info.msg - # Set acquisition parameter - # TODO add check of mono energy, this can then also be passed to DatasetSettings + # TODO: Check mono energy from device in BEC + # Setting incident energy in keV incident_energy = 12.0 + # Setting up exp_time and num_triggers acquisition parameter exp_time = scan_msg.scan_parameters.get("exp_time", 0) - if exp_time <= self._readout_time: + if exp_time <= self._readout_time: # Exp_time must be at least the readout time raise ValueError( - f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}." + f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s." ) - frame_time_us = exp_time # - ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]) - # Fetch file path + self._num_triggers = int( + scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"] + ) + + # Setting up the full path for file writing self._full_path = get_full_path(scan_msg, name=f"{self.name}_master") self._full_path = os.path.abspath(os.path.expanduser(self._full_path)) + # Inform BEC about upcoming file event self.file_event.put( file_path=self._full_path, @@ -252,23 +295,26 @@ class Eiger(PSIDeviceBase): # JFJ adds _master.h5 automatically path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5") - # path = os.path.relpath(self._full_path, start="/sls/x12sa/data") + # Create dataset settings for API call. data_settings = DatasetSettings( - image_time_us=int(frame_time_us * 1e6), # This is currently ignored - ntrigger=ntrigger, + image_time_us=int(exp_time * 1e6), + ntrigger=self._num_triggers, file_prefix=path, beam_x_pxl=int(self._beam_center[0]), beam_y_pxl=int(self._beam_center[1]), detector_distance_mm=self.detector_distance, incident_energy_ke_v=incident_energy, ) - logger.info(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") + logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") prep_time = time.time() - self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state + self.jfj_client.wait_for_idle(timeout=10) # Ensure we are in IDLE state self.jfj_client.start(settings=data_settings) # Takes around ~0.6s + + # Time the stage process logger.info( - f"On stage done for device {self.name} after {time.time()-start_time:.2f}s, with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch." + f"Device {self.name} staged for scan. Time spent {time.time()-start_time:.2f}s," + f" with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch." ) def on_unstage(self) -> DeviceStatus: @@ -283,7 +329,7 @@ class Eiger(PSIDeviceBase): def _file_event_callback(self, status: DeviceStatus) -> None: """Callback to update the file_event signal when the acquisition is done.""" logger.debug( - f"Acquisition done callback called for {self.name} for status {status.success}" + f"File event callback on complete status for device {self.name}: done={status.done}, successful={status.success}" ) self.file_event.put( file_path=self._full_path, @@ -293,28 +339,35 @@ class Eiger(PSIDeviceBase): ) def on_complete(self) -> DeviceStatus: - """Called to inquire if a device has completed a scans.""" + """ + Called at the end of the scan. The method should implement an asynchronous wait for the + device to complete the acquisition. A callback to update the file_event signal is + attached that resolves the file event when the acquisition is done. + + Returns: + DeviceStatus: The status object representing the completion of the acquisition. + """ def wait_for_complete(): start_time = time.time() - timeout = 20 + # NOTE: This adjust the time (s) that should be waited for completion of the scan. + timeout = 20 # while time.time() - start_time < timeout: - if self.jfj_client.wait_for_idle( - timeout=1, request_timeout=1, raise_on_timeout=False - ): - # TODO add check if data acquisition finished in success + if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False): + # TODO: Once available, add check for statistics: MeasurementStatistics = ( self.jfj_client.api.statistics_data_collection_get(_request_timeout=5) ) - broker_status = self.jfj_client.jjf_state - logger.info( - f"Device {self.name} completed acquisition. \n \n" - f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n" - f"statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}" - ) + if statistics.images_collected < self._num_triggers: + raise EigerError( + f"Device {self.name} acquisition incomplete. " + f"Expected {self._num_triggers} triggers, " + f"but only {statistics.images_collected} were collected." + ) return logger.info( - f"Device {self.name} running loop to wait for complete, time elapsed: {time.time() - start_time}." + f"Waiting for device {self.name} to finish complete, time elapsed: " + f"{time.time() - start_time}." ) statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( _request_timeout=5 -- 2.49.1 From 9db56f527336b147c67a3ee96eeb5d874757e09e Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 29 Jan 2026 11:50:47 +0100 Subject: [PATCH 10/10] refactor: Eiger refactoring, fix test and add docs. --- csaxs_bec/device_configs/bl_detectors.yaml | 2 +- csaxs_bec/devices/jungfraujoch/README.MD | 14 ++- csaxs_bec/devices/jungfraujoch/eiger.py | 5 +- .../jungfraujoch/jungfrau_joch_client.py | 4 +- tests/tests_devices/test_eiger.py | 100 ++++++++++++++++-- 5 files changed, 110 insertions(+), 15 deletions(-) diff --git a/csaxs_bec/device_configs/bl_detectors.yaml b/csaxs_bec/device_configs/bl_detectors.yaml index d9a889a..4bbce99 100644 --- a/csaxs_bec/device_configs/bl_detectors.yaml +++ b/csaxs_bec/device_configs/bl_detectors.yaml @@ -10,7 +10,7 @@ eiger_1_5: softwareTrigger: False eiger_9: - description: Eiger 9M in-vacuum detector + description: Eiger 9M detector deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M deviceConfig: detector_distance: 100 diff --git a/csaxs_bec/devices/jungfraujoch/README.MD b/csaxs_bec/devices/jungfraujoch/README.MD index e03961c..468fba5 100644 --- a/csaxs_bec/devices/jungfraujoch/README.MD +++ b/csaxs_bec/devices/jungfraujoch/README.MD @@ -34,5 +34,15 @@ More information about the JungfrauJoch and API client can be found at: (https:/ A thin wrapper for the JungfrauJoch API client is provided in the [jungfrau_joch_client](./jungfrau_joch_client.py). Details about the specific integration are provided in the code. -## Eiger debugging -For debugging the Eiger hardware, please contact the detector group for support. \ No newline at end of file + +## Eiger implementation +The Eiger detector integration is provided in the [eiger.py](./eiger.py) module. It provides a base integration for both Eiger 1.5M and Eiger 9M detectors. +Logic specific to each detector is implemented in the respective modules: +- [eiger_1_5m.py](./eiger_1_5m.py) +- [eiger_9m.py](./eiger_9m.py) + +With the current implementation, the detector initialization should be done by a beamline scientist through the JungfrauJoch WEB UI by choosing the +appropriate detector (1.5M or 9M) before loading the device config with BEC. BEC will check upon connecting if the selected detector matches the expected one. +A preview stream for images is also provided which is forwarded and accessible through the `preview_image` signal. + +For more specific details, please check the code documentation. \ No newline at end of file diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index e175ba4..9d8c71b 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -114,6 +114,7 @@ class Eiger(PSIDeviceBase): self._readout_time = readout_time self._full_path = "" self._num_triggers = 0 + self._wait_for_on_complete = 20 # seconds if self.device_manager is not None: self.device_manager: DeviceManagerDS @@ -351,7 +352,7 @@ class Eiger(PSIDeviceBase): def wait_for_complete(): start_time = time.time() # NOTE: This adjust the time (s) that should be waited for completion of the scan. - timeout = 20 # + timeout = self._wait_for_on_complete while time.time() - start_time < timeout: if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False): # TODO: Once available, add check for @@ -372,7 +373,7 @@ class Eiger(PSIDeviceBase): statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( _request_timeout=5 ) - broker_status = self.jfj_client.jjf_state + broker_status = self.jfj_client.jfj_status raise TimeoutError( f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s \n \n" f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n" diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index 7b18c5c..dab8cf6 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -66,7 +66,7 @@ class JungfrauJochClient: self._parent_name = parent.name if parent else self.__class__.__name__ @property - def jjf_state(self) -> BrokerStatus: + def jfj_status(self) -> BrokerStatus: """Broker status of JungfrauJoch.""" response = self.api.status_get() return BrokerStatus(**response.to_dict()) @@ -83,7 +83,7 @@ class JungfrauJochClient: # pylint: disable=missing-function-docstring @property def detector_state(self) -> DetectorState: - return DetectorState(self.jjf_state.state) + return DetectorState(self.jfj_status.state) def connect_and_initialise(self, timeout: int = 10) -> None: """ diff --git a/tests/tests_devices/test_eiger.py b/tests/tests_devices/test_eiger.py index e43c981..03c0066 100644 --- a/tests/tests_devices/test_eiger.py +++ b/tests/tests_devices/test_eiger.py @@ -5,6 +5,7 @@ from time import time from typing import TYPE_CHECKING, Generator from unittest import mock +import numpy as np import pytest from bec_lib.messages import FileMessage, ScanStatusMessage from jfjoch_client.models.broker_status import BrokerStatus @@ -78,7 +79,7 @@ def detector_list(request) -> Generator[DetectorList, None, None]: ), DetectorListElement( id=2, - description="EIGER 8.5M (tmp)", + description="EIGER 9M", serial_number="123456", base_ipv4_addr="192.168.0.1", udp_interface_count=1, @@ -103,7 +104,11 @@ def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]: name = "eiger_1_5m" dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0) dev.scan_info.msg = mock_scan_info - yield dev + try: + yield dev + finally: + if dev._destroyed is False: + dev.destroy() @pytest.fixture(scope="function") @@ -113,7 +118,19 @@ def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]: name = "eiger_9m" dev = Eiger9M(name=name) dev.scan_info.msg = mock_scan_info - yield dev + try: + yield dev + finally: + if dev._destroyed is False: + dev.destroy() + + +def test_eiger_wait_for_connection(eiger_1_5m, eiger_9m): + """Test the wait_for_connection metho is calling status_get on the JFJ API client.""" + for eiger in (eiger_1_5m, eiger_9m): + with mock.patch.object(eiger.jfj_client.api, "status_get") as mock_status_get: + eiger.wait_for_connection(timeout=1) + mock_status_get.assert_called_once_with(_request_timeout=1) @pytest.mark.parametrize("detector_state", ["Idle", "Inactive"]) @@ -141,7 +158,7 @@ def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state): else: eiger.on_connected() assert mock_set_det.call_args == mock.call( - DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5 ) assert mock_file_writer.call_args == mock.call( file_writer_settings=FileWriterSettings( @@ -179,7 +196,7 @@ def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state): else: eiger.on_connected() assert mock_set_det.call_args == mock.call( - DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5 ) assert mock_file_writer.call_args == mock.call( file_writer_settings=FileWriterSettings( @@ -216,11 +233,39 @@ def test_eiger_on_stop(eiger_1_5m): stop_event.wait(timeout=5) # Thread should be killed from task_handler +def test_eiger_on_destroy(eiger_1_5m): + """Test the on_destroy logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + start_event = threading.Event() + stop_event = threading.Event() + + def tmp_task(): + start_event.set() + try: + while True: + time.sleep(0.1) + finally: + stop_event.set() + + eiger.task_handler.submit_task(tmp_task) + start_event.wait(timeout=5) + + with ( + mock.patch.object(eiger.jfj_preview_client, "stop") as mock_jfj_preview_client_stop, + mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop, + ): + eiger.on_destroy() + mock_jfj_preview_client_stop.assert_called_once() + mock_jfj_client_stop.assert_called_once() + stop_event.wait(timeout=5) + + @pytest.mark.timeout(25) @pytest.mark.parametrize("raise_timeout", [True, False]) def test_eiger_on_complete(eiger_1_5m, raise_timeout): """Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" eiger = eiger_1_5m + eiger._wait_for_on_complete = 1 # reduce wait time for testing callback_completed_event = threading.Event() @@ -230,7 +275,7 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout): unblock_wait_for_idle = threading.Event() - def mock_wait_for_idle(timeout: int, request_timeout: float): + def mock_wait_for_idle(timeout: float, raise_on_timeout: bool) -> bool: if unblock_wait_for_idle.wait(timeout): if raise_timeout: return False @@ -238,11 +283,18 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout): return False with ( + mock.patch.object( + eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state="Idle") + ), mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle), mock.patch.object( eiger.jfj_client.api, "statistics_data_collection_get", - return_value=MeasurementStatistics(run_number=1), + return_value=MeasurementStatistics( + run_number=1, + images_collected=eiger.scan_info.msg.num_points + * eiger.scan_info.msg.scan_parameters["frames_per_trigger"], + ), ), ): status = eiger.complete() @@ -284,7 +336,7 @@ def test_eiger_file_event_callback(eiger_1_5m, tmp_path): assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} -def test_eiger_on_sage(eiger_1_5m): +def test_eiger_on_stage(eiger_1_5m): """Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" eiger = eiger_1_5m scan_msg = eiger.scan_info.msg @@ -316,3 +368,35 @@ def test_eiger_on_sage(eiger_1_5m): ) assert mock_start.call_args == mock.call(settings=data_settings) assert eiger.staged is Staged.yes + + +def test_eiger_set_det_distance_test_beam_center(eiger_1_5m): + """Test the set_detector_distance and set_beam_center methods. Equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + old_distance = eiger.detector_distance + new_distance = old_distance + 100 + old_beam_center = eiger.beam_center + new_beam_center = (old_beam_center[0] + 20, old_beam_center[1] + 50) + eiger.set_detector_distance(new_distance) + assert eiger.detector_distance == new_distance + eiger.set_beam_center(x=new_beam_center[0], y=new_beam_center[1]) + assert eiger.beam_center == new_beam_center + with pytest.raises(ValueError): + eiger.set_beam_center(x=-10, y=100) # Cannot set negative beam center + with pytest.raises(ValueError): + eiger.detector_distance = -50 # Cannot set negative detector distance + + +def test_eiger_preview_callback(eiger_1_5m): + """Preview callback test for the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + # NOTE: I don't find models for the CBOR messages used by JFJ, currently using a dummay dict. + # Please adjust once the proper model is found. + for msg_type in ["start", "end", "image", "calibration", "metadata"]: + msg = {"type": msg_type, "data": {"default": np.array([[1, 2], [3, 4]])}} + with mock.patch.object(eiger.preview_image, "put") as mock_preview_put: + eiger._preview_callback(msg) + if msg_type == "image": + mock_preview_put.assert_called_once_with(msg["data"]["default"]) + else: + mock_preview_put.assert_not_called() -- 2.49.1