fix: cleanup of eiger integration
Some checks failed
CI for csaxs_bec / test (push) Has been cancelled
CI for csaxs_bec / test (pull_request) Failing after 1m26s

This commit is contained in:
2026-01-29 10:51:47 +01:00
parent d8c975f563
commit c4b7486c44

View File

@@ -71,9 +71,18 @@ class EigerError(Exception):
class Eiger(PSIDeviceBase):
"""
Base integration of the Eiger1.5M and Eiger9M at cSAXS.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
USER_ACCESS = ["detector_distance", "beam_center"]
USER_ACCESS = ["set_detector_distance", "set_beam_center"]
file_event = Cpt(FileEventSignal, name="file_event")
preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2)
@@ -91,18 +100,6 @@ class Eiger(PSIDeviceBase):
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
@@ -116,10 +113,19 @@ class Eiger(PSIDeviceBase):
self._beam_center = beam_center
self._readout_time = readout_time
self._full_path = ""
self._num_triggers = 0
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
def _preview_callback(self, message: dict) -> None:
"""
Callback method for handling preview messages as received from the JungfrauJoch preview stream.
These messages are dictionary dumps as described in the JFJ ZMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
message (dict): The message received from the preview stream.
"""
if message.get("type", "") == "image":
data = message.get("data", {}).get("default", None)
if data is None:
@@ -128,41 +134,70 @@ class Eiger(PSIDeviceBase):
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
# pylint: disable=missing-function-docstring
@property
def detector_distance(self) -> float:
"""The detector distance in mm."""
return self._detector_distance
@detector_distance.setter
def detector_distance(self, value: float) -> None:
"""Set the detector distance in mm."""
if value <= 0:
raise ValueError("Detector distance must be a positive value.")
self._detector_distance = value
def set_detector_distance(self, distance: float) -> None:
"""
Set the detector distance in mm.
Args:
distance (float): The detector distance in mm.
"""
self.detector_distance = distance
# pylint: disable=missing-function-docstring
@property
def beam_center(self) -> tuple[float, float]:
"""The beam center in pixels. (x,y)"""
return self._beam_center
@beam_center.setter
def beam_center(self, value: tuple[float, float]) -> None:
"""Set the beam center in pixels. (x,y)"""
if any(coord < 0 for coord in value):
raise ValueError("Beam center coordinates must be non-negative.")
self._beam_center = value
def on_init(self) -> None:
def set_beam_center(self, x: float, y: float) -> None:
"""
Called when the device is initialized.
Set the beam center coordinates in pixels.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
Args:
x (float): The x coordinate of the beam center in pixels.
y (float): The y coordinate of the beam center in pixels.
"""
self.beam_center = (x, y)
def on_init(self) -> None:
"""Hook called during device initialization."""
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 10) -> None:
"""
Wait for the device to be connected to the JungfrauJoch backend.
Args:
timeout (float): Timeout in seconds to wait for the connection.
"""
self.jfj_client.api.status_get(_request_timeout=timeout) # If connected, this responds
def on_connected(self) -> None:
"""
Hook called after the device is connected to through the device server.
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
Default values for signals should be set here. Currently, the detector needs to be
initialised manually through the WEB UI of JungfrauJoch. Once agreed upon, the automated
initialisation can be re-enabled here (code commented below).
"""
start_time = time.time()
logger.debug(f"On connected called for {self.name}")
self.jfj_client.stop(request_timeout=3)
# Check which detector is selected
@@ -188,8 +223,8 @@ class Eiger(PSIDeviceBase):
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
)
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
# # Once the automation should be enabled, we may use here
# TODO - Currently the initialisation of the detector is done manually through the WEB UI. Once adjusted
# this can be automated here again.
# detector_selection = [
# det for det in available_detectors.detectors if det.id == self.detector_name
# ]
@@ -206,42 +241,50 @@ class Eiger(PSIDeviceBase):
# Setup Detector settings, here we may also set the energy already as this might be time consuming
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
self.jfj_client.set_detector_settings(settings, timeout=5)
# Set the file writer to the appropriate output for the HDF5 file
file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS)
logger.debug(
f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}"
)
# Setup the file writer settings
self.jfj_client.api.config_file_writer_put(
file_writer_settings=file_writer_settings, _request_timeout=10
)
# Start the preview client
self.jfj_preview_client.connect()
self.jfj_preview_client.start()
logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}")
logger.info(
f"Device {self.name} initialized after {time.time()-start_time:.2f}s. Preview stream connected on url: {self.jfj_preview_client.url}"
)
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object.
scan_msg = self.scan_info.msg
"""
start_time = time.time()
scan_msg = self.scan_info.msg
# Set acquisition parameter
# TODO add check of mono energy, this can then also be passed to DatasetSettings
# TODO: Check mono energy from device in BEC
# Setting incident energy in keV
incident_energy = 12.0
# Setting up exp_time and num_triggers acquisition parameter
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time <= self._readout_time:
if exp_time <= self._readout_time: # Exp_time must be at least the readout time
raise ValueError(
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s."
)
frame_time_us = exp_time #
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
# Fetch file path
self._num_triggers = int(
scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]
)
# Setting up the full path for file writing
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
self.file_event.put(
file_path=self._full_path,
@@ -252,23 +295,26 @@ class Eiger(PSIDeviceBase):
# JFJ adds _master.h5 automatically
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5")
# path = os.path.relpath(self._full_path, start="/sls/x12sa/data")
# Create dataset settings for API call.
data_settings = DatasetSettings(
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
ntrigger=ntrigger,
image_time_us=int(exp_time * 1e6),
ntrigger=self._num_triggers,
file_prefix=path,
beam_x_pxl=int(self._beam_center[0]),
beam_y_pxl=int(self._beam_center[1]),
detector_distance_mm=self.detector_distance,
incident_energy_ke_v=incident_energy,
)
logger.info(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = time.time()
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
self.jfj_client.wait_for_idle(timeout=10) # Ensure we are in IDLE state
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
# Time the stage process
logger.info(
f"On stage done for device {self.name} after {time.time()-start_time:.2f}s, with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch."
f"Device {self.name} staged for scan. Time spent {time.time()-start_time:.2f}s,"
f" with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch."
)
def on_unstage(self) -> DeviceStatus:
@@ -283,7 +329,7 @@ class Eiger(PSIDeviceBase):
def _file_event_callback(self, status: DeviceStatus) -> None:
"""Callback to update the file_event signal when the acquisition is done."""
logger.debug(
f"Acquisition done callback called for {self.name} for status {status.success}"
f"File event callback on complete status for device {self.name}: done={status.done}, successful={status.success}"
)
self.file_event.put(
file_path=self._full_path,
@@ -293,28 +339,35 @@ class Eiger(PSIDeviceBase):
)
def on_complete(self) -> DeviceStatus:
"""Called to inquire if a device has completed a scans."""
"""
Called at the end of the scan. The method should implement an asynchronous wait for the
device to complete the acquisition. A callback to update the file_event signal is
attached that resolves the file event when the acquisition is done.
Returns:
DeviceStatus: The status object representing the completion of the acquisition.
"""
def wait_for_complete():
start_time = time.time()
timeout = 20
# NOTE: This adjust the time (s) that should be waited for completion of the scan.
timeout = 20 #
while time.time() - start_time < timeout:
if self.jfj_client.wait_for_idle(
timeout=1, request_timeout=1, raise_on_timeout=False
):
# TODO add check if data acquisition finished in success
if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False):
# TODO: Once available, add check for
statistics: MeasurementStatistics = (
self.jfj_client.api.statistics_data_collection_get(_request_timeout=5)
)
broker_status = self.jfj_client.jjf_state
logger.info(
f"Device {self.name} completed acquisition. \n \n"
f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n"
f"statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}"
)
if statistics.images_collected < self._num_triggers:
raise EigerError(
f"Device {self.name} acquisition incomplete. "
f"Expected {self._num_triggers} triggers, "
f"but only {statistics.images_collected} were collected."
)
return
logger.info(
f"Device {self.name} running loop to wait for complete, time elapsed: {time.time() - start_time}."
f"Waiting for device {self.name} to finish complete, time elapsed: "
f"{time.time() - start_time}."
)
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
_request_timeout=5