wip eiger, improve stage procedure for BEC core scans

This commit is contained in:
gac-x12sa
2025-03-24 08:22:46 +01:00
parent ee8fa8b962
commit 2b4a13ebc2

View File

@@ -1,21 +1,34 @@
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd import DeviceStatus
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from jfjoch_client.models.dataset_settings import DatasetSettings
from typing import TYPE_CHECKING
from bec_lib.devicemanager import ScanInfo
if TYPE_CHECKING: #pragma no cover
from bec_lib.devicemanager import ScanInfo
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from ophyd import DeviceStatus
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from csaxs_bec.devices.jungfraujoch.readout_constants import EIGER9M_READOUT_TIME
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
class EigerCSAXS(PSIDeviceBase):
class EigerCSAXS(PSIDeviceBase):
########################################
# Beamline Specific Implementations #
########################################
def __init__(self, name: str, host:str="http://sls-jfjoch-001", port:int=8080, scan_info:ScanInfo=None, **kwargs):
FREQUENCY_BIT_DEPTH_CHANGE = 100 # Hz
def __init__(
self,
name: str,
host: str = "http://sls-jfjoch-001",
port: int = 8080,
scan_info: ScanInfo = None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
@@ -26,6 +39,7 @@ class EigerCSAXS(PSIDeviceBase):
super().__init__(name=name, scan_info=scan_info, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
self._bit_depth = 16
def on_init(self) -> None:
"""
@@ -41,6 +55,12 @@ class EigerCSAXS(PSIDeviceBase):
Default values for signals should be set here.
"""
self.jfj_client.connect_and_initialise(timeout=5)
settings = DetectorSettings(
frame_time_us=480, # EIGER9M_READOUT_TIME is in seconds
bit_depth=self._bit_depth,
timing="gated", # To be checked
)
self.jfj_client.set_detector_settings(settings)
def on_stage(self) -> DeviceStatus | None:
"""
@@ -48,22 +68,39 @@ class EigerCSAXS(PSIDeviceBase):
Information about the upcoming scan can be accessed from the scan_info object.
"""
if self.scan_info.msg.scan_name != "jfj_test":
return
num_burst_cycle = self.scan_info.msg.scan_parameters['num_points']
cycles = self.scan_info.msg.scan_parameters['cycles']
exp_time = self.scan_info.msg.scan_parameters['exp_time']
total_points = num_burst_cycle * cycles
settings = DatasetSettings(
image_time_us= int(exp_time*1e6),
ntrigger = total_points,
beam_x_pxl=0,
beam_y_pxl=0,
detector_distance_mm=100,
incident_energy_keV=10.00,
)
self.jfj_client.start(settings = settings)
if self.scan_info.msg.scan_type == "step":
exp_time = self.scan_info.msg.scan_parameters["exp_time"] # in seconds
num_points = self.scan_info.msg.num_points
if (
exp_time < 1 / self.FREQUENCY_BIT_DEPTH_CHANGE
): # TODO add check that it should only change this if needed.
self._bit_depth = 32
else:
self._bit_depth = 16
if exp_time < 0.48e-3: # 480us (microseconds)
raise ValueError(
f"Frame time is lower than 480, smalles time in milliseconds: {exp_time}"
)
settings = DetectorSettings(
frame_time_us=int(exp_time * 1e6), # Time is in us, needs to be multiplied by 1e6
bit_depth=self._bit_depth,
)
self.jfj_client.set_detector_settings(settings)
self.jfj_client.wait_till_done(timeout=5) # blocking call
data_settings = DatasetSettings(
image_time_us=int(exp_time * 1e6),
ntrigger=num_points,
beam_x_pxl=0,
beam_y_pxl=0,
detector_distance_mm=100,
incident_energy_keV=10.00,
)
# status = self.task_handler.submit_task(
# self.jfj_client.start, task_args=(data_settings,), run=True
# )
# return status
self.jfj_client.start(settings=data_settings)
def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device."""
@@ -76,8 +113,7 @@ class EigerCSAXS(PSIDeviceBase):
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
if self.scan_info.msg.scan_name != "jfj_test":
return
def wait_for_complete():
timeout = 10
for _ in range(timeout):
@@ -93,10 +129,10 @@ class EigerCSAXS(PSIDeviceBase):
status = self.task_handler.submit_task(wait_for_complete, run=True)
return status
def on_kickoff(self) -> DeviceStatus | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop()
self.jfj_client.stop()
self.task_handler.shutdown()