diff --git a/csaxs_bec/device_configs/bl_detectors.yaml b/csaxs_bec/device_configs/bl_detectors.yaml index 934d6a6..4bbce99 100644 --- a/csaxs_bec/device_configs/bl_detectors.yaml +++ b/csaxs_bec/device_configs/bl_detectors.yaml @@ -9,6 +9,17 @@ eiger_1_5: readoutPriority: async softwareTrigger: False +eiger_9: + description: Eiger 9M detector + deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M + deviceConfig: + detector_distance: 100 + beam_center: [0, 0] + onFailure: raise + enabled: true + readoutPriority: async + softwareTrigger: False + ids_cam: description: IDS camera for live image acquisition deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index 9ceaf98..ea2c73c 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -317,10 +317,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): old_value: Previous value of the signal. value: New value of the signal. """ - scan_done = bool(value == self._num_total_triggers) - self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done) - if scan_done: - self._scan_done_event.set() + try: + scan_done = bool(value == self._num_total_triggers) + self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done) + if scan_done: + self._scan_done_event.set() + except Exception: + content = traceback.format_exc() + logger.info(f"Device {self.name} error: {content}") def on_stage(self) -> None: """ diff --git a/csaxs_bec/devices/jungfraujoch/README.MD b/csaxs_bec/devices/jungfraujoch/README.MD new file mode 100644 index 0000000..468fba5 --- /dev/null +++ b/csaxs_bec/devices/jungfraujoch/README.MD @@ -0,0 +1,48 @@ +# Overview +Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend. +There are currently two supported Eiger detectors: +- EIGER 1.5M +- EIGER 9M + +This module provides a base integration for both detectors. A short list of useful +information is also provided below. + +## JungfrauJoch Service +The JungfrauJoch WEB UI is available on http://sls-jfjoch-001:8080. This is an interface +to the broker which runs on sls-jfjoch-001.psi.ch. The writer service runs on +xbl-daq-34.psi.ch. Permissions to get access to these machines and run systemctl or +journalctl commands can be requested with the Infrastructure and Services group in AWI. +Beamline scientists need to check if they have the necessary permissions to connect +to these machines and run the commands below. + +Useful commands for the broker service on sls-jfjoch-001.psi.ch: +- sudo systemctl status jfjoch_broker # Check status +- sudo systemctl start jfjoch_broker # Start service +- sudo systemctl stop jfjoch_broker # Stop service +- sudo systemctl restart jfjoch_broker # Restart service + +For the writer service on xbl-daq-34.psi.ch: +- sudo journalctl -u jfjoch_writer -f # streams live logs +- sudo systemctl status jfjoch_writer # Check status +- sudo systemctl start jfjoch_writer # Start service +- sudo systemctl stop jfjoch_writer # Stop service +- sudo systemctl restart jfjoch_writer # Restart service + +More information about the JungfrauJoch and API client can be found at: (https://jungfraujoch.readthedocs.io/en/latest/index.html) + +### JungfrauJoch API Client +A thin wrapper for the JungfrauJoch API client is provided in the [jungfrau_joch_client](./jungfrau_joch_client.py). +Details about the specific integration are provided in the code. + + +## Eiger implementation +The Eiger detector integration is provided in the [eiger.py](./eiger.py) module. It provides a base integration for both Eiger 1.5M and Eiger 9M detectors. +Logic specific to each detector is implemented in the respective modules: +- [eiger_1_5m.py](./eiger_1_5m.py) +- [eiger_9m.py](./eiger_9m.py) + +With the current implementation, the detector initialization should be done by a beamline scientist through the JungfrauJoch WEB UI by choosing the +appropriate detector (1.5M or 9M) before loading the device config with BEC. BEC will check upon connecting if the selected detector matches the expected one. +A preview stream for images is also provided which is forwarded and accessible through the `preview_image` signal. + +For more specific details, please check the code documentation. \ No newline at end of file diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index 70ee01f..9d8c71b 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -1,34 +1,23 @@ """ -Generic integration of JungfrauJoch backend with Eiger detectors -for the cSAXS beamline at the Swiss Light Source. -The WEB UI is available on http://sls-jfjoch-001:8080 +Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend. -NOTE: this may not be the best place to store this information. It should be migrated to -beamline documentation for debugging of Eiger & JungfrauJoch. +A few notes on setup and operation of the Eiger detectors through the JungfrauJoch broker: -The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch -User with sufficient rights may use: -- sudo systemctl restart jfjoch_broker -- sudo systemctl status jfjoch_broker -to check and/or restart the broker for the JungfrauJoch server. - -Some extra notes for setting up the detector: - If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored - Changes in energy may take time, good to implement logic that only resets energy if needed. - For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in the DatasetSettings is relevant - The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure - that subsequent triggers properly - consider the readout_time of the boards. For Jungfrau detectors, the difference between - count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs - to be taken into account during the integration. + that subsequent triggers properly consider the readout_time of the boards. For the Eiger detectors + at cSAXS, a readout time of 20us is configured through the JungfrauJoch deployment config. This + setting is sufficiently large for the detectors if they run in parallel mode. - beam_center and detector settings are required input arguments, thus, they may be set to wrong values for acquisitions to start. Please keep this in mind. Hardware related notes: - If there is an HW issue with the detector, power cycling may help. -- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba +- The sls_detector package is available on console on /sls/x12sa/applications/erik/micromamba - Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be used during operation - Useful commands: @@ -39,9 +28,6 @@ Hardware related notes: - cd power_control_user/ - ./on - ./off - -Further information that may be relevant for debugging: -JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001) """ from __future__ import annotations @@ -84,10 +70,19 @@ class EigerError(Exception): class Eiger(PSIDeviceBase): """ - Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant + Base integration of the Eiger1.5M and Eiger9M at cSAXS. + + Args: + name (str) : Name of the device + detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"] + host (str): Hostname of the Jungfrau Joch server. + port (int): Port of the Jungfrau Joch server. + scan_info (ScanInfo): The scan info to use. + device_manager (DeviceManagerDS): The device manager to use. + **kwargs: Additional keyword arguments. """ - USER_ACCESS = ["detector_distance", "beam_center"] + USER_ACCESS = ["set_detector_distance", "set_beam_center"] file_event = Cpt(FileEventSignal, name="file_event") preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2) @@ -105,23 +100,12 @@ class Eiger(PSIDeviceBase): device_manager=None, **kwargs, ): - """ - Initialize the PSI Device Base class. - - Args: - name (str) : Name of the device - detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"] - host (str): Hostname of the Jungfrau Joch server. - port (int): Port of the Jungfrau Joch server. - scan_info (ScanInfo): The scan info to use. - device_manager (DeviceManagerDS): The device manager to use. - **kwargs: Additional keyword arguments. - """ super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs) self._host = f"{host}:{port}" self.jfj_client = JungfrauJochClient(host=self._host, parent=self) + # NOTE: fetch this information from JungfrauJochClient during on_connected! self.jfj_preview_client = JungfrauJochPreview( - url="tcp://129.129.95.114:5400", cb=self.preview_image.put + url="tcp://129.129.95.114:5400", cb=self._preview_callback ) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream self.device_manager = device_manager self.detector_name = detector_name @@ -129,53 +113,102 @@ class Eiger(PSIDeviceBase): self._beam_center = beam_center self._readout_time = readout_time self._full_path = "" + self._num_triggers = 0 + self._wait_for_on_complete = 20 # seconds if self.device_manager is not None: self.device_manager: DeviceManagerDS + def _preview_callback(self, message: dict) -> None: + """ + Callback method for handling preview messages as received from the JungfrauJoch preview stream. + These messages are dictionary dumps as described in the JFJ ZMQ preview stream documentation. + (https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). + + Args: + message (dict): The message received from the preview stream. + """ + if message.get("type", "") == "image": + data = message.get("data", {}).get("default", None) + if data is None: + logger.error(f"Received image message on device {self.name} without data.") + return + logger.info(f"Received preview image on device {self.name}") + self.preview_image.put(data) + + # pylint: disable=missing-function-docstring @property def detector_distance(self) -> float: - """The detector distance in mm.""" return self._detector_distance @detector_distance.setter def detector_distance(self, value: float) -> None: - """Set the detector distance in mm.""" if value <= 0: raise ValueError("Detector distance must be a positive value.") self._detector_distance = value + def set_detector_distance(self, distance: float) -> None: + """ + Set the detector distance in mm. + + Args: + distance (float): The detector distance in mm. + """ + self.detector_distance = distance + + # pylint: disable=missing-function-docstring @property def beam_center(self) -> tuple[float, float]: - """The beam center in pixels. (x,y)""" return self._beam_center @beam_center.setter def beam_center(self, value: tuple[float, float]) -> None: - """Set the beam center in pixels. (x,y)""" + if any(coord < 0 for coord in value): + raise ValueError("Beam center coordinates must be non-negative.") self._beam_center = value - def on_init(self) -> None: + def set_beam_center(self, x: float, y: float) -> None: """ - Called when the device is initialized. + Set the beam center coordinates in pixels. - No siganls are connected at this point, - thus should not be set here but in on_connected instead. + Args: + x (float): The x coordinate of the beam center in pixels. + y (float): The y coordinate of the beam center in pixels. """ + self.beam_center = (x, y) + + def on_init(self) -> None: + """Hook called during device initialization.""" + + # pylint: disable=arguments-differ + def wait_for_connection(self, timeout: float = 10) -> None: + """ + Wait for the device to be connected to the JungfrauJoch backend. + + Args: + timeout (float): Timeout in seconds to wait for the connection. + """ + self.jfj_client.api.status_get(_request_timeout=timeout) # If connected, this responds def on_connected(self) -> None: """ + Hook called after the device is connected to through the device server. + Called after the device is connected and its signals are connected. - Default values for signals should be set here. + Default values for signals should be set here. Currently, the detector needs to be + initialised manually through the WEB UI of JungfrauJoch. Once agreed upon, the automated + initialisation can be re-enabled here (code commented below). """ + start_time = time.time() logger.debug(f"On connected called for {self.name}") self.jfj_client.stop(request_timeout=3) # Check which detector is selected # Get available detectors available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5) + logger.debug(f"Available detectors {available_detectors}") # Get current detector current_detector_name = "" - if available_detectors.current_id: + if available_detectors.current_id is not None: detector_selection = [ det.description for det in available_detectors.detectors @@ -190,8 +223,9 @@ class Eiger(PSIDeviceBase): raise RuntimeError( f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}." ) - # TODO - check again once Eiger should be initialized automatically, currently human initialization is expected - # # Once the automation should be enabled, we may use here + + # TODO - Currently the initialisation of the detector is done manually through the WEB UI. Once adjusted + # this can be automated here again. # detector_selection = [ # det for det in available_detectors.detectors if det.id == self.detector_name # ] @@ -207,41 +241,51 @@ class Eiger(PSIDeviceBase): # Setup Detector settings, here we may also set the energy already as this might be time consuming settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER) - self.jfj_client.set_detector_settings(settings, timeout=10) + self.jfj_client.set_detector_settings(settings, timeout=5) + # Set the file writer to the appropriate output for the HDF5 file file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS) logger.debug( f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}" ) + + # Setup the file writer settings self.jfj_client.api.config_file_writer_put( file_writer_settings=file_writer_settings, _request_timeout=10 ) + # Start the preview client self.jfj_preview_client.connect() self.jfj_preview_client.start() - logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}") + logger.info( + f"Device {self.name} initialized after {time.time()-start_time:.2f}s. Preview stream connected on url: {self.jfj_preview_client.url}" + ) def on_stage(self) -> DeviceStatus | None: """ - Called while staging the device. - - Information about the upcoming scan can be accessed from the scan_info object. + Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object. + scan_msg = self.scan_info.msg """ start_time = time.time() scan_msg = self.scan_info.msg - # Set acquisition parameter - # TODO add check of mono energy, this can then also be passed to DatasetSettings + + # TODO: Check mono energy from device in BEC + # Setting incident energy in keV incident_energy = 12.0 + # Setting up exp_time and num_triggers acquisition parameter exp_time = scan_msg.scan_parameters.get("exp_time", 0) - if exp_time <= self._readout_time: + if exp_time <= self._readout_time: # Exp_time must be at least the readout time raise ValueError( - f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}." + f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s." ) - frame_time_us = exp_time # - ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]) - # Fetch file path + self._num_triggers = int( + scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"] + ) + + # Setting up the full path for file writing self._full_path = get_full_path(scan_msg, name=f"{self.name}_master") self._full_path = os.path.abspath(os.path.expanduser(self._full_path)) + # Inform BEC about upcoming file event self.file_event.put( file_path=self._full_path, @@ -249,11 +293,14 @@ class Eiger(PSIDeviceBase): successful=False, hinted_h5_entries={"data": "entry/data/data"}, ) + # JFJ adds _master.h5 automatically path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5") + + # Create dataset settings for API call. data_settings = DatasetSettings( - image_time_us=int(frame_time_us * 1e6), # This is currently ignored - ntrigger=ntrigger, + image_time_us=int(exp_time * 1e6), + ntrigger=self._num_triggers, file_prefix=path, beam_x_pxl=int(self._beam_center[0]), beam_y_pxl=int(self._beam_center[1]), @@ -261,11 +308,15 @@ class Eiger(PSIDeviceBase): incident_energy_ke_v=incident_energy, ) logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") - prep_time = start_time - time.time() - logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s") - self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state + prep_time = time.time() + self.jfj_client.wait_for_idle(timeout=10) # Ensure we are in IDLE state self.jfj_client.start(settings=data_settings) # Takes around ~0.6s - logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s") + + # Time the stage process + logger.info( + f"Device {self.name} staged for scan. Time spent {time.time()-start_time:.2f}s," + f" with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch." + ) def on_unstage(self) -> DeviceStatus: """Called while unstaging the device.""" @@ -278,7 +329,9 @@ class Eiger(PSIDeviceBase): def _file_event_callback(self, status: DeviceStatus) -> None: """Callback to update the file_event signal when the acquisition is done.""" - logger.info(f"Acquisition done callback called for {self.name} for status {status.success}") + logger.debug( + f"File event callback on complete status for device {self.name}: done={status.done}, successful={status.success}" + ) self.file_event.put( file_path=self._full_path, done=status.done, @@ -287,19 +340,44 @@ class Eiger(PSIDeviceBase): ) def on_complete(self) -> DeviceStatus: - """Called to inquire if a device has completed a scans.""" + """ + Called at the end of the scan. The method should implement an asynchronous wait for the + device to complete the acquisition. A callback to update the file_event signal is + attached that resolves the file event when the acquisition is done. + + Returns: + DeviceStatus: The status object representing the completion of the acquisition. + """ def wait_for_complete(): start_time = time.time() - timeout = 10 - for _ in range(timeout): - if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10): + # NOTE: This adjust the time (s) that should be waited for completion of the scan. + timeout = self._wait_for_on_complete + while time.time() - start_time < timeout: + if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False): + # TODO: Once available, add check for + statistics: MeasurementStatistics = ( + self.jfj_client.api.statistics_data_collection_get(_request_timeout=5) + ) + if statistics.images_collected < self._num_triggers: + raise EigerError( + f"Device {self.name} acquisition incomplete. " + f"Expected {self._num_triggers} triggers, " + f"but only {statistics.images_collected} were collected." + ) return + logger.info( + f"Waiting for device {self.name} to finish complete, time elapsed: " + f"{time.time() - start_time}." + ) statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( _request_timeout=5 ) + broker_status = self.jfj_client.jfj_status raise TimeoutError( - f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}" + f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s \n \n" + f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n" + f"Measurement statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}" ) status = self.task_handler.submit_task(wait_for_complete, run=True) @@ -312,7 +390,11 @@ class Eiger(PSIDeviceBase): def on_stop(self) -> None: """Called when the device is stopped.""" - self.jfj_client.stop( - request_timeout=0.5 - ) # Call should not block more than 0.5 seconds to stop all devices... + self.jfj_client.stop(request_timeout=0.5) self.task_handler.shutdown() + + def on_destroy(self): + """Called when the device is destroyed.""" + self.jfj_preview_client.stop() + self.on_stop() + return super().on_destroy() diff --git a/csaxs_bec/devices/jungfraujoch/eiger_9m.py b/csaxs_bec/devices/jungfraujoch/eiger_9m.py index f44ca1d..f206103 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger_9m.py +++ b/csaxs_bec/devices/jungfraujoch/eiger_9m.py @@ -21,18 +21,18 @@ if TYPE_CHECKING: # pragma no cover from bec_server.device_server.device_server import DeviceManagerDS EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s -DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M"" +DETECTOR_NAME = "EIGER 9M" # "EIGER 9M"" # pylint:disable=invalid-name class Eiger9M(Eiger): """ - Eiger 1.5M specific integration for the in-vaccum Eiger. + EIGER 9M specific integration for the in-vaccum Eiger. The logic implemented here is coupled to the DelayGenerator integration, repsonsible for the global triggering of all devices through a single Trigger logic. Please check the eiger.py class for more details about the integration of relevant backend - services. The detector_name must be set to "EIGER 1.5M: + services. The detector_name must be set to "EIGER 9M": """ USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here. diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index 1db6b15..dab8cf6 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -1,13 +1,15 @@ -"""Module with client interface for the Jungfrau Joch detector API""" +"""Module with a thin client wrapper around the Jungfrau Joch detector API""" from __future__ import annotations import enum +import threading import time import traceback from typing import TYPE_CHECKING import requests +import yaml from bec_lib.logger import bec_logger from jfjoch_client.api.default_api import DefaultApi from jfjoch_client.api_client import ApiClient @@ -18,7 +20,7 @@ from jfjoch_client.models.detector_settings import DetectorSettings logger = bec_logger.logger -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from ophyd import Device @@ -29,7 +31,10 @@ class JungfrauJochClientError(Exception): class DetectorState(str, enum.Enum): - """Possible Detector states for Jungfrau Joch detector""" + """ + Enum states of the BrokerStatus state. The pydantic model validates in runtime, + thus we keep the possible states here for a convenient overview and access. + """ INACTIVE = "Inactive" IDLE = "Idle" @@ -40,13 +45,15 @@ class DetectorState(str, enum.Enum): class JungfrauJochClient: - """Thin wrapper around the Jungfrau Joch API client. + """ + Jungfrau Joch API client wrapper. It provides a thin wrapper methods around the API client, + that allow to connect, initialise, wait for state changes, set settings, start and stop + acquisitions. - sudo systemctl restart jfjoch_broker - sudo systemctl status jfjoch_broker - - It looks as if the detector is not being stopped properly. - One module remains running, how can we restart the detector? + Args: + host (str): Hostname of the Jungfrau Joch broker service. + Default is "http://sls-jfjoch-001:8080" + parent (Device, optional): Parent ophyd device, used for logging purposes. """ def __init__( @@ -59,50 +66,63 @@ class JungfrauJochClient: self._parent_name = parent.name if parent else self.__class__.__name__ @property - def jjf_state(self) -> BrokerStatus: - """Get the status of JungfrauJoch""" + def jfj_status(self) -> BrokerStatus: + """Broker status of JungfrauJoch.""" response = self.api.status_get() return BrokerStatus(**response.to_dict()) + # pylint: disable=missing-function-docstring @property def initialised(self) -> bool: - """Check if jfj is connected and ready to receive commands""" return self._initialised @initialised.setter def initialised(self, value: bool) -> None: - """Set the connected status""" self._initialised = value - # TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync... - # REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here.. + # pylint: disable=missing-function-docstring @property def detector_state(self) -> DetectorState: - """Get the status of JungfrauJoch""" - return DetectorState(self.jjf_state.state) + return DetectorState(self.jfj_status.state) - def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None: - """Check if JungfrauJoch is connected and ready to receive commands""" + def connect_and_initialise(self, timeout: int = 10) -> None: + """ + Connect and initialise the JungfrauJoch detector. The detector must be in + IDLE state to become initialised. This is a blocking call, the timeout parameter + will be passed to the HTTP requests timeout method of the wait_for_idle method. + + Args: + timeout (int): Timeout in seconds for the initialisation and waiting for IDLE state. + """ status = self.detector_state + # TODO: #135 Check if the detector has to be in INACTIVE state before initialisation if status != DetectorState.IDLE: - self.api.initialize_post() # This is a blocking call.... - self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call + self.api.initialize_post() + self.wait_for_idle(timeout) self.initialised = True def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None: - """Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state. - Note, the full settings have to be provided, otherwise the settings will be overwritten with default values. + """ + Set the detector settings. The state of JungfrauJoch must be in IDLE, + Error or Inactive state. Please note: a full set of setttings has to be provided, + otherwise the settings will be overwritten with default values. Args: settings (dict): dictionary of settings + timeout (int): Timeout in seconds for the HTTP request to set the settings. """ state = self.detector_state if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]: + logger.info( + f"JungfrauJoch backend fo device {self._parent_name} is not in IDLE state," + " waiting 1s before retrying..." + ) time.sleep(1) # Give the detector 1s to become IDLE, retry state = self.detector_state if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]: raise JungfrauJochClientError( - f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}" + f"Error on {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE" + " state to set settings. Current state: {state}" ) if isinstance(settings, dict): @@ -110,28 +130,36 @@ class JungfrauJochClient: try: self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout) except requests.exceptions.Timeout: - raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}") + raise TimeoutError( + f"Timeout on device {self._parent_name} while setting detector settings:\n " + f"{yaml.dump(settings, indent=4)}." + ) except Exception: content = traceback.format_exc() + logger.error( + f"Error on device {self._parent_name} while setting detector settings:\n " + f"{yaml.dump(settings, indent=4)}. Error traceback: {content}" + ) raise JungfrauJochClientError( - f"Error while setting detector settings for {self._parent_name}: {content}" + f"Error on device {self._parent_name} while setting detector settings:\n " + f"{yaml.dump(settings, indent=4)}. Full traceback: {content}." ) def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None: - """Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state. - The method call is blocking and JungfrauJoch will be ready to measure after the call resolves. + """ + Start the acquisition with the provided dataset settings. + The detector must be in IDLE state. Settings must always provide a full set of + parameters, missing parameters will be set to default values. Args: - settings (dict): dictionary of settings - - Please check the DataSettings class for the available settings. Minimum required settings are - beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV. - + settings (dict | DatasetSettings): Dataset settings to start the acquisition with. + request_timeout (float): Timeout in sec for the HTTP request to start the acquisition. """ state = self.detector_state if state != DetectorState.IDLE: raise JungfrauJochClientError( - f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}" + f"Error on device {self._parent_name}. " + f"Detector must be in IDLE state to start acquisition. Current state: {state}" ) if isinstance(settings, dict): @@ -141,46 +169,80 @@ class JungfrauJochClient: dataset_settings=settings, _request_timeout=request_timeout ) except requests.exceptions.Timeout: + content = traceback.format_exc() + logger.error( + f"Timeout error after {request_timeout} seconds on device {self._parent_name} " + f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}. \n" + f"Traceback: {content}" + ) raise TimeoutError( - f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call" + f"Timeout error after {request_timeout} seconds on device {self._parent_name} " + f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}." ) except Exception: content = traceback.format_exc() + logger.error( + f"Error on device {self._parent_name} during 'start' post with dataset settings: \n" + f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}" + ) raise JungfrauJochClientError( - f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}" + f"Error on device {self._parent_name} during 'start' post with dataset settings: \n" + f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}." ) def stop(self, request_timeout: float = 0.5) -> None: """Stop the acquisition, this only logs errors and is not raising.""" - try: - self.api.cancel_post_with_http_info(_request_timeout=request_timeout) - except requests.exceptions.Timeout: - content = traceback.format_exc() - logger.error( - f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}" - ) - except Exception: - content = traceback.format_exc() - logger.error( - f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}" - ) - def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool: - """Wait for JungfrauJoch to be in Idle state. Blocking call with timeout. + def _stop_call(self): + try: + self.api.cancel_post_with_http_info() # (_request_timeout=request_timeout) + except requests.exceptions.Timeout: + content = traceback.format_exc() + logger.error( + f"Timeout error after {request_timeout} seconds on device {self._parent_name} " + f"during stop: {content}" + ) + except Exception: + content = traceback.format_exc() + logger.error(f"Error on device {self._parent_name} during stop: {content}") + + thread = threading.Thread( + target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread" + ) + thread.start() + + def wait_for_idle(self, timeout: int = 10, raise_on_timeout: bool = True) -> bool: + """ + Method to wait until the detector is in IDLE state. This is a blocking call with a + timeout that can be specified. The additional parameter raise_on_timeout can be used to + raise an exception on timeout instead of returning boolean True/False. Args: timeout (int): timeout in seconds + raise_on_timeout (bool): If True, raises an exception on timeout. Default is True. Returns: bool: True if the detector is in IDLE state, False if timeout occurred """ - if request_timeout is None: - request_timeout = timeout try: - self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout) + self.api.wait_till_done_post(timeout=timeout, _request_timeout=timeout) except requests.exceptions.Timeout: - raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}") - except Exception: content = traceback.format_exc() - logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}") + logger.info( + f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle: {content}" + ) + if raise_on_timeout: + raise TimeoutError( + f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle." + ) + return False + except Exception as exc: + content = traceback.format_exc() + logger.info( + f"Error on device {self._parent_name} in wait_for_idle. Full traceback: {content}" + ) + if raise_on_timeout: + raise JungfrauJochClientError( + f"Error on device {self._parent_name} in wait_for_idle: {content}" + ) from exc return False return True diff --git a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py index bfda46d..a6cf358 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py +++ b/csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py @@ -1,22 +1,136 @@ -"""Module for the Eiger preview ZMQ stream.""" +""" +Module for the JungfrauJoch preview ZMQ stream for the Eiger detector at cSAXS. +The Preview client is implemented for the JungfrauJoch ZMQ PUB-SUB interface, and +should be independent of the EIGER detector type. + +The client connects to the ZMQ PUB-SUB preview stream and calls a user provided callback +function with the decompressed messages received from the stream. The callback needs to be +able to deal with the different message types sent by the JungfrauJoch server ("start", +"image", "end") as described in the JungfrauJoch ZEROMQ preview stream documentation. +(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). +""" from __future__ import annotations -import json import threading import time from typing import Callable +import cbor2 import numpy as np import zmq from bec_lib.logger import bec_logger +from dectris.compression import decompress logger = bec_logger.logger -ZMQ_TOPIC_FILTER = b"" +############################### +###### CBOR TAG DECODERS ###### +############################### +# Dectris specific CBOR tags and decoders for Jungfrau data +# Reference: +# https://github.com/dectris/documentation/blob/main/stream_v2/examples/client.py + + +def decode_multi_dim_array(tag: cbor2.CBORTag, column_major: bool = False): + """Decode a multi-dimensional array from a CBOR tag.""" + dimensions, contents = tag.value + if isinstance(contents, list): + array = np.empty((len(contents),), dtype=object) + array[:] = contents + elif isinstance(contents, (np.ndarray, np.generic)): + array = contents + else: + raise cbor2.CBORDecodeValueError("expected array or typed array") + return array.reshape(dimensions, order="F" if column_major else "C") + + +def decode_typed_array(tag: cbor2.CBORTag, dtype: str): + """Decode a typed array from a CBOR tag.""" + if not isinstance(tag.value, bytes): + raise cbor2.CBORDecodeValueError("expected byte string in typed array") + return np.frombuffer(tag.value, dtype=dtype) + + +def decode_dectris_compression(tag: cbor2.CBORTag): + """Decode a Dectris compressed array from a CBOR tag.""" + algorithm, elem_size, encoded = tag.value + return decompress(encoded, algorithm, elem_size=elem_size) + + +######################################### +#### Dectris CBOR TAG Extensions ######## +######################################### + +# Mapping of various additional CBOR tags from Dectris to decoder functions +tag_decoders = { + 40: lambda tag: decode_multi_dim_array(tag, column_major=False), + 64: lambda tag: decode_typed_array(tag, dtype="u1"), + 65: lambda tag: decode_typed_array(tag, dtype=">u2"), + 66: lambda tag: decode_typed_array(tag, dtype=">u4"), + 67: lambda tag: decode_typed_array(tag, dtype=">u8"), + 68: lambda tag: decode_typed_array(tag, dtype="u1"), + 69: lambda tag: decode_typed_array(tag, dtype=" Generator[DetectorList, None, None]: ), DetectorListElement( id=2, - description="EIGER 8.5M (tmp)", + description="EIGER 9M", serial_number="123456", base_ipv4_addr="192.168.0.1", udp_interface_count=1, @@ -103,7 +104,11 @@ def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]: name = "eiger_1_5m" dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0) dev.scan_info.msg = mock_scan_info - yield dev + try: + yield dev + finally: + if dev._destroyed is False: + dev.destroy() @pytest.fixture(scope="function") @@ -113,7 +118,19 @@ def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]: name = "eiger_9m" dev = Eiger9M(name=name) dev.scan_info.msg = mock_scan_info - yield dev + try: + yield dev + finally: + if dev._destroyed is False: + dev.destroy() + + +def test_eiger_wait_for_connection(eiger_1_5m, eiger_9m): + """Test the wait_for_connection metho is calling status_get on the JFJ API client.""" + for eiger in (eiger_1_5m, eiger_9m): + with mock.patch.object(eiger.jfj_client.api, "status_get") as mock_status_get: + eiger.wait_for_connection(timeout=1) + mock_status_get.assert_called_once_with(_request_timeout=1) @pytest.mark.parametrize("detector_state", ["Idle", "Inactive"]) @@ -141,7 +158,7 @@ def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state): else: eiger.on_connected() assert mock_set_det.call_args == mock.call( - DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5 ) assert mock_file_writer.call_args == mock.call( file_writer_settings=FileWriterSettings( @@ -179,7 +196,7 @@ def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state): else: eiger.on_connected() assert mock_set_det.call_args == mock.call( - DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5 ) assert mock_file_writer.call_args == mock.call( file_writer_settings=FileWriterSettings( @@ -216,11 +233,39 @@ def test_eiger_on_stop(eiger_1_5m): stop_event.wait(timeout=5) # Thread should be killed from task_handler +def test_eiger_on_destroy(eiger_1_5m): + """Test the on_destroy logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + start_event = threading.Event() + stop_event = threading.Event() + + def tmp_task(): + start_event.set() + try: + while True: + time.sleep(0.1) + finally: + stop_event.set() + + eiger.task_handler.submit_task(tmp_task) + start_event.wait(timeout=5) + + with ( + mock.patch.object(eiger.jfj_preview_client, "stop") as mock_jfj_preview_client_stop, + mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop, + ): + eiger.on_destroy() + mock_jfj_preview_client_stop.assert_called_once() + mock_jfj_client_stop.assert_called_once() + stop_event.wait(timeout=5) + + @pytest.mark.timeout(25) @pytest.mark.parametrize("raise_timeout", [True, False]) def test_eiger_on_complete(eiger_1_5m, raise_timeout): """Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" eiger = eiger_1_5m + eiger._wait_for_on_complete = 1 # reduce wait time for testing callback_completed_event = threading.Event() @@ -230,7 +275,7 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout): unblock_wait_for_idle = threading.Event() - def mock_wait_for_idle(timeout: int, request_timeout: float): + def mock_wait_for_idle(timeout: float, raise_on_timeout: bool) -> bool: if unblock_wait_for_idle.wait(timeout): if raise_timeout: return False @@ -238,11 +283,18 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout): return False with ( + mock.patch.object( + eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state="Idle") + ), mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle), mock.patch.object( eiger.jfj_client.api, "statistics_data_collection_get", - return_value=MeasurementStatistics(run_number=1), + return_value=MeasurementStatistics( + run_number=1, + images_collected=eiger.scan_info.msg.num_points + * eiger.scan_info.msg.scan_parameters["frames_per_trigger"], + ), ), ): status = eiger.complete() @@ -284,7 +336,7 @@ def test_eiger_file_event_callback(eiger_1_5m, tmp_path): assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} -def test_eiger_on_sage(eiger_1_5m): +def test_eiger_on_stage(eiger_1_5m): """Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" eiger = eiger_1_5m scan_msg = eiger.scan_info.msg @@ -316,3 +368,35 @@ def test_eiger_on_sage(eiger_1_5m): ) assert mock_start.call_args == mock.call(settings=data_settings) assert eiger.staged is Staged.yes + + +def test_eiger_set_det_distance_test_beam_center(eiger_1_5m): + """Test the set_detector_distance and set_beam_center methods. Equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + old_distance = eiger.detector_distance + new_distance = old_distance + 100 + old_beam_center = eiger.beam_center + new_beam_center = (old_beam_center[0] + 20, old_beam_center[1] + 50) + eiger.set_detector_distance(new_distance) + assert eiger.detector_distance == new_distance + eiger.set_beam_center(x=new_beam_center[0], y=new_beam_center[1]) + assert eiger.beam_center == new_beam_center + with pytest.raises(ValueError): + eiger.set_beam_center(x=-10, y=100) # Cannot set negative beam center + with pytest.raises(ValueError): + eiger.detector_distance = -50 # Cannot set negative detector distance + + +def test_eiger_preview_callback(eiger_1_5m): + """Preview callback test for the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + # NOTE: I don't find models for the CBOR messages used by JFJ, currently using a dummay dict. + # Please adjust once the proper model is found. + for msg_type in ["start", "end", "image", "calibration", "metadata"]: + msg = {"type": msg_type, "data": {"default": np.array([[1, 2], [3, 4]])}} + with mock.patch.object(eiger.preview_image, "put") as mock_preview_put: + eiger._preview_callback(msg) + if msg_type == "image": + mock_preview_put.assert_called_once_with(msg["data"]["default"]) + else: + mock_preview_put.assert_not_called()