From c4b7486c44da099fa259738048949a4b54afcaa4 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 29 Jan 2026 10:51:47 +0100 Subject: [PATCH] fix: cleanup of eiger integration --- csaxs_bec/devices/jungfraujoch/eiger.py | 163 ++++++++++++++++-------- 1 file changed, 108 insertions(+), 55 deletions(-) diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index f8273de..e175ba4 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -71,9 +71,18 @@ class EigerError(Exception): class Eiger(PSIDeviceBase): """ Base integration of the Eiger1.5M and Eiger9M at cSAXS. + + Args: + name (str) : Name of the device + detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"] + host (str): Hostname of the Jungfrau Joch server. + port (int): Port of the Jungfrau Joch server. + scan_info (ScanInfo): The scan info to use. + device_manager (DeviceManagerDS): The device manager to use. + **kwargs: Additional keyword arguments. """ - USER_ACCESS = ["detector_distance", "beam_center"] + USER_ACCESS = ["set_detector_distance", "set_beam_center"] file_event = Cpt(FileEventSignal, name="file_event") preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2) @@ -91,18 +100,6 @@ class Eiger(PSIDeviceBase): device_manager=None, **kwargs, ): - """ - Initialize the PSI Device Base class. - - Args: - name (str) : Name of the device - detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"] - host (str): Hostname of the Jungfrau Joch server. - port (int): Port of the Jungfrau Joch server. - scan_info (ScanInfo): The scan info to use. - device_manager (DeviceManagerDS): The device manager to use. - **kwargs: Additional keyword arguments. - """ super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs) self._host = f"{host}:{port}" self.jfj_client = JungfrauJochClient(host=self._host, parent=self) @@ -116,10 +113,19 @@ class Eiger(PSIDeviceBase): self._beam_center = beam_center self._readout_time = readout_time self._full_path = "" + self._num_triggers = 0 if self.device_manager is not None: self.device_manager: DeviceManagerDS def _preview_callback(self, message: dict) -> None: + """ + Callback method for handling preview messages as received from the JungfrauJoch preview stream. + These messages are dictionary dumps as described in the JFJ ZMQ preview stream documentation. + (https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream). + + Args: + message (dict): The message received from the preview stream. + """ if message.get("type", "") == "image": data = message.get("data", {}).get("default", None) if data is None: @@ -128,41 +134,70 @@ class Eiger(PSIDeviceBase): logger.info(f"Received preview image on device {self.name}") self.preview_image.put(data) + # pylint: disable=missing-function-docstring @property def detector_distance(self) -> float: - """The detector distance in mm.""" return self._detector_distance @detector_distance.setter def detector_distance(self, value: float) -> None: - """Set the detector distance in mm.""" if value <= 0: raise ValueError("Detector distance must be a positive value.") self._detector_distance = value + def set_detector_distance(self, distance: float) -> None: + """ + Set the detector distance in mm. + + Args: + distance (float): The detector distance in mm. + """ + self.detector_distance = distance + + # pylint: disable=missing-function-docstring @property def beam_center(self) -> tuple[float, float]: - """The beam center in pixels. (x,y)""" return self._beam_center @beam_center.setter def beam_center(self, value: tuple[float, float]) -> None: - """Set the beam center in pixels. (x,y)""" + if any(coord < 0 for coord in value): + raise ValueError("Beam center coordinates must be non-negative.") self._beam_center = value - def on_init(self) -> None: + def set_beam_center(self, x: float, y: float) -> None: """ - Called when the device is initialized. + Set the beam center coordinates in pixels. - No siganls are connected at this point, - thus should not be set here but in on_connected instead. + Args: + x (float): The x coordinate of the beam center in pixels. + y (float): The y coordinate of the beam center in pixels. """ + self.beam_center = (x, y) + + def on_init(self) -> None: + """Hook called during device initialization.""" + + # pylint: disable=arguments-differ + def wait_for_connection(self, timeout: float = 10) -> None: + """ + Wait for the device to be connected to the JungfrauJoch backend. + + Args: + timeout (float): Timeout in seconds to wait for the connection. + """ + self.jfj_client.api.status_get(_request_timeout=timeout) # If connected, this responds def on_connected(self) -> None: """ + Hook called after the device is connected to through the device server. + Called after the device is connected and its signals are connected. - Default values for signals should be set here. + Default values for signals should be set here. Currently, the detector needs to be + initialised manually through the WEB UI of JungfrauJoch. Once agreed upon, the automated + initialisation can be re-enabled here (code commented below). """ + start_time = time.time() logger.debug(f"On connected called for {self.name}") self.jfj_client.stop(request_timeout=3) # Check which detector is selected @@ -188,8 +223,8 @@ class Eiger(PSIDeviceBase): f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}." ) - # TODO - check again once Eiger should be initialized automatically, currently human initialization is expected - # # Once the automation should be enabled, we may use here + # TODO - Currently the initialisation of the detector is done manually through the WEB UI. Once adjusted + # this can be automated here again. # detector_selection = [ # det for det in available_detectors.detectors if det.id == self.detector_name # ] @@ -206,42 +241,50 @@ class Eiger(PSIDeviceBase): # Setup Detector settings, here we may also set the energy already as this might be time consuming settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER) self.jfj_client.set_detector_settings(settings, timeout=5) + # Set the file writer to the appropriate output for the HDF5 file file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS) logger.debug( f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}" ) + # Setup the file writer settings self.jfj_client.api.config_file_writer_put( file_writer_settings=file_writer_settings, _request_timeout=10 ) + # Start the preview client self.jfj_preview_client.connect() self.jfj_preview_client.start() - logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}") + logger.info( + f"Device {self.name} initialized after {time.time()-start_time:.2f}s. Preview stream connected on url: {self.jfj_preview_client.url}" + ) def on_stage(self) -> DeviceStatus | None: """ - Called while staging the device. - - Information about the upcoming scan can be accessed from the scan_info object. + Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object. + scan_msg = self.scan_info.msg """ start_time = time.time() scan_msg = self.scan_info.msg - # Set acquisition parameter - # TODO add check of mono energy, this can then also be passed to DatasetSettings + # TODO: Check mono energy from device in BEC + # Setting incident energy in keV incident_energy = 12.0 + # Setting up exp_time and num_triggers acquisition parameter exp_time = scan_msg.scan_parameters.get("exp_time", 0) - if exp_time <= self._readout_time: + if exp_time <= self._readout_time: # Exp_time must be at least the readout time raise ValueError( - f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}." + f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s." ) - frame_time_us = exp_time # - ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]) - # Fetch file path + self._num_triggers = int( + scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"] + ) + + # Setting up the full path for file writing self._full_path = get_full_path(scan_msg, name=f"{self.name}_master") self._full_path = os.path.abspath(os.path.expanduser(self._full_path)) + # Inform BEC about upcoming file event self.file_event.put( file_path=self._full_path, @@ -252,23 +295,26 @@ class Eiger(PSIDeviceBase): # JFJ adds _master.h5 automatically path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5") - # path = os.path.relpath(self._full_path, start="/sls/x12sa/data") + # Create dataset settings for API call. data_settings = DatasetSettings( - image_time_us=int(frame_time_us * 1e6), # This is currently ignored - ntrigger=ntrigger, + image_time_us=int(exp_time * 1e6), + ntrigger=self._num_triggers, file_prefix=path, beam_x_pxl=int(self._beam_center[0]), beam_y_pxl=int(self._beam_center[1]), detector_distance_mm=self.detector_distance, incident_energy_ke_v=incident_energy, ) - logger.info(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") + logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}") prep_time = time.time() - self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state + self.jfj_client.wait_for_idle(timeout=10) # Ensure we are in IDLE state self.jfj_client.start(settings=data_settings) # Takes around ~0.6s + + # Time the stage process logger.info( - f"On stage done for device {self.name} after {time.time()-start_time:.2f}s, with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch." + f"Device {self.name} staged for scan. Time spent {time.time()-start_time:.2f}s," + f" with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch." ) def on_unstage(self) -> DeviceStatus: @@ -283,7 +329,7 @@ class Eiger(PSIDeviceBase): def _file_event_callback(self, status: DeviceStatus) -> None: """Callback to update the file_event signal when the acquisition is done.""" logger.debug( - f"Acquisition done callback called for {self.name} for status {status.success}" + f"File event callback on complete status for device {self.name}: done={status.done}, successful={status.success}" ) self.file_event.put( file_path=self._full_path, @@ -293,28 +339,35 @@ class Eiger(PSIDeviceBase): ) def on_complete(self) -> DeviceStatus: - """Called to inquire if a device has completed a scans.""" + """ + Called at the end of the scan. The method should implement an asynchronous wait for the + device to complete the acquisition. A callback to update the file_event signal is + attached that resolves the file event when the acquisition is done. + + Returns: + DeviceStatus: The status object representing the completion of the acquisition. + """ def wait_for_complete(): start_time = time.time() - timeout = 20 + # NOTE: This adjust the time (s) that should be waited for completion of the scan. + timeout = 20 # while time.time() - start_time < timeout: - if self.jfj_client.wait_for_idle( - timeout=1, request_timeout=1, raise_on_timeout=False - ): - # TODO add check if data acquisition finished in success + if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False): + # TODO: Once available, add check for statistics: MeasurementStatistics = ( self.jfj_client.api.statistics_data_collection_get(_request_timeout=5) ) - broker_status = self.jfj_client.jjf_state - logger.info( - f"Device {self.name} completed acquisition. \n \n" - f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n" - f"statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}" - ) + if statistics.images_collected < self._num_triggers: + raise EigerError( + f"Device {self.name} acquisition incomplete. " + f"Expected {self._num_triggers} triggers, " + f"but only {statistics.images_collected} were collected." + ) return logger.info( - f"Device {self.name} running loop to wait for complete, time elapsed: {time.time() - start_time}." + f"Waiting for device {self.name} to finish complete, time elapsed: " + f"{time.time() - start_time}." ) statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get( _request_timeout=5