refactor: migrate devices to new scans_v4 structure

This commit is contained in:
2026-06-15 14:01:48 +02:00
parent ccd36f7bf8
commit 3397befafd
6 changed files with 69 additions and 41 deletions
@@ -37,6 +37,7 @@ import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import CompareStatus, DeviceStatus, StatusBase, TransitionStatus
@@ -55,6 +56,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING
from csaxs_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import DeviceManagerBase, ScanInfo
@@ -172,6 +174,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self._poll_thread_poll_loop_done = threading.Event()
self._poll_thread_kill_event = threading.Event()
self._poll_thread.start()
self.scan_parameters: ScanServerScanInfo | None = None
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
@@ -270,6 +273,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
- We set the delay pairs ef to be triggered after the shutter closes with a width of 1us to trigger the MCS card.
- Finally, we add a short sleep to ensure that the IOC and DDG HW process the values properly.
"""
self.scan_parameters = fetch_scan_info(self.scan_info)
start_time = time.time()
########################################
@@ -304,8 +308,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
### Setup timing for burst and delays ###
#########################################
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_parameters.frames_per_trigger
exp_time = self.scan_parameters.exp_time
# Burst Period DDG1
# Set burst_period to shutter width
@@ -344,7 +348,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Trigger extra pulse for MCS OR gate
# f = e + 1us
# e has refernce to d, f has reference to e
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self.set_delay_pairs(channel="ef", delay=0, width=0)
else:
self.set_delay_pairs(channel="ef", delay=1e-6, width=1e-6)
@@ -498,9 +502,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
"""
if timeout is None:
# Default timeout of 5 seconds + exposure time * frames_per_trigger
timeout = 5 + self.scan_info.msg.scan_parameters.get(
"exp_time", 0.1
) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
timeout = 5 + self.scan_parameters.exp_time * self.scan_parameters.frames_per_trigger
# Callback to cancel the status if the device is stopped
def cancel_cb(status: CompareStatus) -> None:
@@ -566,7 +568,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# to indicate when the burst cycle is done. If no mcs card is available
# the fallback is to use the polling of the DDG
mcs = self.device_manager.devices.get("mcs", None)
if mcs is None or mcs.enabled is False or self.scan_info.msg.scan_type == "fly":
if (
mcs is None
or mcs.enabled is False
or self.scan_parameters.scan_type == "hardware_triggered"
):
self._poll_thread_poll_loop_done.wait(timeout=1)
logger.warning("Did not find mcs card with name 'mcs' in current session")
time.sleep(0.02)
@@ -25,6 +25,7 @@ Burst mode is enabled:
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import DeviceStatus, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -39,6 +40,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
DelayGeneratorCSAXS,
LiteralChannels,
)
from csaxs_bec.devices.utils.utils import fetch_scan_info
logger = bec_logger.logger
@@ -111,6 +113,10 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
distributes the trigger to the detectors. The DDG2 is triggered by the DDG1 through the EXT/EN channel.
"""
def on_init(self) -> None:
"""Initialize the device"""
self.scan_parameters: ScanServerScanInfo | None = None
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
"""
@@ -168,6 +174,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
through the EXT/EN channel.
"""
start_time = time.time()
self.scan_parameters = fetch_scan_info(self.scan_info)
########################################
### Burst mode settings ################
########################################
@@ -180,8 +187,8 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
if self.burst_delay.get() != 0:
self.burst_delay.put(0)
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
exp_time = self.scan_parameters.exp_time
frames_per_trigger = self.scan_parameters.frames_per_trigger
# NOTE Check if the exposure time is longer than all readout times.
# Raise a ValueError if requested exposure time is too short.
@@ -20,9 +20,10 @@ from typing import TYPE_CHECKING, Callable, Literal
import numpy as np
from bec_lib.logger import bec_logger
from ophyd.utils.errors import WaitTimeoutError
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd.utils.errors import WaitTimeoutError
from ophyd_devices import (
AsyncMultiSignal,
CompareStatus,
@@ -43,6 +44,7 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
READMODE,
MCSCard,
)
from csaxs_bec.devices.utils.utils import fetch_scan_info
@contextmanager
@@ -168,6 +170,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._mca_counter_index: int = 0
self._current_data: dict[str, dict[Literal["value", "timestamp"], list[int] | float]] = {}
self._omit_mca_callbacks: threading.Event = threading.Event()
self.scan_parameters: ScanServerScanInfo | None = None
def on_connected(self):
"""
@@ -335,6 +338,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
'_start_monitor_async_data_emission', '_scan_done_callbacks', and '_current_data'.
"""
start_time = time.time()
self.scan_parameters = fetch_scan_info(self.scan_info)
# NOTE: If for some reason, the card is still acquiring, we need to stop it first
# This should never happen as the card is properly stopped during unstage
@@ -364,14 +368,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
#####################################
### Setup Acquisition Parameters ###
#####################################
triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
num_points = self.scan_info.msg.num_points
triggers = self.scan_parameters.frames_per_trigger
num_points = self.scan_parameters.num_points
self._num_total_triggers = triggers * num_points
self._acquisition_group = "monitored" if triggers == 1 else "burst_group"
self.preset_real.set(0).wait(timeout=self._pv_timeout)
if self.scan_info.msg.scan_type == "step":
if self.scan_parameters.scan_type == "software_triggered":
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
elif self.scan_info.msg.scan_type == "fly":
elif self.scan_parameters.scan_type == "hardware_triggered":
self.num_use_all.set(self._num_total_triggers).wait(timeout=self._pv_timeout)
# Clear any previous data, just to be sure
@@ -395,7 +399,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._omit_mca_callbacks.clear()
# For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self.erase_start.put(1)
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
@@ -406,7 +410,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
that the card is properly started for fly scans. For step scans, this will be handled by the DDG,
so no action is required here.
"""
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
status_acquiring = CompareStatus(self.acquiring, ACQUIRING.ACQUIRING)
self.cancel_on_stop(status_acquiring)
return status_acquiring
@@ -444,11 +448,11 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
while self._start_monitor_async_data_emission.wait():
try:
if (
hasattr(self.scan_info.msg, "num_points")
and self.scan_info.msg.num_points is not None
hasattr(self.scan_parameters, "num_points")
and self.scan_parameters.num_points is not None
):
if self.scan_info.msg.scan_type == "step":
if self._current_data_index == self.scan_info.msg.num_points:
if self.scan_parameters.scan_type == "software_triggered":
if self._current_data_index == self.scan_parameters.num_points:
for callback in self._scan_done_callbacks:
callback(exception=None)
else:
@@ -513,13 +517,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# that the acquisition finishes on the card and that data is emitted to BEC. If the acquisition
# was already finished (i.e. normal step scan sends 1 extra pulse per burst cycle), this will
# not have any effect as the card will already be in DONE state and signal.
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
status = CompareStatus(self.current_channel, expected_points-1, operation_success=">=")
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
status = CompareStatus(
self.current_channel, expected_points - 1, operation_success=">="
)
try:
status.wait(timeout=5)
except WaitTimeoutError:
+7 -5
View File
@@ -39,6 +39,7 @@ from typing import TYPE_CHECKING, Literal
import yaml
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_state import DetectorState
@@ -52,6 +53,7 @@ from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from csaxs_bec.devices.jungfraujoch.jungfraujoch_preview import JungfrauJochPreview
from csaxs_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
@@ -117,6 +119,7 @@ class Eiger(PSIDeviceBase):
self._wait_for_on_complete = 20 # seconds
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
self.scan_parameters: ScanServerScanInfo | None = None
def _preview_callback(self, message: dict) -> None:
"""
@@ -263,26 +266,25 @@ class Eiger(PSIDeviceBase):
def on_stage(self) -> DeviceStatus | None:
"""
Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object.
scan_msg = self.scan_info.msg
"""
start_time = time.time()
scan_msg = self.scan_info.msg
self.scan_parameters = fetch_scan_info(self.scan_info)
# TODO: Check mono energy from device in BEC
# Setting incident energy in keV
incident_energy = 12.0
# Setting up exp_time and num_triggers acquisition parameter
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
exp_time = self.scan_parameters.exp_time
if exp_time <= self._readout_time: # Exp_time must be at least the readout time
raise ValueError(
f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s."
)
self._num_triggers = int(
scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
# Setting up the full path for file writing
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
self._full_path = get_full_path(self.scan_parameters, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
+10 -5
View File
@@ -3,9 +3,12 @@
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
from csaxs_bec.devices.utils.utils import fetch_scan_info
logger = bec_logger.logger
@@ -20,16 +23,19 @@ class PandaBoxCSAXS(PandaBox):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
self.scan_parameters: ScanServerScanInfo | None = None
def on_stage(self):
self.scan_parameters = fetch_scan_info(self.scan_info)
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
elif self.scan_parameters.scan_type == "software_triggered":
if self.scan_parameters.frames_per_trigger == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
@@ -44,8 +50,7 @@ class PandaBoxCSAXS(PandaBox):
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
@@ -3,9 +3,12 @@
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
from csaxs_bec.devices.utils.utils import fetch_scan_info
logger = bec_logger.logger
@@ -16,16 +19,17 @@ class PandaBoxOMNY(PandaBox):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
self.scan_parameters: ScanServerScanInfo | None = None
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
if self.scan_parameters.scan_type == "hardware_triggered":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
elif self.scan_parameters.scan_type == "software_triggered":
if self.scan_parameters.frames_per_trigger == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
@@ -40,8 +44,7 @@ class PandaBoxOMNY(PandaBox):
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")