refactor: reviewed and refactored based class for device integration

This commit is contained in:
appel_c 2024-12-04 15:11:15 +01:00
parent d5d0d5a57c
commit 5b55ff25b6
2 changed files with 900 additions and 447 deletions

View File

@ -1,448 +1,459 @@
"""This module contains the base class for SLS detectors. We follow the approach to integrate from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase
PSI detectors into the BEC system based on this base class. The base class is used to implement
certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc...
We use composition with a custom prepare class to implement BL specific logic for the detector.
The beamlines need to inherit from the CustomDetectorMixing for their mixin classes."""
import os
import threading class PSIDetectorBase(PSIDeviceBase):
import time """Deprecated, use PSIDeviceBase instead. Here for backwards compatibility."""
import traceback
from bec_lib import messages class CustomDetectorMixin(CustomPrepare):
from bec_lib.endpoints import MessageEndpoints """Deprecated, use CustomPrepare instead. Here for backwards compatibility."""
from bec_lib.file_utils import FileWriter
from bec_lib.logger import bec_logger
from ophyd import Component, Device, DeviceStatus, Kind # """This module contains the base class for SLS detectors. We follow the approach to integrate
from ophyd.device import Staged # PSI detectors into the BEC system based on this base class. The base class is used to implement
# certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc...
from ophyd_devices.sim.sim_signals import SetableSignal # We use composition with a custom prepare class to implement BL specific logic for the detector.
from ophyd_devices.utils import bec_utils # The beamlines need to inherit from the CustomDetectorMixing for their mixin classes."""
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError # import os
# import threading
logger = bec_logger.logger # import time
# import traceback
class DetectorInitError(Exception): # from bec_lib import messages
"""Raised when initiation of the device class fails, # from bec_lib.endpoints import MessageEndpoints
due to missing device manager or not started in sim_mode.""" # from bec_lib.file_utils import FileWriter
# from bec_lib.logger import bec_logger
# from ophyd import Component, Device, DeviceStatus, Kind
class CustomDetectorMixin: # from ophyd.device import Staged
"""
Mixin class for custom detector logic # from ophyd_devices.sim.sim_signals import SetableSignal
# from ophyd_devices.utils import bec_utils
This class is used to implement BL specific logic for the detector. # from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
It is used in the PSIDetectorBase class. # from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
For the integration of a new detector, the following functions should # logger = bec_logger.logger
help with integrating functionality, but additional ones can be added.
Check PSIDetectorBase for the functions that are called during relevant function calls of # class DetectorInitError(Exception):
stage, unstage, trigger, stop and _init. # """Raised when initiation of the device class fails,
""" # due to missing device manager or not started in sim_mode."""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent # class CustomDetectorMixin:
# """
def on_init(self) -> None: # Mixin class for custom detector logic
"""
Init sequence for the detector # This class is used to implement BL specific logic for the detector.
""" # It is used in the PSIDetectorBase class.
def on_stage(self) -> None: # For the integration of a new detector, the following functions should
""" # help with integrating functionality, but additional ones can be added.
Specify actions to be executed during stage in preparation for a scan.
self.parent.scaninfo already has all current parameters for the upcoming scan. # Check PSIDetectorBase for the functions that are called during relevant function calls of
# stage, unstage, trigger, stop and _init.
In case the backend service is writing data on disk, this step should include publishing # """
a file_event and file_message to BEC to inform the system where the data is written to.
# def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
IMPORTANT: # self.parent = parent
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished. # def on_init(self) -> None:
""" # """
# Init sequence for the detector
def on_unstage(self) -> None: # """
"""
Specify actions to be executed during unstage. # def on_stage(self) -> None:
# """
This step should include checking if the acqusition was successful, # Specify actions to be executed during stage in preparation for a scan.
and publishing the file location and file event message, # self.parent.scaninfo already has all current parameters for the upcoming scan.
with flagged done to BEC.
""" # In case the backend service is writing data on disk, this step should include publishing
# a file_event and file_message to BEC to inform the system where the data is written to.
def on_stop(self) -> None:
""" # IMPORTANT:
Specify actions to be executed during stop. # It must be safe to assume that the device is ready for the scan
This must also set self.parent.stopped to True. # to start immediately once this function is finished.
# """
This step should include stopping the detector and backend service.
""" # def on_unstage(self) -> None:
# """
def on_trigger(self) -> None | DeviceStatus: # Specify actions to be executed during unstage.
"""
Specify actions to be executed upon receiving trigger signal. # This step should include checking if the acqusition was successful,
Return a DeviceStatus object or None # and publishing the file location and file event message,
""" # with flagged done to BEC.
# """
def on_pre_scan(self) -> None:
""" # def on_stop(self) -> None:
Specify actions to be executed right before a scan starts. # """
# Specify actions to be executed during stop.
Only use if needed, and it is recommended to keep this function as short/fast as possible. # This must also set self.parent.stopped to True.
"""
# This step should include stopping the detector and backend service.
def on_complete(self) -> None | DeviceStatus: # """
"""
Specify actions to be executed when the scan is complete. # def on_trigger(self) -> None | DeviceStatus:
# """
This can for instance be to check with the detector and backend if all data is written succsessfully. # Specify actions to be executed upon receiving trigger signal.
""" # Return a DeviceStatus object or None
# """
# TODO make this a SUB event in the device manager
def publish_file_location( # def on_pre_scan(self) -> None:
self, # """
done: bool, # Specify actions to be executed right before a scan starts.
successful: bool,
filepath: str = None, # Only use if needed, and it is recommended to keep this function as short/fast as possible.
hinted_locations: dict = None, # """
metadata: dict = None,
) -> None: # def on_complete(self) -> None | DeviceStatus:
""" # """
Publish the filepath to REDIS. # Specify actions to be executed when the scan is complete.
We publish two events here: # This can for instance be to check with the detector and backend if all data is written succsessfully.
- file_event: event for the filewriter # """
- public_file: event for any secondary service (e.g. radial integ code)
# # TODO make this a SUB event in the device manager
Args: # def publish_file_location(
done (bool): True if scan is finished # self,
successful (bool): True if scan was successful # done: bool,
filepath (str): Optional, filepath to publish. If None, it will be taken from self.parent.filepath.get() # successful: bool,
hinted_locations (dict): Optional, dictionary with hinted locations; {dev_name : h5_entry} # filepath: str = None,
metadata (dict): additional metadata to publish # hinted_locations: dict = None,
""" # metadata: dict = None,
if metadata is None: # ) -> None:
metadata = {} # """
# Publish the filepath to REDIS.
if filepath is None:
file_path = self.parent.filepath.get() # We publish two events here:
# - file_event: event for the filewriter
msg = messages.FileMessage( # - public_file: event for any secondary service (e.g. radial integ code)
file_path=self.parent.filepath.get(),
hinted_locations=hinted_locations, # Args:
done=done, # done (bool): True if scan is finished
successful=successful, # successful (bool): True if scan was successful
metadata=metadata, # filepath (str): Optional, filepath to publish. If None, it will be taken from self.parent.filepath.get()
) # hinted_locations (dict): Optional, dictionary with hinted locations; {dev_name : h5_entry}
pipe = self.parent.connector.pipeline() # metadata (dict): additional metadata to publish
self.parent.connector.set_and_publish( # """
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name), # if metadata is None:
msg, # metadata = {}
pipe=pipe,
) # if filepath is None:
self.parent.connector.set_and_publish( # file_path = self.parent.filepath.get()
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
) # msg = messages.FileMessage(
pipe.execute() # file_path=self.parent.filepath.get(),
# hinted_locations=hinted_locations,
def wait_for_signals( # done=done,
self, # successful=successful,
signal_conditions: list[tuple], # metadata=metadata,
timeout: float, # )
check_stopped: bool = False, # pipe = self.parent.connector.pipeline()
interval: float = 0.05, # self.parent.connector.set_and_publish(
all_signals: bool = False, # MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
) -> bool: # msg,
""" # pipe=pipe,
Convenience wrapper to allow waiting for signals to reach a certain condition. # )
For EPICs PVs, an example usage is pasted at the bottom. # self.parent.connector.set_and_publish(
# MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
Args: # )
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check # pipe.execute()
timeout (float): timeout in seconds
interval (float): interval in seconds # def wait_for_signals(
all_signals (bool): True if all signals should be True, False if any signal should be True # self,
# signal_conditions: list[tuple],
Returns: # timeout: float,
bool: True if all signals are in the desired state, False if timeout is reached # check_stopped: bool = False,
# interval: float = 0.05,
>>> Example usage for EPICS PVs: # all_signals: bool = False,
>>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True) # ) -> bool:
""" # """
# Convenience wrapper to allow waiting for signals to reach a certain condition.
timer = 0 # For EPICs PVs, an example usage is pasted at the bottom.
while True:
checks = [ # Args:
get_current_state() == condition # signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
for get_current_state, condition in signal_conditions # timeout (float): timeout in seconds
] # interval (float): interval in seconds
if check_stopped is True and self.parent.stopped is True: # all_signals (bool): True if all signals should be True, False if any signal should be True
return False
if (all_signals and all(checks)) or (not all_signals and any(checks)): # Returns:
return True # bool: True if all signals are in the desired state, False if timeout is reached
if timer > timeout:
return False # >>> Example usage for EPICS PVs:
time.sleep(interval) # >>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True)
timer += interval # """
def wait_with_status( # timer = 0
self, # while True:
signal_conditions: list[tuple], # checks = [
timeout: float, # get_current_state() == condition
check_stopped: bool = False, # for get_current_state, condition in signal_conditions
interval: float = 0.05, # ]
all_signals: bool = False, # if check_stopped is True and self.parent.stopped is True:
exception_on_timeout: Exception = None, # return False
) -> DeviceStatus: # if (all_signals and all(checks)) or (not all_signals and any(checks)):
"""Utility function to wait for signals in a thread. # return True
Returns a DevicesStatus object that resolves either to set_finished or set_exception. # if timer > timeout:
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. # return False
# time.sleep(interval)
Usage: # timer += interval
This function should be used to wait for signals to reach a certain condition, especially in the context of
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. # def wait_with_status(
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, # self,
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. # signal_conditions: list[tuple],
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. # timeout: float,
# check_stopped: bool = False,
Args: # interval: float = 0.05,
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check # all_signals: bool = False,
timeout (float): timeout in seconds # exception_on_timeout: Exception = None,
check_stopped (bool): True if stopped flag should be checked # ) -> DeviceStatus:
interval (float): interval in seconds # """Utility function to wait for signals in a thread.
all_signals (bool): True if all signals should be True, False if any signal should be True # Returns a DevicesStatus object that resolves either to set_finished or set_exception.
exception_on_timeout (Exception): Exception to raise on timeout # The DeviceStatus is attached to the paent device, i.e. the detector object inheriting from PSIDetectorBase.
Returns: # Usage:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception # This function should be used to wait for signals to reach a certain condition, especially in the context of
""" # on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
if exception_on_timeout is None: # It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
exception_on_timeout = DeviceTimeoutError( # the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}" # The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
)
# Args:
status = DeviceStatus(self.parent) # signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
# timeout (float): timeout in seconds
# utility function to wrap the wait_for_signals function # check_stopped (bool): True if stopped flag should be checked
def wait_for_signals_wrapper( # interval (float): interval in seconds
status: DeviceStatus, # all_signals (bool): True if all signals should be True, False if any signal should be True
signal_conditions: list[tuple], # exception_on_timeout (Exception): Exception to raise on timeout
timeout: float,
check_stopped: bool, # Returns:
interval: float, # DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
all_signals: bool, # """
exception_on_timeout: Exception, # if exception_on_timeout is None:
): # exception_on_timeout = DeviceTimeoutError(
"""Convenient wrapper around wait_for_signals to set status based on the result. # f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}"
# )
Args:
status (DeviceStatus): DeviceStatus object to be set # status = DeviceStatus(self.parent)
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds # # utility function to wrap the wait_for_signals function
check_stopped (bool): True if stopped flag should be checked # def wait_for_signals_wrapper(
interval (float): interval in seconds # status: DeviceStatus,
all_signals (bool): True if all signals should be True, False if any signal should be True # signal_conditions: list[tuple],
exception_on_timeout (Exception): Exception to raise on timeout # timeout: float,
""" # check_stopped: bool,
try: # interval: float,
result = self.wait_for_signals( # all_signals: bool,
signal_conditions, timeout, check_stopped, interval, all_signals # exception_on_timeout: Exception,
) # ):
if result: # """Convenient wrapper around wait_for_signals to set status based on the result.
status.set_finished()
else: # Args:
if self.parent.stopped: # status (DeviceStatus): DeviceStatus object to be set
# INFO This will execute a callback to the parent device.stop() method # signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped")) # timeout (float): timeout in seconds
else: # check_stopped (bool): True if stopped flag should be checked
# INFO This will execute a callback to the parent device.stop() method # interval (float): interval in seconds
status.set_exception(exc=exception_on_timeout) # all_signals (bool): True if all signals should be True, False if any signal should be True
# pylint: disable=broad-except # exception_on_timeout (Exception): Exception to raise on timeout
except Exception as exc: # """
content = traceback.format_exc() # try:
logger.warning( # result = self.wait_for_signals(
f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}" # signal_conditions, timeout, check_stopped, interval, all_signals
) # )
# INFO This will execute a callback to the parent device.stop() method # if result:
status.set_exception(exc=exc) # status.set_finished()
# else:
thread = threading.Thread( # if self.parent.stopped:
target=wait_for_signals_wrapper, # # INFO This will execute a callback to the parent device.stop() method
args=( # status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped"))
status, # else:
signal_conditions, # # INFO This will execute a callback to the parent device.stop() method
timeout, # status.set_exception(exc=exception_on_timeout)
check_stopped, # # pylint: disable=broad-except
interval, # except Exception as exc:
all_signals, # content = traceback.format_exc()
exception_on_timeout, # logger.warning(
), # f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}"
daemon=True, # )
) # # INFO This will execute a callback to the parent device.stop() method
thread.start() # status.set_exception(exc=exc)
return status
# thread = threading.Thread(
# target=wait_for_signals_wrapper,
class PSIDetectorBase(Device): # args=(
""" # status,
Abstract base class for SLS detectors # signal_conditions,
# timeout,
Class attributes: # check_stopped,
custom_prepare_cls (object): class for custom prepare logic (BL specific) # interval,
# all_signals,
Args: # exception_on_timeout,
prefix (str): EPICS PV prefix for component (optional) # ),
name (str): name of the device, as will be reported via read() # daemon=True,
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal # )
omitted -> readout ignored for read 'ophydobj.read()' # thread.start()
normal -> readout for read # return status
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device # class PSIDetectorBase(Device):
device_manager (object): bec device manager # """
**kwargs: keyword arguments # Abstract base class for SLS detectors
"""
# Class attributes:
filepath = Component(SetableSignal, value="", kind=Kind.config) # custom_prepare_cls (object): class for custom prepare logic (BL specific)
custom_prepare_cls = CustomDetectorMixin # Args:
# prefix (str): EPICS PV prefix for component (optional)
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): # name (str): name of the device, as will be reported via read()
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) # kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
self.stopped = False # omitted -> readout ignored for read 'ophydobj.read()'
self.name = name # normal -> readout for read
self.service_cfg = None # config -> config parameter for 'ophydobj.read_configuration()'
self.scaninfo = None # hinted -> which attribute is readout for read
self.filewriter = None # parent (object): instance of the parent device
# device_manager (object): bec device manager
if not issubclass(self.custom_prepare_cls, CustomDetectorMixin): # **kwargs: keyword arguments
raise DetectorInitError("Custom prepare class must be subclass of CustomDetectorMixin") # """
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
# filepath = Component(SetableSignal, value="", kind=Kind.config)
if device_manager:
self._update_service_config() # custom_prepare_cls = CustomDetectorMixin
self.device_manager = device_manager
else: # def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
self.device_manager = bec_utils.DMMock() # super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
base_path = kwargs["basepath"] if "basepath" in kwargs else "." # self.stopped = False
self.service_cfg = {"base_path": os.path.abspath(base_path)} # self.name = name
# self.service_cfg = None
self.connector = self.device_manager.connector # self.scaninfo = None
self._update_scaninfo() # self.filewriter = None
self._update_filewriter()
self._init() # if not issubclass(self.custom_prepare_cls, CustomDetectorMixin):
# raise DetectorInitError("Custom prepare class must be subclass of CustomDetectorMixin")
def _update_filewriter(self) -> None: # self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
"""Update filewriter with service config"""
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) # if device_manager:
# self._update_service_config()
def _update_scaninfo(self) -> None: # self.device_manager = device_manager
"""Update scaninfo from BecScaninfoMixing # else:
This depends on device manager and operation/sim_mode # self.device_manager = bec_utils.DMMock()
""" # base_path = kwargs["basepath"] if "basepath" in kwargs else "."
self.scaninfo = BecScaninfoMixin(self.device_manager) # self.service_cfg = {"base_path": os.path.abspath(base_path)}
self.scaninfo.load_scan_metadata()
# self.connector = self.device_manager.connector
def _update_service_config(self) -> None: # self._update_scaninfo()
"""Update service config from BEC service config # self._update_filewriter()
# self._init()
If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory.
""" # def _update_filewriter(self) -> None:
# pylint: disable=import-outside-toplevel # """Update filewriter with service config"""
from bec_lib.bec_service import SERVICE_CONFIG # self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
if SERVICE_CONFIG: # def _update_scaninfo(self) -> None:
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] # """Update scaninfo from BecScaninfoMixing
return # This depends on device manager and operation/sim_mode
self.service_cfg = {"base_path": os.path.abspath(".")} # """
# self.scaninfo = BecScaninfoMixin(self.device_manager)
def check_scan_id(self) -> None: # self.scaninfo.load_scan_metadata()
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
old_scan_id = self.scaninfo.scan_id # def _update_service_config(self) -> None:
self.scaninfo.load_scan_metadata() # """Update service config from BEC service config
if self.scaninfo.scan_id != old_scan_id:
self.stopped = True # If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory.
# """
def _init(self) -> None: # # pylint: disable=import-outside-toplevel
"""Initialize detector, filewriter and set default parameters""" # from bec_lib.bec_service import SERVICE_CONFIG
self.custom_prepare.on_init()
# if SERVICE_CONFIG:
def stage(self) -> list[object]: # self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
""" # return
Stage device in preparation for a scan. # self.service_cfg = {"base_path": os.path.abspath(".")}
First we check if the device is already staged. Stage is idempotent,
if staged twice it should raise (we let ophyd.Device handle the raise here). # def check_scan_id(self) -> None:
We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage. # """Checks if scan_id has changed and set stopped flagged to True if it has."""
# old_scan_id = self.scaninfo.scan_id
Returns: # self.scaninfo.load_scan_metadata()
list(object): list of objects that were staged # if self.scaninfo.scan_id != old_scan_id:
# self.stopped = True
"""
if self._staged != Staged.no: # def _init(self) -> None:
return super().stage() # """Initialize detector, filewriter and set default parameters"""
self.stopped = False # self.custom_prepare.on_init()
self.scaninfo.load_scan_metadata()
self.custom_prepare.on_stage() # def stage(self) -> list[object]:
return super().stage() # """
# Stage device in preparation for a scan.
def pre_scan(self) -> None: # First we check if the device is already staged. Stage is idempotent,
"""Pre-scan logic. # if staged twice it should raise (we let ophyd.Device handle the raise here).
# We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage.
This function will be called from BEC directly before the scan core starts, and should only implement
time-critical actions. Therefore, it should also be kept as short/fast as possible. # Returns:
I.e. Arming a detector in case there is a risk of timing out. # list(object): list of objects that were staged
"""
self.custom_prepare.on_pre_scan() # """
# if self._staged != Staged.no:
def trigger(self) -> DeviceStatus: # return super().stage()
"""Trigger the detector, called from BEC.""" # self.stopped = False
# pylint: disable=assignment-from-no-return # self.scaninfo.load_scan_metadata()
status = self.custom_prepare.on_trigger() # self.custom_prepare.on_stage()
if isinstance(status, DeviceStatus): # return super().stage()
return status
return super().trigger() # def pre_scan(self) -> None:
# """Pre-scan logic.
def complete(self) -> None:
"""Complete the acquisition, called from BEC. # This function will be called from BEC directly before the scan core starts, and should only implement
# time-critical actions. Therefore, it should also be kept as short/fast as possible.
This function is called after the scan is complete, just before unstage. # I.e. Arming a detector in case there is a risk of timing out.
We can check here with the data backend and detector if the acquisition successfully finished. # """
# self.custom_prepare.on_pre_scan()
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
""" # def trigger(self) -> DeviceStatus:
# pylint: disable=assignment-from-no-return # """Trigger the detector, called from BEC."""
status = self.custom_prepare.on_complete() # # pylint: disable=assignment-from-no-return
if isinstance(status, DeviceStatus): # status = self.custom_prepare.on_trigger()
return status # if isinstance(status, DeviceStatus):
status = DeviceStatus(self) # return status
status.set_finished() # return super().trigger()
return status
# def complete(self) -> None:
def unstage(self) -> list[object]: # """Complete the acquisition, called from BEC.
"""
Unstage device after a scan. # This function is called after the scan is complete, just before unstage.
# We can check here with the data backend and detector if the acquisition successfully finished.
We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped.
If that is the case, the stopped flag is set to True, which will immediately unstage the device. # Actions are implemented in custom_prepare.on_complete since they are beamline specific.
# """
Custom_prepare.on_unstage is called to allow for BL specific logic to be executed. # # pylint: disable=assignment-from-no-return
# status = self.custom_prepare.on_complete()
Returns: # if isinstance(status, DeviceStatus):
list(object): list of objects that were unstaged # return status
""" # status = DeviceStatus(self)
self.check_scan_id() # status.set_finished()
self.custom_prepare.on_unstage() # return status
self.stopped = False
return super().unstage() # def unstage(self) -> list[object]:
# """
def stop(self, *, success=False) -> None: # Unstage device after a scan.
"""
Stop the scan, with camera and file writer # We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped.
# If that is the case, the stopped flag is set to True, which will immediately unstage the device.
"""
self.custom_prepare.on_stop() # Custom_prepare.on_unstage is called to allow for BL specific logic to be executed.
super().stop(success=success)
self.stopped = True # Returns:
# list(object): list of objects that were unstaged
# """
# self.check_scan_id()
# self.custom_prepare.on_unstage()
# self.stopped = False
# return super().unstage()
# def stop(self, *, success=False) -> None:
# """
# Stop the scan, with camera and file writer
# """
# self.custom_prepare.on_stop()
# super().stop(success=success)
# self.stopped = True

View File

@ -0,0 +1,442 @@
"""This module contains the base class for SLS detectors. We follow the approach to integrate
PSI detectors into the BEC system based on this base class. The base class is used to implement
certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc...
We use composition with a custom prepare class to implement BL specific logic for the detector.
The beamlines need to inherit from the CustomDetectorMixing for their mixin classes."""
import os
import threading
import time
import traceback
from bec_lib.file_utils import FileWriter
from bec_lib.logger import bec_logger
from ophyd import Device, DeviceStatus, Kind
from ophyd.device import Staged
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
logger = bec_logger.logger
class PSIDeviceBaseError(Exception):
"""Error specific for the PSIDeviceBase class."""
class CustomPrepare:
"""
Mixin class for custom detector logic
This class is used to implement BL specific logic for the detector.
It is used in the PSIDetectorBase class.
For the integration of a new detector, the following functions should
help with integrating functionality, but additional ones can be added.
Check PSIDetectorBase for the functions that are called during relevant function calls of
stage, unstage, trigger, stop and _init.
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent
def on_init(self) -> None:
"""
Init sequence for the Device. This method should be fast and not rely on setting any signals.
"""
def on_wait_for_connection(self) -> None:
"""
Specify actions to be executed when waiting for the device to connect.
The on method is called after the device is connected, thus, signals are ready to be set.
This should be used to set initial values for signals, e.g. setting the velocity of a motor.
"""
def on_stage(self) -> None:
"""
Specify actions to be executed during stage in preparation for a scan.
self.parent.scaninfo already has all current parameters for the upcoming scan.
In case the backend service is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT:
It must be safe to assume that the device is ready for the scan
to start immediately once this function is finished.
"""
def on_unstage(self) -> None:
"""
Specify actions to be executed during unstage.
This step should include checking if the acqusition was successful,
and publishing the file location and file event message,
with flagged done to BEC.
"""
def on_stop(self) -> None:
"""
Specify actions to be executed during stop.
This must also set self.parent.stopped to True.
This step should include stopping the detector and backend service.
"""
def on_trigger(self) -> None | DeviceStatus:
"""
Specify actions to be executed upon receiving trigger signal.
Return a DeviceStatus object or None
"""
def on_pre_scan(self) -> None:
"""
Specify actions to be executed right before a scan starts.
Only use if needed, and it is recommended to keep this function as short/fast as possible.
"""
def on_complete(self) -> None | DeviceStatus:
"""
Specify actions to be executed when the scan is complete.
This can for instance be to check with the detector and backend if all data is written succsessfully.
"""
def on_kickoff(self) -> None | DeviceStatus:
"""Flyer specific method to kickoff the device.
Actions should be fast or, if time consuming,
implemented non-blocking and return a DeviceStatus object.
"""
def wait_for_signals(
self,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""
Convenience wrapper to allow waiting for signals to reach a certain condition.
For EPICs PVs, an example usage is pasted at the bottom.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
>>> Example usage for EPICS PVs:
>>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True)
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if check_stopped is True and self.parent.stopped is True:
return False
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if timer > timeout:
return False
time.sleep(interval)
timer += interval
def wait_with_status(
self,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
exception_on_timeout: Exception = None,
) -> DeviceStatus:
"""Utility function to wait for signals in a thread.
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
The DeviceStatus is attached to the paent device, i.e. the detector object inheriting from PSIDetectorBase.
Usage:
This function should be used to wait for signals to reach a certain condition, especially in the context of
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
Returns:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
"""
if exception_on_timeout is None:
exception_on_timeout = DeviceTimeoutError(
f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}"
)
status = DeviceStatus(self.parent)
# utility function to wrap the wait_for_signals function
def wait_for_signals_wrapper(
status: DeviceStatus,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool,
interval: float,
all_signals: bool,
exception_on_timeout: Exception,
):
"""Convenient wrapper around wait_for_signals to set status based on the result.
Args:
status (DeviceStatus): DeviceStatus object to be set
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
"""
try:
result = self.wait_for_signals(
signal_conditions, timeout, check_stopped, interval, all_signals
)
if result:
status.set_finished()
else:
if self.parent.stopped:
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped"))
else:
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=exception_on_timeout)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}"
)
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=exc)
thread = threading.Thread(
target=wait_for_signals_wrapper,
args=(
status,
signal_conditions,
timeout,
check_stopped,
interval,
all_signals,
exception_on_timeout,
),
daemon=True,
)
thread.start()
return status
class PSIDeviceBase(Device):
"""
Base class for custom device integrations at PSI. This class wraps around the ophyd's standard
set of methods, providing hooks for custom logic to be implemented in the custom_prepare_cls.
Please check the device section in the developer section within the BEC documentation
(https://bec.readthedocs.io/en/latest/) for more information on how to integrate a new device using
this base class.
"""
custom_prepare_cls = CustomPrepare
# It can not hurt to define all just in case, or will it?
SUB_READBACK = "readback"
SUB_VALUE = "value"
SUB_DONE_MOVING = "done_moving"
SUB_MOTOR_IS_MOVING = "motor_is_moving"
SUB_PROGRESS = "progress"
SUB_FILE_EVENT = "file_event"
SUB_DEVICE_MONITOR_1D = "device_monitor_1d"
SUB_DEVICE_MONITOR_2D = "device_monitor_2d"
_default_sub = SUB_VALUE
def __init__(
self,
name: str,
prefix: str = "",
kind: Kind | None = None,
parent=None,
device_manager=None,
**kwargs,
):
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
self._stopped = False
self.service_cfg = None
self.scaninfo = None
self.filewriter = None
if not issubclass(self.custom_prepare_cls, CustomPrepare):
raise PSIDeviceBaseError(
f"Custom prepare class must be subclass of CustomDetectorMixin, provided: {self.custom_prepare_cls}"
)
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
if device_manager:
self._update_service_config()
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
base_path = kwargs["basepath"] if "basepath" in kwargs else "."
self.service_cfg = {"base_path": os.path.abspath(base_path)}
self.connector = self.device_manager.connector
self._update_scaninfo()
self._update_filewriter()
self._init()
@property
def stopped(self) -> bool:
"""Flag to indicate if the device is stopped"""
return self._stopped
@stopped.setter
def stopped(self, value: bool) -> None:
"""Set the stopped flag"""
self._stopped = value
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
def _update_service_config(self) -> None:
"""Update service config from BEC service config
If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory.
"""
# pylint: disable=import-outside-toplevel
from bec_lib.bec_service import SERVICE_CONFIG
if SERVICE_CONFIG:
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
return
self.service_cfg = {"base_path": os.path.abspath(".")}
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id:
self.stopped = True
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self.custom_prepare.on_init()
def stage(self) -> list[object]:
"""
Stage device in preparation for a scan.
First we check if the device is already staged. Stage is idempotent,
if staged twice it should raise (we let ophyd.Device handle the raise here).
We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage.
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.on_stage()
return super().stage()
def pre_scan(self) -> None:
"""Pre-scan logic.
This function will be called from BEC directly before the scan core starts, and should only implement
time-critical actions. Therefore, it should also be kept as short/fast as possible.
I.e. Arming a detector in case there is a risk of timing out.
"""
self.custom_prepare.on_pre_scan()
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
# pylint: disable=assignment-from-no-return
status = self.custom_prepare.on_trigger()
if isinstance(status, DeviceStatus):
return status
return super().trigger()
def complete(self) -> None:
"""Complete the acquisition, called from BEC.
This function is called after the scan is complete, just before unstage.
We can check here with the data backend and detector if the acquisition successfully finished.
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
"""
# pylint: disable=assignment-from-no-return
status = self.custom_prepare.on_complete()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
return status
def unstage(self) -> list[object]:
"""
Unstage device after a scan.
We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped.
If that is the case, the stopped flag is set to True, which will immediately unstage the device.
Custom_prepare.on_unstage is called to allow for BL specific logic to be executed.
Returns:
list(object): list of objects that were unstaged
"""
self.check_scan_id()
self.custom_prepare.on_unstage()
self.stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Stop the scan, with camera and file writer
"""
self.custom_prepare.on_stop()
super().stop(success=success)
self.stopped = True
def wait_for_connection(self, all_signals=False, timeout=5) -> None:
super().wait_for_connection(all_signals, timeout)
self.custom_prepare.on_wait_for_connection()
def kickoff(self) -> None:
"""Kickoff the device"""
status = self.custom_prepare.on_kickoff()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
return status