From ddd0b790f8ef3e53966c660c431d2f7a9ceda97c Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 27 Mar 2024 20:42:33 +0100 Subject: [PATCH] feat: add protocols and rotation base device --- .../devices/ophyd_base_devices/__init__.py | 0 .../ophyd_base_devices/bec_protocols.py | 499 ++++++++++++++++++ .../ophyd_base_devices/ophyd_rotation_base.py | 107 ++++ .../tomcat_rotation_motors.py | 181 +++++++ 4 files changed, 787 insertions(+) create mode 100644 ophyd_devices/epics/devices/ophyd_base_devices/__init__.py create mode 100644 ophyd_devices/epics/devices/ophyd_base_devices/bec_protocols.py create mode 100644 ophyd_devices/epics/devices/ophyd_base_devices/ophyd_rotation_base.py create mode 100644 ophyd_devices/epics/devices/ophyd_base_devices/tomcat_rotation_motors.py diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/__init__.py b/ophyd_devices/epics/devices/ophyd_base_devices/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/bec_protocols.py b/ophyd_devices/epics/devices/ophyd_base_devices/bec_protocols.py new file mode 100644 index 0000000..c720b4e --- /dev/null +++ b/ophyd_devices/epics/devices/ophyd_base_devices/bec_protocols.py @@ -0,0 +1,499 @@ +""" This module provides a range of protocols that describe the expected interface for different types of devices. + +The protocols below can be used as teamplates for functionality to be implemeted by different type of devices. +They further facilitate runtime checks on devices and provide a minimum set of properties required for a device to be loadable by BEC. + +The protocols are: +- BECDeviceProtocol: Protocol for devices in BEC. All devices must at least implement this protocol. +- BECSignalProtocol: Protocol for signals. +- BECScanProtocol: Protocol for the scan interface. +- BECMixinProtocol: Protocol for utilities in particular relevant for detector implementations. +- BECPositionerProtocol: Protocol for positioners. +- BECFlyerProtocol: Protocol with for flyers. + +Keep in mind, that a device of type flyer should generally also implement the BECScanProtocol that provides the required functionality for scans. +Flyers in addition, also implement the BECFlyerProtocol. Similarly, positioners should also implement the BECScanProtocol and BECPositionerProtocol. + +""" + +from typing import Protocol, runtime_checkable + +from bec_lib.file_utils import FileWriterMixin +from ophyd import Component, DeviceStatus, Kind, Staged + +from ophyd_devices.utils import bec_scaninfo_mixin + + +@runtime_checkable +class BECDeviceProtocol(Protocol): + """Protocol for ophyd objects with zero functionality.""" + + _destroyed: bool + + @property + def name(self) -> str: + """name property""" + + @name.setter + def name(self, value: str) -> None: + """name setter""" + + @property + def kind(self) -> Kind: + """kind property""" + + @kind.setter + def kind(self, value: Kind): + """kind setter""" + + @property + def parent(self): + """Property to find the parent device""" + + @property + def root(self): + """Property to fint the root device""" + + @property + def hints(self) -> dict: + """hints property""" + + @property + def connected(self) -> bool: + """connected property. + Check if signals are connected + + Returns: + bool: True if connected, False otherwise + """ + + @connected.setter + def connected(self, value: bool): + """connected setter""" + + def read(self) -> dict: + """read method + + Override by child class with read method + + Returns: + dict: Dictionary with nested dictionary of signals with kind.normal or kind.hinted: + {'signal_name' : {'value' : .., "timestamp" : ..}, ...} + """ + + def read_configuration(self) -> dict: + """read_configuration method + + Override by child class with read_configuration method + + Returns: + dict: Dictionary with nested dictionary of signals with kind.config: + {'signal_name' : {'value' : .., "timestamp" : ..}, ...} + """ + + def describe(self) -> dict: + """describe method + + Override by child class with describe method + + Returns: + dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape') + """ + + def describe_configuration(self) -> dict: + """describe method + + Includes all signals of type Kind.config. + Override by child class with describe_configuration method + + Returns: + dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape') + """ + + def destroy(self) -> None: + """Destroy method. + + _destroyed must be set to True after calling destroy. + """ + + +@runtime_checkable +class BECSignalProtocol(Protocol): + """Protocol for BEC signals with zero functionality. + + This protocol adds the specific implementation for a signal. + Please be aware that a signal must also implement BECDeviceProtocol. + + Note: Currently the implementation of the protocol is not taking into account the + event_model from ophyd, i.e. _run_sbus + """ + + @property + def limits(self) -> tuple[float, float]: + """Limits property for signals. + If low_limit == high_limit, it is equivalent to NO limits! + + Returns: + tuple: Tuple with lower and upper limits + """ + + @property + def high_limit(self) -> float: + """High limit property for signals. + + Returns: + float: Upper limit + """ + + @property + def low_limit(self) -> float: + """Low limit property for signals. + + Returns: + float: Lower limit + """ + + @property + def write_access(self) -> bool: + """Write access method for signals. + + Returns: + bool: True if write access is allowed, False otherwise + """ + + def check_value(self, value: float): + """Check whether value is within limits + + Args: + value: value to check + + Raises: + LimitError in case the requested motion is not inside of limits. + """ + + def trigger(self) -> DeviceStatus: + """Trigger method for signals. + This method can be used to trigger the signal readout. + For EpicsSignal, the readback value is typically set to auto_monitor=True, + which means their readback value is updated automatically from the IOC. + + Returns: + DeviceStatus: DeviceStatus object + """ + + def put(self, value: any, force: bool = False, timeout: float = None): + """Put method for signals. + This method should resolve immediately and not block. + If not force, the method checks if the value is within limits using check_value. + + + Args: + value (any) : value to put + force (bool) : Flag to force the put and ignore limits + timeout (float) : Timeout for the put + """ + + def set(self, value: any, timeout: float = None) -> DeviceStatus: + """Set method for signals. + This method should be blocking until the set is completed. + + Args: + value (any) : value to set + timeout (float) : Timeout for the set + + Returns: + DeviceStatus : DeviceStatus object that will finish upon return + """ + + +@runtime_checkable +class BECScanProtocol(BECDeviceProtocol, Protocol): + """Protocol for devices offering an Protocol with all relevant functionality for scans. + + In BEC, scans typically follow the order of stage, (pre_scan), trigger, unstage. + Stop should be used to interrupt a scan. Be aware that pre_scan is optional and therefor + part of the BECMixinProtocol, typically useful for more complex devices such as detectors. + + This protocol allows to perform runtime checks on devices of ophyd. + It is the minimum set of properties required for a device to be loadable by BEC. + """ + + _staged: Staged + """Staged property to indicate if the device is staged.""" + + def stage(self) -> list[object]: + """Stage method to prepare the device for an upcoming acquistion. + + This prepares a device for an upcoming acquisition, i.e. it is the first + method for which the scan parameters are known and the device can be configured. + + It can be used to move scan_motors to their start position + or also prepare DAQ systems for the upcoming measurement. + We can further publish the file location for DAQ systems + to BEC and inform BEC's file writer where data will be written to. + + Stagin is not idempoent. If called twice without an unstage it should raise. + For ophyd devices, one may used self._staged = True to check if the device is staged. + + Returns: + list: List of objects that were staged, i.e. [self] + For devices with inheritance from ophyd, return + return super().stage() in the child class. + """ + + def unstage(self) -> list[object]: + """Unstage method to cleanup after the acquisition. + + It can also be used to implement checks whether the acquisition was successful, + inform BEC that the file has been succesfully written, or raise upon receiving + feedback that the scan did not finish successful. + + Unstaging is not idempotent. If called twice without a stage it should raise. + It is recommended to return super().unstage() in the child class, if + the child class also inherits from ophyd repository. + """ + + def stop(self, success: bool) -> None: + """Stop method to stop the device. + + Args: + success: Flag to indicate if the scan was successful or not. + + This method should be called to stop the device. It is recommended to call + super().stop(success=success) if class inherits from ophyd repository. + """ + + def trigger(self) -> DeviceStatus: + """Trigger method on the device + + Returns ophyd DeviceStatus object, which is used to track the status of the trigger. + It can also be blocking until the trigger is completed, and return the status object + with set_finished() method called on the DeviceStatus. + """ + + +@runtime_checkable +class BECMixinProtocol(Protocol): + """Protocol that offers BEC specific utility functionality for detectors.""" + + USER_ACCESS: list[str] + """ + List of methods/properties that will be exposed to the client interface in addition + to the the already exposed signals, methods and properties. + """ + + scaninfo: bec_scaninfo_mixin + """ + BEC scan info mixin class that provides an transparent Protocol to scan parameter + as provided by BEC. It is recommended to use this Protocol to retrieve scaninfo from Redis. + """ + + stopped: bool + """ + Flag to indicate if the device is stopped. + + The stop method should set this flag to True, and i.e. stage to set it to False. + """ + + filewriter: FileWriterMixin + """ + The file writer mixin main purpose is to unify and centralize the creation of + file paths within BEC. Therefore, we recommend devices to use the same mixin for creation of paths. + """ + + def pre_scan(self): + """Pre-scan method is called from BEC right before executing scancore, thus + right before the start of an acquisition. + + It can be used to trigger time critical functions from the device, which + are prone to run into timeouts in case called too early. + """ + + +@runtime_checkable +class BECPositionerProtocol(BECScanProtocol, Protocol): + """Protocol with functionality specific for positioners in BEC.""" + + @property + def limits(self) -> tuple[float, float]: + """Limits property for positioners. + For an EpicsMotor, BEC will automatically recover the limits from the IOC. + + If not set, it returns (0,0). + Note, low_limit = high_limit is equivalent to NO limits! + + Returns: + tuple: Tuple with lower and upper limits + """ + + @property + def low_limit(self) -> float: + """Low limit property for positioners. + + Returns: + float: Lower limit + """ + + @property + def high_limit(self) -> float: + """High limit property for positioners. + + Returns: + float: Upper limit + """ + + def check_value(self, value: float): + """Check whether value is within limits + + Args: + value: value to check + + Raises: + LimitError in case the requested motion is not inside of limits. + """ + + def move(self, position: float) -> DeviceStatus: + """Move method for positioners. + The returned DeviceStatus is marked as done once the positioner has reached the target position. + DeviceStatus.wait() can be used to block until the move is completed. + + Args: + position: position to move to + + Returns: + DeviceStatus: DeviceStatus object + """ + + def stop(self, success: bool) -> None: + """Stop method for positioners. + + Args: + success: Flag to indicate if the scan was successful or not. + """ + + +@runtime_checkable +class BECFlyerProtocol(BECScanProtocol, Protocol): + """Protocol with functionality specific for flyers in BEC.""" + + def configure(self, d: dict): + """Configure method of the flyer. + It is an optional method, but does not need to be implemented by a flyer. + Instead, stage can be used to prepare time critical operations on the device in preparation of a scan. + + Method to configure the flyer in preparation of a scan. + + Args: + d (dict): Dictionary with configuration parameters, i.e. key value pairs of signal_name : value + """ + + def kickoff(self) -> DeviceStatus: + """Kickoff method for flyers. + + The returned DeviceStatus is marked as done once the flyer start flying, + i.e. is ready to be triggered. + + Returns: + DeviceStatus: DeviceStatus object + """ + + def complete(self) -> DeviceStatus: + """Complete method for flyers. + + The returned DeviceStatus is marked as done once the flyer has completed. + + Returns: + DeviceStatus: DeviceStatus object + """ + + +@runtime_checkable +class BECRotationProtocol(Protocol): + """Protocol which defines functionality for a tomography stage for ophyd devices""" + + allow_mod360: Component + """Signal to define whether mod360 operations are allowed. """ + + @property + def has_mod360(self) -> bool: + """Property to check if the motor has mod360 option + + Returns: + bool: True if mod360 is possible on device, False otherwise + """ + + @property + def has_freerun(self) -> bool: + """Property to check if the motor has freerun option + + Returns: + bool: True if freerun is allowed, False otherwise + """ + + @property + def valid_rotation_modes(self) -> list[str]: + """Method to get the valid rotation modes for the implemented motor. + + Returns: + list: List of strings with valid rotation modes + """ + + def apply_mod360(self) -> None: + """Method to apply the modulus 360 operation on the specific device. + + Childrens should override this method + """ + + +@runtime_checkable +class BECEventProtocol(Protocol): + """Protocol for events in BEC. + + This is a first draft for the event protocol introduced throughout BEC. + It needs to be review and extended before it can be used in production. + """ + + _callbacks: dict[dict] + + @property + def event_types(self) -> tuple[str]: + """Event types property""" + + def _run_subs(self, sub_type: str, **kwargs): + """Run subscriptions for the event. + + Args: + sub_type: Subscription type + kwargs: Keyword arguments + """ + + def subscribe(self, callback: callable, event_type: str = None, run: bool = True): + """Subscribe to the event. + + Args: + callback (callable) : Callback function + The expected callback structure is: + def cb(*args, obj:OphydObject, sub_type:str, **kwargs) -> None: + pass + event_type (str) : Event type, if None it defaults to obj._default_sub + This maps to sub_type in _run_subs + run (bool) : If true, run the callback directly. + + Returns: + cid (int): Callback id + """ + + def clear_sub(self, cb: callable, event_type: str = None): + """Clear subscription, given the origianl callback fucntion + + Args: + cb (callable) : Callback + event_type (str): Event type, if None it will be remove from all event_types + """ + + def unsubscribe(self, cid: int): + """Unsubscribe from the event. + + Args: + cid (int): Callback id + """ diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/ophyd_rotation_base.py b/ophyd_devices/epics/devices/ophyd_base_devices/ophyd_rotation_base.py new file mode 100644 index 0000000..8e316ae --- /dev/null +++ b/ophyd_devices/epics/devices/ophyd_base_devices/ophyd_rotation_base.py @@ -0,0 +1,107 @@ +from abc import ABC, abstractmethod + +from bec_lib import bec_logger +from ophyd import Component as Cpt +from ophyd import EpicsMotor +from typeguard import typechecked + +from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import ( + BECRotationProtocol, +) +from ophyd_devices.utils.bec_utils import ConfigSignal + +logger = bec_logger.logger + + +class OphtyRotationBaseError(Exception): + """Exception specific for implmenetation of rotation stages.""" + + +class OphydRotationBase(BECRotationProtocol, ABC): + + allow_mod360 = Cpt(ConfigSignal, name="allow_mod360", value=False, kind="config") + + def __init__(self, *args, **kwargs): + """ + Base class to implement functionality specific for rotation devices. + + Childrens should override the instance attributes: + - has_mod360 + - has_freerun + - valid_rotation_modes + + """ + # pylint: disable=protected-access + self._has_mod360 = False + self._has_freerun = False + self._valid_rotation_modes = [] + if "allow_mod360" in kwargs: + if not isinstance(kwargs["allow_mod360"], bool): + raise ValueError("allow_mod360 must be a boolean") + self.allow_mod360.put(kwargs["allow_mod360"]) + super().__init__(*args, **kwargs) + + @abstractmethod + def apply_mod360(self) -> None: + """Method to apply the modulus 360 operation on the specific device. + + Childrens should override this method + """ + + @property + def has_mod360(self) -> bool: + """Property to check if the device has mod360 operation. + + ReadOnly property, childrens should override this method. + """ + return self._has_mod360 + + @property + def has_freerun(self) -> bool: + """Property to check if the device has freerun operation. + + ReadOnly property, childrens should override this method. + """ + return self._has_freerun + + @property + def valid_rotation_modes(self) -> list: + """Method to get the valid rotation modes for the specific device.""" + return self._valid_rotation_modes + + @typechecked + @valid_rotation_modes.setter + def valid_rotation_modes(self, value: list[str]): + """Method to set the valid rotation modes for the specific device.""" + self._valid_rotation_modes = value + return self._valid_rotation_modes + + +# pylint: disable=too-many-ancestors +class EpicsRotationBase(OphydRotationBase, EpicsMotor): + """Class for Epics rotation devices.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._has_freerun = True + self._has_freerun = True + self._valid_rotation_modes = ["target", "radiography"] + + def apply_mod360(self) -> None: + """Apply modulos 360 operation for EpicsMotorRecord. + + EpicsMotor has the function "set_current_position" which can be used for this purpose. + In addition, there is a check if mod360 is allowed and available. + """ + if self.has_mod360 and self.allow_mod360.get(): + cur_val = self.user_readback.get() + new_val = cur_val % 360 + try: + self.set_current_position(new_val) + except Exception as exc: + error_msg = f"Failed to set new position {new_val} from {cur_val} on device {self.name} with error {exc}" + raise OphtyRotationBaseError(error_msg) from exc + return + logger.info( + f"Did not apply mod360 for device {self.name} with has_mod={self.has_mod360} and allow_mod={self.allow_mod360.get()}" + ) diff --git a/ophyd_devices/epics/devices/ophyd_base_devices/tomcat_rotation_motors.py b/ophyd_devices/epics/devices/ophyd_base_devices/tomcat_rotation_motors.py new file mode 100644 index 0000000..86890cd --- /dev/null +++ b/ophyd_devices/epics/devices/ophyd_base_devices/tomcat_rotation_motors.py @@ -0,0 +1,181 @@ +""" Module for Tomcat rotation motors. + +The following classes implement the rotation motors for: + +- AerotechAutomation1 (Tomcat), based on EpicsMotorIOC. + +""" + +import threading +import time + +import numpy as np +from bec_lib import threadlocked +from ophyd import DeviceStatus + +from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import ( + BECFlyerProtocol, + BECScanProtocol, +) +from ophyd_devices.epics.devices.ophyd_base_devices.ophyd_rotation_base import ( + EpicsRotationBase, +) + + +class TomcatAerotechRotation(EpicsRotationBase, BECFlyerProtocol, BECScanProtocol): + """Special motor class that provides flyer interface and progress bar.""" + + SUB_PROGRESS = "progress" + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + **kwargs, + ): + """Implementation of the Tomcat AerotechAutomation 1 rotation motor class. + + This motor class is based on EpicsRotationBase and provides in addition the flyer interface for BEC + and a progress update. + """ + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) + self._start_position = None + self._target_position = None + self._stopped = False + self._rlock = threading.RLock() + self.subscribe(self._progress_update, run=False) + + # ------------------ alternative to using configure method --------------------- # + @property + def start_position(self) -> float: + """Get the start position.""" + return self._start_position + + @start_position.setter + def start_position(self, value: float) -> None: + """Set the start position.""" + self._start_position = value + + @property + def target_position(self) -> float: + """Get the start position.""" + return self._target_position + + @target_position.setter + def target_position(self, value: float) -> None: + """Set the start position.""" + self._target_position = value + + # ------------------ alternative to using configure method --------------------- # + + def configure(self, d: dict) -> dict: + """Configure method from the device. + + This method is usually used to set configuration parameters for the device. + + Args: + d (dict): Dictionary with configuration parameters. + + """ + if "target" in d: + self._target_position = d["target"] + del d["target"] + if "position" in d: + self._target_position = d["position"] + del d["position"] + return super().configure(d) + + def pre_scan(self): + """Perform pre-scan operation, e.g. move to start position.""" + if self._start_position: + self.move(self._start_position, wait=True) + + def kickoff(self) -> DeviceStatus: + """Kickoff the scan. + + The kickoff method should return a status object that is set to finish once the flyer flys, and is ready for the next actions. + I would consider the following implementation. + """ + self._start_position = float(self.position) + self.move(self._target_position, wait=False) + status = DeviceStatus(self) + status.set_finished() + return status + + def complete(self) -> DeviceStatus: + """Complete method of the scan. + + This will be called in a fly scan after the kickoff, thus, the stage will be moving to it's target position. + It should + + The complete method should return a status object that is set to finish once the flyer is done and the scan is complete. + I would consider the following implementation. + """ + threading.Thread(target=self._is_motor_moving, daemon=True).start() + status = DeviceStatus(self) + self.subscribe(status.set_finished, event_type=self.SUB_DONE, run=False) + return status + + def stage(self) -> list[object]: + """Stage the scan. + + We add here in addition the setting of the _stopped flag to False for the thread. + """ + self._stopped = False + return super().stage() + + def stop(self, success: bool = False) -> None: + """Stop the scan. + + If the device is stopped, the _stopped flag is set to True. + """ + self._stopped = True + super().stop(success=success) + + @threadlocked + def _is_motor_moving(self): + """Function to check if the motor is moving. + + This function is used in a thread to check if the motor is moving. + It resolves by running""" + while self.motor_done_move.get(): + if self._stopped: + self._done_moving(success=False) + return + time.sleep(0.1) + self._done_moving(success=True) + + def _progress_update(self, value, **kwargs) -> None: + """Progress update on the scan""" + if (self._start_position is None) or (self._target_position is None) or (not self.moving): + self._run_subs( + sub_type=self.SUB_PROGRESS, + value=1, + max_value=1, + done=1, + ) + return + + progress = np.abs( + (value - self._start_position) / (self._target_position - self._start_position) + ) + max_value = 100 + self._run_subs( + sub_type=self.SUB_PROGRESS, + value=int(100 * progress), + max_value=max_value, + done=int(np.isclose(max_value, progress, 1e-3)), + )