refactor(eiger): cleanup, remove auto initialization

This commit is contained in:
2025-09-10 21:08:24 +02:00
parent 19a95d325b
commit bac733e94b
3 changed files with 78 additions and 87 deletions

View File

@@ -48,31 +48,30 @@ from __future__ import annotations
import os
import time
import traceback
from typing import TYPE_CHECKING, Literal
import requests
import yaml
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_selection import DetectorSelection
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_state import DetectorState
from jfjoch_client.models.detector_timing import DetectorTiming
from jfjoch_client.models.file_writer_format import FileWriterFormat
from jfjoch_client.models.file_writer_settings import FileWriterSettings
from ophyd import Component as Cpt
from ophyd import DeviceStatus
from ophyd_devices import FileEventSignal, PreviewSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import (
JungfrauJochClient,
JungfrauJochClientError,
)
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from csaxs_bec.devices.jungfraujoch.jungfraujoch_preview import JungfrauJochPreview
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
from jfjoch_client.models.detector_list_element import DetectorListElement
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
logger = bec_logger.logger
@@ -91,6 +90,7 @@ class Eiger(PSIDeviceBase):
USER_ACCESS = ["detector_distance", "beam_center"]
file_event = Cpt(FileEventSignal, name="file_event")
preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2)
def __init__(
self,
@@ -120,6 +120,9 @@ class Eiger(PSIDeviceBase):
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
self.jfj_preview_client = JungfrauJochPreview(
url="tcp://129.129.95.114:5400", cb=self.preview_image.put
) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream
self.device_manager = device_manager
self.detector_name = detector_name
self._detector_distance = detector_distance
@@ -166,35 +169,57 @@ class Eiger(PSIDeviceBase):
"""
logger.debug(f"On connected called for {self.name}")
self.jfj_client.stop(request_timeout=3)
# If stop failed, it will not raise. So the next calls may raise instead.. TODO logic could be discussed
# Check which detector is selected
# Get available detectors
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
detector_info = [
det for det in available_detectors.detectors if det.description == self.detector_name
]
# If the specified detector is not found, or multiple definitions for that name are available, something is wrong.
if len(detector_info) != 1:
raise ValueError(
f"Detector {self.detector_name} not found in available detectors: {available_detectors}"
# Get current detector
current_detector_name = ""
if available_detectors.current_id:
detector_selection = [
det.description
for det in available_detectors.detectors
if det.id == available_detectors.current_id
]
current_detector_name = detector_selection[0] if detector_selection else ""
if current_detector_name != self.detector_name:
raise RuntimeError(
f"Please select and initialise the detector {self.detector_name} in the WEB UI: {self._host}."
)
detector_info: DetectorListElement = detector_info[0]
try:
self.jfj_client.api.config_select_detector_put(
DetectorSelection(id=detector_info.id), _request_timeout=10
if self.jfj_client.detector_state != DetectorState.IDLE:
raise RuntimeError(
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
)
except requests.exceptions.Timeout:
raise TimeoutError(f"Timeout while selecting detector {self.detector_name}")
except Exception:
content = traceback.format_exc()
raise EigerError(f"Error while selecting detector {self.detector_name}: {content}")
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
# # Once the automation should be enabled, we may use here
# detector_selection = [
# det for det in available_detectors.detectors if det.id == self.detector_name
# ]
# if not detector_selection:
# raise ValueError(
# f"Detector {self.detector_name} not found in available detectors: {[det.description for det in available_detectors.detectors]}"
# )
# det_id = detector_selection[0].id
# self.jfj_client.api.config_select_detector_put(
# detector_selection=DetectorSelection(id=det_id), _request_timeout=5
# )
# self.jfj_client.connect_and_initialise(timeout=10)
# Try to connect, needs to be in Inactive or Error state
self.jfj_client.connect_and_initialise(timeout=10, _request_timeout=10)
# This sets the energy threshold for the EIGER detector
# Setup Detector settings, here we may also set the energy already as this might be time consuming
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
logger.debug(f"Setting detector_settings: {yaml.dump(settings.to_dict(), indent=4)}")
self.jfj_client.set_detector_settings(settings, timeout=10)
# Set the file writer to the appropriate output for the HDF5 file
file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS)
logger.debug(
f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}"
)
self.jfj_client.api.config_file_writer_put(
file_writer_settings=file_writer_settings, _request_timeout=10
)
# Start the preview client
self.jfj_preview_client.connect()
self.jfj_preview_client.start()
logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}")
def on_stage(self) -> DeviceStatus | None:
"""
@@ -203,35 +228,29 @@ class Eiger(PSIDeviceBase):
Information about the upcoming scan can be accessed from the scan_info object.
"""
start_time = time.time()
scan_msg = self.scan_info.msg
# TODO add mono energy logic, if energy changed more than ~1-2%, adapt thresholds!
# Add logic to set the detector energy threshold from mono_energy pv
# dev.mono_energy from device_manager
# Set acquisition parameter
# TODO add check of mono energy, this can then also be passed to DatasetSettings
incident_energy = 12.0
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time <= self._readout_time:
raise ValueError(
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
)
frame_time_us = exp_time # - self._readout_time # Needs to be checked if needed
# convert frame time to us
# settings = DetectorSettings(frame_time_us=int(frame_time_us * 1e6))
# self.jfj_client.set_detector_settings(settings, timeout=10)
# Set acquisition parameter
frame_time_us = exp_time #
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
# Fetch file path
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
# JFJ adds _master.h5 automatically
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").rstrip("_master.h5")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "entry/data/data"},
)
path = os.path.relpath(self._full_path, start="/sls/x12sa/data")
# JFJ adds _master.h5 automatically
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5")
data_settings = DatasetSettings(
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
ntrigger=ntrigger,
@@ -243,15 +262,10 @@ class Eiger(PSIDeviceBase):
)
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = start_time - time.time()
logger.info(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
self.jfj_client.start(settings=data_settings)
# start_call_returns = time.time() - start_time - prep_time
# logger.info(f"Start Rest call from JFJ took {start_call_returns:.2f}s")
# sleep_time = 0.5
# time.sleep(sleep_time)
# logger.info(
# f"Eiger {self.name} staged and ready for acquisition; with additional sleep of {sleep_time:.2f}s"
# )
logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s")
def on_unstage(self) -> DeviceStatus:
"""Called while unstaging the device."""
@@ -264,6 +278,7 @@ class Eiger(PSIDeviceBase):
def _file_event_callback(self, status: DeviceStatus) -> None:
"""Callback to update the file_event signal when the acquisition is done."""
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
self.file_event.put(
file_path=self._full_path,
done=True,
@@ -275,10 +290,17 @@ class Eiger(PSIDeviceBase):
"""Called to inquire if a device has completed a scans."""
def wait_for_complete():
start_time = time.time()
timeout = 10
for _ in range(timeout):
if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10):
break
return
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
_request_timeout=5
)
raise TimeoutError(
f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
)
status = self.task_handler.submit_task(wait_for_complete, run=True)
status.add_callback(self._file_event_callback)

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import enum
import math
import time
import traceback
from typing import TYPE_CHECKING
@@ -183,34 +182,3 @@ class JungfrauJochClient:
logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}")
return False
return True
# # TODO improve this method for error handling and reporting...
# def wait_till_done(self, timeout: int = 10, **kwargs) -> None:
# """Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
# Args:
# timeout (int): timeout in seconds
# """
# try:
# self.api.wait_till_done_post_with_http_info(timeout=math.ceil(timeout / 2), **kwargs)
# except requests.exceptions.Timeout:
# raise TimeoutError(
# f"Timeout in JungfrauJochClient for parent device {self._parent_name} for 'wait_till_done' call"
# )
# except Exception:
# logger.info(
# f"Waiting for device {self._parent_name}, jungfrau joch to become IDLE, retry after {math.ceil(timeout / 2)} seconds"
# )
# try:
# self.api.wait_till_done_post_with_http_info(
# timeout=math.ceil(timeout / 2), **kwargs
# )
# except requests.exceptions.Timeout:
# raise TimeoutError(
# f"Timeout in JungfrauJochClient for parent device {self._parent_name} for 'wait_till_done' call"
# )
# except Exception:
# content = traceback.format_exc()
# raise JungfrauJochClientError(
# f"JungfrauJoch Error in wait_till_done post for device {self._parent_name}: {content}"
# )

View File

@@ -91,5 +91,6 @@ class JungfrauJochPreview:
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
def _parse_data(self, data):
logger.info(f"Received data of length {len(data)}")
self._on_update_callback(data)
# TODO decode and parse the data
# self._on_update_callback(data)
pass