From f10060bceefac2a776e0d5d9300770b33c2e8ac0 Mon Sep 17 00:00:00 2001 From: appel_c Date: Sat, 7 Dec 2024 10:38:36 +0100 Subject: [PATCH] docs: Update docstrings for base class --- .../base_classes/bec_device_base.py | 523 ++++++++++++++++++ .../base_classes/psi_detector_base.py | 6 +- .../base_classes/psi_device_base.py | 442 --------------- 3 files changed, 526 insertions(+), 445 deletions(-) create mode 100644 ophyd_devices/interfaces/base_classes/bec_device_base.py delete mode 100644 ophyd_devices/interfaces/base_classes/psi_device_base.py diff --git a/ophyd_devices/interfaces/base_classes/bec_device_base.py b/ophyd_devices/interfaces/base_classes/bec_device_base.py new file mode 100644 index 0000000..15f535a --- /dev/null +++ b/ophyd_devices/interfaces/base_classes/bec_device_base.py @@ -0,0 +1,523 @@ +""" +This module contains the base class for custom device integrations at PSI. +Please check the device section in BEC's developer documentation +(https://bec.readthedocs.io/en/latest/) for more information about device integration. +""" + +import os +import threading +import time +import traceback +from typing import Generic, TypeVar + +from bec_lib.file_utils import FileWriter +from bec_lib.logger import bec_logger +from ophyd import Device, DeviceStatus, Kind +from ophyd.device import Staged + +from ophyd_devices.utils import bec_utils +from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin +from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError + +logger = bec_logger.logger + +T = TypeVar("T", bound="BECDeviceBase") + + +class BECDeviceBaseError(Exception): + """Error class for BECDeviceBase.""" + + +class CustomPrepare(Generic[T]): + """Custom prepare class for beamline specific logic. + + This class provides a set of hooks for beamline specific logic to be implemented. + BECDeviceBase will be injected as the parent device. + + To implement custom logic, inherit from this class and implement the desired methods. + If the __init__ method is overwritten, please ensure the proper initialisation. + It is important to pass the parent device to the custom prepare class as this + will allow the custom prepare class to access the parent device with all its signals. + """ + + def __init__(self, *_args, parent: T = None, **_kwargs) -> None: + """ + Initialize the custom prepare class. + + Args: + parent (BECDeviceBase): The parent device which gives access to all methods and signals. + """ + self.parent = parent + + def on_init(self) -> None: + """ + Hook for beamline specific logic during class initialization. + + This method is called during the initialization of the device class. + It should not be used to set any of the class's signals as they may not + be connected yet. + """ + pass + + def on_wait_for_connection(self) -> None: + """ + Hook for beamline specific logic during the wait_for_connection method. + + This method is called after Ophyd's wait_for_connection method was called, + meaning that signals will be connected at this point. It can be used to check + signal values, or set default values for those. + """ + + def on_stage(self) -> None: + """ + Hook for beamline specific logic during the stage method. + + This method is called during the stage method of the device class. + It is used to implement logic in preparation for a scan.""" + + def on_unstage(self) -> None: + """ + Hook for beamline specific logic during the unstage method. + + This method is called during the unstage method of the device class. + It is used to implement logic to clean up the device after a scan. + """ + + def on_stop(self) -> None: + """Hook for beamline specific logic during the stop method.""" + + def on_trigger(self) -> None | DeviceStatus: + """ + Hook for beamline specific logic during the trigger method. + + This method has to be non-blocking, and if time-consuming actions are necessary, + they should be implemented asynchronously. Please check the wait_with_status + method to implement asynchronous checks. + + The softwareTrigger config value needs to be set to True to indicate to BEC + that the device should be triggered from the software during the scan. + + Returns: + DeviceStatus: DeviceStatus object that BEC will use to check if the trigger was successful + """ + + def on_pre_scan(self) -> None: + """ + Hook for beamline specific logic during the pre_scan method. + + This method is called from BEC just before the scan core starts, and be used + to execute time-critical actions, e.g. arming a detector in case there is a risk of timing out. + Note, this method should not be used to implement blocking logic. + """ + + def on_complete(self) -> None | DeviceStatus: + """ + Hook for beamline specific logic during the complete method. + + This method is used to check whether the device has successfully completed its acquisition. + For example, a detector may want to check if it has received the correct number of frames, or + if it's data backend has finished writing the data to disk. The method should be implemented + asynchronously. Please check the wait_with_status method on how to implement asynchronous checks. + + Returns: + DeviceStatus: DeviceStatus object that BEC will use to check if the device has successfully completed + """ + + def on_kickoff(self) -> None | DeviceStatus: + """ + Hook for beamline specific logic during the kickoff method. + + This method is called to kickoff the flyer acquisition. BEC will not call this method in general + for its scans, but only if the scan explicitly implements this. The method should be non-blocking, + and if time-consuming actions are necessary, they should be implemented asynchronously. + Please check the wait_with_status method on how to implement asynchronous checks. + + Returns: + DeviceStatus: DeviceStatus object that BEC will use to check if the kickoff was successful + """ + + def wait_for_signals( + self, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + ) -> bool: + """ + Utility method to implement waiting for signals to reach a certain condition. It accepts + a list of conditions passed as tuples of executable calls for conditions (get_current_state, condition) to check. + It can further be specified if all signals should be True or if any signal should be True. + If the timeout is reached, it will return False. + + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + + Returns: + bool: True if all signals are in the desired state, False if timeout is reached + + Example: + >>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True) + """ + + timer = 0 + while True: + checks = [ + get_current_state() == condition + for get_current_state, condition in signal_conditions + ] + if check_stopped is True and self.parent.stopped is True: + return False + if (all_signals and all(checks)) or (not all_signals and any(checks)): + return True + if timer > timeout: + return False + time.sleep(interval) + timer += interval + + def wait_with_status( + self, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + exception_on_timeout: Exception = None, + ) -> DeviceStatus: + """ + Utility method to implement asynchronous waiting for signals to reach a certain condition. + It accepts a list of conditions passed as tuples of executable calls. + + Please check the wait_for_signals method as it is used to implement the waiting logic. + It returns a DeviceStatus object that can be used to check if the asynchronous action is done + through 'status.done', and if it was successful through 'status.success'. An exception can be + passed to the method which will be raised if the timeout is reached. If the device was stopped + during the waiting, a DeviceStopError will be raised. + + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + exception_on_timeout (Exception): Exception to raise on timeout + + Returns: + DeviceStatus: DeviceStatus object to check the state of the asynchronous action (status.done, status.success) + + Example: + >>> status = self.wait_with_status(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True) + """ + if exception_on_timeout is None: + exception_on_timeout = DeviceTimeoutError( + f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}" + ) + + status = DeviceStatus(self.parent) + + # utility function to wrap the wait_for_signals function + def wait_for_signals_wrapper( + status: DeviceStatus, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool, + interval: float, + all_signals: bool, + exception_on_timeout: Exception, + ): + try: + result = self.wait_for_signals( + signal_conditions, timeout, check_stopped, interval, all_signals + ) + if result: + status.set_finished() + else: + if self.parent.stopped: + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped")) + else: + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=exception_on_timeout) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}" + ) + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=exc) + + thread = threading.Thread( + target=wait_for_signals_wrapper, + args=( + status, + signal_conditions, + timeout, + check_stopped, + interval, + all_signals, + exception_on_timeout, + ), + daemon=True, + ) + thread.start() + return status + + +class BECDeviceBase(Device): + """ + Base class for custom device integrations at PSI. This class wraps around the ophyd's standard + set of methods, providing hooks for custom logic to be implemented in the custom_prepare_cls. + + Please check the device section in BEC's developer documentation + (https://bec.readthedocs.io/en/latest/) for more information about device integration. + """ + + custom_prepare_cls = CustomPrepare + + # All possible subscription types that the Device Manager subscribes to + # Run the command _run_subs(sub_type=self.SUB_VALUE, value=value) to trigger + # the subscription of type value. Please be aware that the signature of + # the subscription callbacks needs to be matched. + SUB_READBACK = "readback" + SUB_VALUE = "value" + SUB_DONE_MOVING = "done_moving" + SUB_MOTOR_IS_MOVING = "motor_is_moving" + SUB_PROGRESS = "progress" + SUB_FILE_EVENT = "file_event" + SUB_DEVICE_MONITOR_1D = "device_monitor_1d" + SUB_DEVICE_MONITOR_2D = "device_monitor_2d" + _default_sub = SUB_VALUE + + def __init__( + self, + name: str, + prefix: str = "", + kind: Kind | None = None, + parent=None, + device_manager=None, + **kwargs, + ): + """ + Initialize the device. + + Args: + name (str): name of the device + prefix (str): prefix of the device + kind (Kind): kind of the device + parent (Device): parent device + device_manager (DeviceManager): device manager. BEC will inject the device manager as a dependency. + """ + super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) + self._stopped = False + self.service_cfg = None + self.scaninfo = None + self.filewriter = None + + if not issubclass(self.custom_prepare_cls, CustomPrepare): + raise BECDeviceBaseError( + f"Custom prepare class must be subclass of CustomDetectorMixin, provided: {self.custom_prepare_cls}" + ) + self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) + + if device_manager: + self._update_service_config() + self.device_manager = device_manager + else: + # If device manager is not provided through dependency injection + # A mock device manager is created. This is necessary to be able + # to use the device class in a standalone context without BEC. + self.device_manager = bec_utils.DMMock() + base_path = kwargs["basepath"] if "basepath" in kwargs else "." + self.service_cfg = {"base_path": os.path.abspath(base_path)} + + self.connector = self.device_manager.connector + self._update_scaninfo() + self._update_filewriter() + self._init() + + @property + def stopped(self) -> bool: + """Property to indicate if the device was stopped.""" + return self._stopped + + @stopped.setter + def stopped(self, value: bool) -> None: + self._stopped = value + + def _update_filewriter(self) -> None: + """Initialise the file writer utility class.""" + self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) + + def _update_scaninfo(self) -> None: + """Initialise the utility class to get scan metadata from BEC.""" + self.scaninfo = BecScaninfoMixin(self.device_manager) + self.scaninfo.load_scan_metadata() + + def _update_service_config(self) -> None: + """Update the Service Config. This has the info on where REDIS and other services are running.""" + # pylint: disable=import-outside-toplevel + from bec_lib.bec_service import SERVICE_CONFIG + + if SERVICE_CONFIG: + self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] + return + self.service_cfg = {"base_path": os.path.abspath(".")} + + def check_scan_id(self) -> None: + """ + Check if the scan ID has changed, if yes, set the stopped property to True. + + The idea is that if the scan ID on the device and the scan ID from BEC are different, + the device is out of sync with the scan and should be stopped. + """ + old_scan_id = self.scaninfo.scan_id + self.scaninfo.load_scan_metadata() + if self.scaninfo.scan_id != old_scan_id: + self.stopped = True + + def _init(self) -> None: + """ + Hook for beamline specific logic during class initialization. + + Please to not set any of the class's signals during intialisation, but + instead use the wait_for_connection. + """ + self.custom_prepare.on_init() + + def wait_for_connection(self, all_signals=False, timeout=5) -> None: + """ + Thin wrapper around ophyd's wait_for_connection. + + Calling Ophyd's wait_for_connection will ensure that signals are connected. BEC + will call this method if a device is not connected yet. This method should + be used to set up default values for signals, or to check if the device signals + are in the expected state. + + Args: + all_signals (bool): True if all signals should be considered. Default is False. + timeout (float): timeout in seconds. Default is 5 seconds. + """ + super().wait_for_connection(all_signals, timeout) + self.custom_prepare.on_wait_for_connection() + + def stage(self) -> list[object]: + """ + Thin wrapper around ophyd's stage, the method called in preparation for a scan. + + Stage is idempotent, if staged twice it should raise (we let ophyd.Device handle the raise here). + Other that that, we reset the stopped property in case the device was stopped before, and + pull the latest scan metadata from BEC. Ater that, we allow for beamline specific logic to + be implemented through the custom_prepare.on_stage method. + + Returns: + list(object): list of objects that were staged + """ + if self._staged != Staged.no: + return super().stage() + self.stopped = False + self.scaninfo.load_scan_metadata() + self.custom_prepare.on_stage() + return super().stage() + + def unstage(self) -> list[object]: + """ + This wrapper around ophyd's unstage method, which is called to clean up the device. + + It must be possible to call unstage multiple times without raising an exception. It should + be implemented to clean up the device if it is in a staged state. + + Beamline specific logic can be implemented through the custom_prepare.on_unstage method. + + Returns: + list(object): list of objects that were unstaged + """ + self.check_scan_id() + self.custom_prepare.on_unstage() + self.stopped = False + return super().unstage() + + def pre_scan(self) -> None: + """ + Pre-scan is a method introduced by BEC that is not native to the ophyd interface. + + This method is called from BEC just before the scan core starts, and therefore should only + implement time-critical actions. I.e. Arming a detector in case there is a risk of timing out. + """ + self.custom_prepare.on_pre_scan() + + def trigger(self) -> DeviceStatus: + """ + Thin wrapper around the trigger method of the device, for which the config value + softwareTrigger needs to be set to True, which will indicate to BEC that the device + should be triggered from the software during the scan. + + Custom logic should be implemented non-blocking, i.e. be fast, or implemented asynchroniously. + + Returns: + DeviceStatus: DeviceStatus object that BEC will use to check if the trigger was successful. + """ + # pylint: disable=assignment-from-no-return + status = self.custom_prepare.on_trigger() + if isinstance(status, DeviceStatus): + return status + return super().trigger() + + def complete(self) -> DeviceStatus: + """ + Thin wrapper around ophyd's complete method. Complete is called once the scan core + has finished, but before the scan is closed. It will be called before unstage. + It can also be used for fly scans to track the status of the flyer, and indicate if the + flyer has completed. + + The method is used to check whether the device has successfully completed the acquisition. + Actions are implemented in custom_prepare.on_complete since they are beamline specific. + + This method has to be non-blocking. If checks are necessary, they should be implemented asynchronously. + + Returns: + DeviceStatus: DeviceStatus object that BEC will use to check if the device has successfully completed. + """ + # pylint: disable=assignment-from-no-return + status = self.custom_prepare.on_complete() + if isinstance(status, DeviceStatus): + return status + status = DeviceStatus(self) + status.set_finished() + return status + + def stop(self, *, success=False) -> None: + """Stop the device. + + Args: + success (bool): Argument from ophyd's stop method. Default is False. + """ + self.custom_prepare.on_stop() + super().stop(success=success) + self.stopped = True + + def kickoff(self) -> DeviceStatus: + """ + This wrapper around Ophyd's kickoff method. + + Kickoff is a method native to the flyer interface of Ophyd. It is called to + start the flyer acquisition. This method is not called by BEC in general, but + only if the scan explicitly implements this. + + The method should be non-blocking, and if time-consuming actions are necessary, + they should be implemented asynchronously. + + Returns: + DeviceStatus: DeviceStatus object that BEC will use to check if the kickoff was successful. + """ + # pylint: disable=assignment-from-no-return + status = self.custom_prepare.on_kickoff() + if isinstance(status, DeviceStatus): + return status + status = DeviceStatus(self) + status.set_finished() + return status diff --git a/ophyd_devices/interfaces/base_classes/psi_detector_base.py b/ophyd_devices/interfaces/base_classes/psi_detector_base.py index 9f64491..0060db7 100644 --- a/ophyd_devices/interfaces/base_classes/psi_detector_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_detector_base.py @@ -3,7 +3,7 @@ from bec_lib.endpoints import MessageEndpoints from ophyd import Component as Cpt from ophyd import Kind -from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase +from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare from ophyd_devices.sim.sim_signals import SetableSignal @@ -57,8 +57,8 @@ class CustomDetectorMixin(CustomPrepare): pipe.execute() -class PSIDetectorBase(PSIDeviceBase): - """Deprecated, use PSIDeviceBase instead. Here for backwards compatibility.""" +class PSIDetectorBase(BECDeviceBase): + """Deprecated, use BECDeviceBase instead. Here for backwards compatibility.""" custom_prepare_cls = CustomDetectorMixin diff --git a/ophyd_devices/interfaces/base_classes/psi_device_base.py b/ophyd_devices/interfaces/base_classes/psi_device_base.py deleted file mode 100644 index 98c99f6..0000000 --- a/ophyd_devices/interfaces/base_classes/psi_device_base.py +++ /dev/null @@ -1,442 +0,0 @@ -"""This module contains the base class for SLS detectors. We follow the approach to integrate -PSI detectors into the BEC system based on this base class. The base class is used to implement -certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc... -We use composition with a custom prepare class to implement BL specific logic for the detector. -The beamlines need to inherit from the CustomDetectorMixing for their mixin classes.""" - -import os -import threading -import time -import traceback - -from bec_lib.file_utils import FileWriter -from bec_lib.logger import bec_logger -from ophyd import Device, DeviceStatus, Kind -from ophyd.device import Staged - -from ophyd_devices.utils import bec_utils -from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin -from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError - -logger = bec_logger.logger - - -class PSIDeviceBaseError(Exception): - """Error specific for the PSIDeviceBase class.""" - - -class CustomPrepare: - """ - Mixin class for custom detector logic - - This class is used to implement BL specific logic for the detector. - It is used in the PSIDetectorBase class. - - For the integration of a new detector, the following functions should - help with integrating functionality, but additional ones can be added. - - Check PSIDetectorBase for the functions that are called during relevant function calls of - stage, unstage, trigger, stop and _init. - """ - - def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: - self.parent = parent - - def on_init(self) -> None: - """ - Init sequence for the Device. This method should be fast and not rely on setting any signals. - """ - - def on_wait_for_connection(self) -> None: - """ - Specify actions to be executed when waiting for the device to connect. - The on method is called after the device is connected, thus, signals are ready to be set. - This should be used to set initial values for signals, e.g. setting the velocity of a motor. - """ - - def on_stage(self) -> None: - """ - Specify actions to be executed during stage in preparation for a scan. - self.parent.scaninfo already has all current parameters for the upcoming scan. - - In case the backend service is writing data on disk, this step should include publishing - a file_event and file_message to BEC to inform the system where the data is written to. - - IMPORTANT: - It must be safe to assume that the device is ready for the scan - to start immediately once this function is finished. - """ - - def on_unstage(self) -> None: - """ - Specify actions to be executed during unstage. - - This step should include checking if the acqusition was successful, - and publishing the file location and file event message, - with flagged done to BEC. - """ - - def on_stop(self) -> None: - """ - Specify actions to be executed during stop. - This must also set self.parent.stopped to True. - - This step should include stopping the detector and backend service. - """ - - def on_trigger(self) -> None | DeviceStatus: - """ - Specify actions to be executed upon receiving trigger signal. - Return a DeviceStatus object or None - """ - - def on_pre_scan(self) -> None: - """ - Specify actions to be executed right before a scan starts. - - Only use if needed, and it is recommended to keep this function as short/fast as possible. - """ - - def on_complete(self) -> None | DeviceStatus: - """ - Specify actions to be executed when the scan is complete. - - This can for instance be to check with the detector and backend if all data is written succsessfully. - """ - - def on_kickoff(self) -> None | DeviceStatus: - """Flyer specific method to kickoff the device. - - Actions should be fast or, if time consuming, - implemented non-blocking and return a DeviceStatus object. - """ - - def wait_for_signals( - self, - signal_conditions: list[tuple], - timeout: float, - check_stopped: bool = False, - interval: float = 0.05, - all_signals: bool = False, - ) -> bool: - """ - Convenience wrapper to allow waiting for signals to reach a certain condition. - For EPICs PVs, an example usage is pasted at the bottom. - - Args: - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check - timeout (float): timeout in seconds - interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True - - Returns: - bool: True if all signals are in the desired state, False if timeout is reached - - >>> Example usage for EPICS PVs: - >>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True) - """ - - timer = 0 - while True: - checks = [ - get_current_state() == condition - for get_current_state, condition in signal_conditions - ] - if check_stopped is True and self.parent.stopped is True: - return False - if (all_signals and all(checks)) or (not all_signals and any(checks)): - return True - if timer > timeout: - return False - time.sleep(interval) - timer += interval - - def wait_with_status( - self, - signal_conditions: list[tuple], - timeout: float, - check_stopped: bool = False, - interval: float = 0.05, - all_signals: bool = False, - exception_on_timeout: Exception = None, - ) -> DeviceStatus: - """Utility function to wait for signals in a thread. - Returns a DevicesStatus object that resolves either to set_finished or set_exception. - The DeviceStatus is attached to the paent device, i.e. the detector object inheriting from PSIDetectorBase. - - Usage: - This function should be used to wait for signals to reach a certain condition, especially in the context of - on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. - It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, - the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. - The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. - - Args: - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check - timeout (float): timeout in seconds - check_stopped (bool): True if stopped flag should be checked - interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True - exception_on_timeout (Exception): Exception to raise on timeout - - Returns: - DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception - """ - if exception_on_timeout is None: - exception_on_timeout = DeviceTimeoutError( - f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}" - ) - - status = DeviceStatus(self.parent) - - # utility function to wrap the wait_for_signals function - def wait_for_signals_wrapper( - status: DeviceStatus, - signal_conditions: list[tuple], - timeout: float, - check_stopped: bool, - interval: float, - all_signals: bool, - exception_on_timeout: Exception, - ): - """Convenient wrapper around wait_for_signals to set status based on the result. - - Args: - status (DeviceStatus): DeviceStatus object to be set - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check - timeout (float): timeout in seconds - check_stopped (bool): True if stopped flag should be checked - interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True - exception_on_timeout (Exception): Exception to raise on timeout - """ - try: - result = self.wait_for_signals( - signal_conditions, timeout, check_stopped, interval, all_signals - ) - if result: - status.set_finished() - else: - if self.parent.stopped: - # INFO This will execute a callback to the parent device.stop() method - status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped")) - else: - # INFO This will execute a callback to the parent device.stop() method - status.set_exception(exc=exception_on_timeout) - # pylint: disable=broad-except - except Exception as exc: - content = traceback.format_exc() - logger.warning( - f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}" - ) - # INFO This will execute a callback to the parent device.stop() method - status.set_exception(exc=exc) - - thread = threading.Thread( - target=wait_for_signals_wrapper, - args=( - status, - signal_conditions, - timeout, - check_stopped, - interval, - all_signals, - exception_on_timeout, - ), - daemon=True, - ) - thread.start() - return status - - -class PSIDeviceBase(Device): - """ - Base class for custom device integrations at PSI. This class wraps around the ophyd's standard - set of methods, providing hooks for custom logic to be implemented in the custom_prepare_cls. - - Please check the device section in the developer section within the BEC documentation - (https://bec.readthedocs.io/en/latest/) for more information on how to integrate a new device using - this base class. - """ - - custom_prepare_cls = CustomPrepare - - # It can not hurt to define all just in case, or will it? - SUB_READBACK = "readback" - SUB_VALUE = "value" - SUB_DONE_MOVING = "done_moving" - SUB_MOTOR_IS_MOVING = "motor_is_moving" - SUB_PROGRESS = "progress" - SUB_FILE_EVENT = "file_event" - SUB_DEVICE_MONITOR_1D = "device_monitor_1d" - SUB_DEVICE_MONITOR_2D = "device_monitor_2d" - _default_sub = SUB_VALUE - - def __init__( - self, - name: str, - prefix: str = "", - kind: Kind | None = None, - parent=None, - device_manager=None, - **kwargs, - ): - super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) - self._stopped = False - self.service_cfg = None - self.scaninfo = None - self.filewriter = None - - if not issubclass(self.custom_prepare_cls, CustomPrepare): - raise PSIDeviceBaseError( - f"Custom prepare class must be subclass of CustomDetectorMixin, provided: {self.custom_prepare_cls}" - ) - self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) - - if device_manager: - self._update_service_config() - self.device_manager = device_manager - else: - self.device_manager = bec_utils.DMMock() - base_path = kwargs["basepath"] if "basepath" in kwargs else "." - self.service_cfg = {"base_path": os.path.abspath(base_path)} - - self.connector = self.device_manager.connector - self._update_scaninfo() - self._update_filewriter() - self._init() - - @property - def stopped(self) -> bool: - """Flag to indicate if the device is stopped""" - return self._stopped - - @stopped.setter - def stopped(self, value: bool) -> None: - """Set the stopped flag""" - self._stopped = value - - def _update_filewriter(self) -> None: - """Update filewriter with service config""" - self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) - - def _update_scaninfo(self) -> None: - """Update scaninfo from BecScaninfoMixing - This depends on device manager and operation/sim_mode - """ - self.scaninfo = BecScaninfoMixin(self.device_manager) - self.scaninfo.load_scan_metadata() - - def _update_service_config(self) -> None: - """Update service config from BEC service config - - If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory. - """ - # pylint: disable=import-outside-toplevel - from bec_lib.bec_service import SERVICE_CONFIG - - if SERVICE_CONFIG: - self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] - return - self.service_cfg = {"base_path": os.path.abspath(".")} - - def check_scan_id(self) -> None: - """Checks if scan_id has changed and set stopped flagged to True if it has.""" - old_scan_id = self.scaninfo.scan_id - self.scaninfo.load_scan_metadata() - if self.scaninfo.scan_id != old_scan_id: - self.stopped = True - - def _init(self) -> None: - """Initialize detector, filewriter and set default parameters""" - self.custom_prepare.on_init() - - def stage(self) -> list[object]: - """ - Stage device in preparation for a scan. - First we check if the device is already staged. Stage is idempotent, - if staged twice it should raise (we let ophyd.Device handle the raise here). - We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage. - - Returns: - list(object): list of objects that were staged - - """ - if self._staged != Staged.no: - return super().stage() - self.stopped = False - self.scaninfo.load_scan_metadata() - self.custom_prepare.on_stage() - return super().stage() - - def pre_scan(self) -> None: - """Pre-scan logic. - - This function will be called from BEC directly before the scan core starts, and should only implement - time-critical actions. Therefore, it should also be kept as short/fast as possible. - I.e. Arming a detector in case there is a risk of timing out. - """ - self.custom_prepare.on_pre_scan() - - def trigger(self) -> DeviceStatus: - """Trigger the detector, called from BEC.""" - # pylint: disable=assignment-from-no-return - status = self.custom_prepare.on_trigger() - if isinstance(status, DeviceStatus): - return status - return super().trigger() - - def complete(self) -> None: - """Complete the acquisition, called from BEC. - - This function is called after the scan is complete, just before unstage. - We can check here with the data backend and detector if the acquisition successfully finished. - - Actions are implemented in custom_prepare.on_complete since they are beamline specific. - """ - # pylint: disable=assignment-from-no-return - status = self.custom_prepare.on_complete() - if isinstance(status, DeviceStatus): - return status - status = DeviceStatus(self) - status.set_finished() - return status - - def unstage(self) -> list[object]: - """ - Unstage device after a scan. - - We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped. - If that is the case, the stopped flag is set to True, which will immediately unstage the device. - - Custom_prepare.on_unstage is called to allow for BL specific logic to be executed. - - Returns: - list(object): list of objects that were unstaged - """ - self.check_scan_id() - self.custom_prepare.on_unstage() - self.stopped = False - return super().unstage() - - def stop(self, *, success=False) -> None: - """ - Stop the scan, with camera and file writer - - """ - self.custom_prepare.on_stop() - super().stop(success=success) - self.stopped = True - - def wait_for_connection(self, all_signals=False, timeout=5) -> None: - super().wait_for_connection(all_signals, timeout) - self.custom_prepare.on_wait_for_connection() - - def kickoff(self) -> None: - """Kickoff the device""" - status = self.custom_prepare.on_kickoff() - if isinstance(status, DeviceStatus): - return status - status = DeviceStatus(self) - status.set_finished() - return status