feat: add protocols and rotation base device

This commit is contained in:
appel_c 2024-03-27 20:42:33 +01:00
parent 2c43559aa8
commit ddd0b790f8
4 changed files with 787 additions and 0 deletions

View File

@ -0,0 +1,499 @@
""" This module provides a range of protocols that describe the expected interface for different types of devices.
The protocols below can be used as teamplates for functionality to be implemeted by different type of devices.
They further facilitate runtime checks on devices and provide a minimum set of properties required for a device to be loadable by BEC.
The protocols are:
- BECDeviceProtocol: Protocol for devices in BEC. All devices must at least implement this protocol.
- BECSignalProtocol: Protocol for signals.
- BECScanProtocol: Protocol for the scan interface.
- BECMixinProtocol: Protocol for utilities in particular relevant for detector implementations.
- BECPositionerProtocol: Protocol for positioners.
- BECFlyerProtocol: Protocol with for flyers.
Keep in mind, that a device of type flyer should generally also implement the BECScanProtocol that provides the required functionality for scans.
Flyers in addition, also implement the BECFlyerProtocol. Similarly, positioners should also implement the BECScanProtocol and BECPositionerProtocol.
"""
from typing import Protocol, runtime_checkable
from bec_lib.file_utils import FileWriterMixin
from ophyd import Component, DeviceStatus, Kind, Staged
from ophyd_devices.utils import bec_scaninfo_mixin
@runtime_checkable
class BECDeviceProtocol(Protocol):
"""Protocol for ophyd objects with zero functionality."""
_destroyed: bool
@property
def name(self) -> str:
"""name property"""
@name.setter
def name(self, value: str) -> None:
"""name setter"""
@property
def kind(self) -> Kind:
"""kind property"""
@kind.setter
def kind(self, value: Kind):
"""kind setter"""
@property
def parent(self):
"""Property to find the parent device"""
@property
def root(self):
"""Property to fint the root device"""
@property
def hints(self) -> dict:
"""hints property"""
@property
def connected(self) -> bool:
"""connected property.
Check if signals are connected
Returns:
bool: True if connected, False otherwise
"""
@connected.setter
def connected(self, value: bool):
"""connected setter"""
def read(self) -> dict:
"""read method
Override by child class with read method
Returns:
dict: Dictionary with nested dictionary of signals with kind.normal or kind.hinted:
{'signal_name' : {'value' : .., "timestamp" : ..}, ...}
"""
def read_configuration(self) -> dict:
"""read_configuration method
Override by child class with read_configuration method
Returns:
dict: Dictionary with nested dictionary of signals with kind.config:
{'signal_name' : {'value' : .., "timestamp" : ..}, ...}
"""
def describe(self) -> dict:
"""describe method
Override by child class with describe method
Returns:
dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape')
"""
def describe_configuration(self) -> dict:
"""describe method
Includes all signals of type Kind.config.
Override by child class with describe_configuration method
Returns:
dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape')
"""
def destroy(self) -> None:
"""Destroy method.
_destroyed must be set to True after calling destroy.
"""
@runtime_checkable
class BECSignalProtocol(Protocol):
"""Protocol for BEC signals with zero functionality.
This protocol adds the specific implementation for a signal.
Please be aware that a signal must also implement BECDeviceProtocol.
Note: Currently the implementation of the protocol is not taking into account the
event_model from ophyd, i.e. _run_sbus
"""
@property
def limits(self) -> tuple[float, float]:
"""Limits property for signals.
If low_limit == high_limit, it is equivalent to NO limits!
Returns:
tuple: Tuple with lower and upper limits
"""
@property
def high_limit(self) -> float:
"""High limit property for signals.
Returns:
float: Upper limit
"""
@property
def low_limit(self) -> float:
"""Low limit property for signals.
Returns:
float: Lower limit
"""
@property
def write_access(self) -> bool:
"""Write access method for signals.
Returns:
bool: True if write access is allowed, False otherwise
"""
def check_value(self, value: float):
"""Check whether value is within limits
Args:
value: value to check
Raises:
LimitError in case the requested motion is not inside of limits.
"""
def trigger(self) -> DeviceStatus:
"""Trigger method for signals.
This method can be used to trigger the signal readout.
For EpicsSignal, the readback value is typically set to auto_monitor=True,
which means their readback value is updated automatically from the IOC.
Returns:
DeviceStatus: DeviceStatus object
"""
def put(self, value: any, force: bool = False, timeout: float = None):
"""Put method for signals.
This method should resolve immediately and not block.
If not force, the method checks if the value is within limits using check_value.
Args:
value (any) : value to put
force (bool) : Flag to force the put and ignore limits
timeout (float) : Timeout for the put
"""
def set(self, value: any, timeout: float = None) -> DeviceStatus:
"""Set method for signals.
This method should be blocking until the set is completed.
Args:
value (any) : value to set
timeout (float) : Timeout for the set
Returns:
DeviceStatus : DeviceStatus object that will finish upon return
"""
@runtime_checkable
class BECScanProtocol(BECDeviceProtocol, Protocol):
"""Protocol for devices offering an Protocol with all relevant functionality for scans.
In BEC, scans typically follow the order of stage, (pre_scan), trigger, unstage.
Stop should be used to interrupt a scan. Be aware that pre_scan is optional and therefor
part of the BECMixinProtocol, typically useful for more complex devices such as detectors.
This protocol allows to perform runtime checks on devices of ophyd.
It is the minimum set of properties required for a device to be loadable by BEC.
"""
_staged: Staged
"""Staged property to indicate if the device is staged."""
def stage(self) -> list[object]:
"""Stage method to prepare the device for an upcoming acquistion.
This prepares a device for an upcoming acquisition, i.e. it is the first
method for which the scan parameters are known and the device can be configured.
It can be used to move scan_motors to their start position
or also prepare DAQ systems for the upcoming measurement.
We can further publish the file location for DAQ systems
to BEC and inform BEC's file writer where data will be written to.
Stagin is not idempoent. If called twice without an unstage it should raise.
For ophyd devices, one may used self._staged = True to check if the device is staged.
Returns:
list: List of objects that were staged, i.e. [self]
For devices with inheritance from ophyd, return
return super().stage() in the child class.
"""
def unstage(self) -> list[object]:
"""Unstage method to cleanup after the acquisition.
It can also be used to implement checks whether the acquisition was successful,
inform BEC that the file has been succesfully written, or raise upon receiving
feedback that the scan did not finish successful.
Unstaging is not idempotent. If called twice without a stage it should raise.
It is recommended to return super().unstage() in the child class, if
the child class also inherits from ophyd repository.
"""
def stop(self, success: bool) -> None:
"""Stop method to stop the device.
Args:
success: Flag to indicate if the scan was successful or not.
This method should be called to stop the device. It is recommended to call
super().stop(success=success) if class inherits from ophyd repository.
"""
def trigger(self) -> DeviceStatus:
"""Trigger method on the device
Returns ophyd DeviceStatus object, which is used to track the status of the trigger.
It can also be blocking until the trigger is completed, and return the status object
with set_finished() method called on the DeviceStatus.
"""
@runtime_checkable
class BECMixinProtocol(Protocol):
"""Protocol that offers BEC specific utility functionality for detectors."""
USER_ACCESS: list[str]
"""
List of methods/properties that will be exposed to the client interface in addition
to the the already exposed signals, methods and properties.
"""
scaninfo: bec_scaninfo_mixin
"""
BEC scan info mixin class that provides an transparent Protocol to scan parameter
as provided by BEC. It is recommended to use this Protocol to retrieve scaninfo from Redis.
"""
stopped: bool
"""
Flag to indicate if the device is stopped.
The stop method should set this flag to True, and i.e. stage to set it to False.
"""
filewriter: FileWriterMixin
"""
The file writer mixin main purpose is to unify and centralize the creation of
file paths within BEC. Therefore, we recommend devices to use the same mixin for creation of paths.
"""
def pre_scan(self):
"""Pre-scan method is called from BEC right before executing scancore, thus
right before the start of an acquisition.
It can be used to trigger time critical functions from the device, which
are prone to run into timeouts in case called too early.
"""
@runtime_checkable
class BECPositionerProtocol(BECScanProtocol, Protocol):
"""Protocol with functionality specific for positioners in BEC."""
@property
def limits(self) -> tuple[float, float]:
"""Limits property for positioners.
For an EpicsMotor, BEC will automatically recover the limits from the IOC.
If not set, it returns (0,0).
Note, low_limit = high_limit is equivalent to NO limits!
Returns:
tuple: Tuple with lower and upper limits
"""
@property
def low_limit(self) -> float:
"""Low limit property for positioners.
Returns:
float: Lower limit
"""
@property
def high_limit(self) -> float:
"""High limit property for positioners.
Returns:
float: Upper limit
"""
def check_value(self, value: float):
"""Check whether value is within limits
Args:
value: value to check
Raises:
LimitError in case the requested motion is not inside of limits.
"""
def move(self, position: float) -> DeviceStatus:
"""Move method for positioners.
The returned DeviceStatus is marked as done once the positioner has reached the target position.
DeviceStatus.wait() can be used to block until the move is completed.
Args:
position: position to move to
Returns:
DeviceStatus: DeviceStatus object
"""
def stop(self, success: bool) -> None:
"""Stop method for positioners.
Args:
success: Flag to indicate if the scan was successful or not.
"""
@runtime_checkable
class BECFlyerProtocol(BECScanProtocol, Protocol):
"""Protocol with functionality specific for flyers in BEC."""
def configure(self, d: dict):
"""Configure method of the flyer.
It is an optional method, but does not need to be implemented by a flyer.
Instead, stage can be used to prepare time critical operations on the device in preparation of a scan.
Method to configure the flyer in preparation of a scan.
Args:
d (dict): Dictionary with configuration parameters, i.e. key value pairs of signal_name : value
"""
def kickoff(self) -> DeviceStatus:
"""Kickoff method for flyers.
The returned DeviceStatus is marked as done once the flyer start flying,
i.e. is ready to be triggered.
Returns:
DeviceStatus: DeviceStatus object
"""
def complete(self) -> DeviceStatus:
"""Complete method for flyers.
The returned DeviceStatus is marked as done once the flyer has completed.
Returns:
DeviceStatus: DeviceStatus object
"""
@runtime_checkable
class BECRotationProtocol(Protocol):
"""Protocol which defines functionality for a tomography stage for ophyd devices"""
allow_mod360: Component
"""Signal to define whether mod360 operations are allowed. """
@property
def has_mod360(self) -> bool:
"""Property to check if the motor has mod360 option
Returns:
bool: True if mod360 is possible on device, False otherwise
"""
@property
def has_freerun(self) -> bool:
"""Property to check if the motor has freerun option
Returns:
bool: True if freerun is allowed, False otherwise
"""
@property
def valid_rotation_modes(self) -> list[str]:
"""Method to get the valid rotation modes for the implemented motor.
Returns:
list: List of strings with valid rotation modes
"""
def apply_mod360(self) -> None:
"""Method to apply the modulus 360 operation on the specific device.
Childrens should override this method
"""
@runtime_checkable
class BECEventProtocol(Protocol):
"""Protocol for events in BEC.
This is a first draft for the event protocol introduced throughout BEC.
It needs to be review and extended before it can be used in production.
"""
_callbacks: dict[dict]
@property
def event_types(self) -> tuple[str]:
"""Event types property"""
def _run_subs(self, sub_type: str, **kwargs):
"""Run subscriptions for the event.
Args:
sub_type: Subscription type
kwargs: Keyword arguments
"""
def subscribe(self, callback: callable, event_type: str = None, run: bool = True):
"""Subscribe to the event.
Args:
callback (callable) : Callback function
The expected callback structure is:
def cb(*args, obj:OphydObject, sub_type:str, **kwargs) -> None:
pass
event_type (str) : Event type, if None it defaults to obj._default_sub
This maps to sub_type in _run_subs
run (bool) : If true, run the callback directly.
Returns:
cid (int): Callback id
"""
def clear_sub(self, cb: callable, event_type: str = None):
"""Clear subscription, given the origianl callback fucntion
Args:
cb (callable) : Callback
event_type (str): Event type, if None it will be remove from all event_types
"""
def unsubscribe(self, cid: int):
"""Unsubscribe from the event.
Args:
cid (int): Callback id
"""

View File

@ -0,0 +1,107 @@
from abc import ABC, abstractmethod
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsMotor
from typeguard import typechecked
from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import (
BECRotationProtocol,
)
from ophyd_devices.utils.bec_utils import ConfigSignal
logger = bec_logger.logger
class OphtyRotationBaseError(Exception):
"""Exception specific for implmenetation of rotation stages."""
class OphydRotationBase(BECRotationProtocol, ABC):
allow_mod360 = Cpt(ConfigSignal, name="allow_mod360", value=False, kind="config")
def __init__(self, *args, **kwargs):
"""
Base class to implement functionality specific for rotation devices.
Childrens should override the instance attributes:
- has_mod360
- has_freerun
- valid_rotation_modes
"""
# pylint: disable=protected-access
self._has_mod360 = False
self._has_freerun = False
self._valid_rotation_modes = []
if "allow_mod360" in kwargs:
if not isinstance(kwargs["allow_mod360"], bool):
raise ValueError("allow_mod360 must be a boolean")
self.allow_mod360.put(kwargs["allow_mod360"])
super().__init__(*args, **kwargs)
@abstractmethod
def apply_mod360(self) -> None:
"""Method to apply the modulus 360 operation on the specific device.
Childrens should override this method
"""
@property
def has_mod360(self) -> bool:
"""Property to check if the device has mod360 operation.
ReadOnly property, childrens should override this method.
"""
return self._has_mod360
@property
def has_freerun(self) -> bool:
"""Property to check if the device has freerun operation.
ReadOnly property, childrens should override this method.
"""
return self._has_freerun
@property
def valid_rotation_modes(self) -> list:
"""Method to get the valid rotation modes for the specific device."""
return self._valid_rotation_modes
@typechecked
@valid_rotation_modes.setter
def valid_rotation_modes(self, value: list[str]):
"""Method to set the valid rotation modes for the specific device."""
self._valid_rotation_modes = value
return self._valid_rotation_modes
# pylint: disable=too-many-ancestors
class EpicsRotationBase(OphydRotationBase, EpicsMotor):
"""Class for Epics rotation devices."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._has_freerun = True
self._has_freerun = True
self._valid_rotation_modes = ["target", "radiography"]
def apply_mod360(self) -> None:
"""Apply modulos 360 operation for EpicsMotorRecord.
EpicsMotor has the function "set_current_position" which can be used for this purpose.
In addition, there is a check if mod360 is allowed and available.
"""
if self.has_mod360 and self.allow_mod360.get():
cur_val = self.user_readback.get()
new_val = cur_val % 360
try:
self.set_current_position(new_val)
except Exception as exc:
error_msg = f"Failed to set new position {new_val} from {cur_val} on device {self.name} with error {exc}"
raise OphtyRotationBaseError(error_msg) from exc
return
logger.info(
f"Did not apply mod360 for device {self.name} with has_mod={self.has_mod360} and allow_mod={self.allow_mod360.get()}"
)

View File

@ -0,0 +1,181 @@
""" Module for Tomcat rotation motors.
The following classes implement the rotation motors for:
- AerotechAutomation1 (Tomcat), based on EpicsMotorIOC.
"""
import threading
import time
import numpy as np
from bec_lib import threadlocked
from ophyd import DeviceStatus
from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import (
BECFlyerProtocol,
BECScanProtocol,
)
from ophyd_devices.epics.devices.ophyd_base_devices.ophyd_rotation_base import (
EpicsRotationBase,
)
class TomcatAerotechRotation(EpicsRotationBase, BECFlyerProtocol, BECScanProtocol):
"""Special motor class that provides flyer interface and progress bar."""
SUB_PROGRESS = "progress"
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
**kwargs,
):
"""Implementation of the Tomcat AerotechAutomation 1 rotation motor class.
This motor class is based on EpicsRotationBase and provides in addition the flyer interface for BEC
and a progress update.
"""
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self._start_position = None
self._target_position = None
self._stopped = False
self._rlock = threading.RLock()
self.subscribe(self._progress_update, run=False)
# ------------------ alternative to using configure method --------------------- #
@property
def start_position(self) -> float:
"""Get the start position."""
return self._start_position
@start_position.setter
def start_position(self, value: float) -> None:
"""Set the start position."""
self._start_position = value
@property
def target_position(self) -> float:
"""Get the start position."""
return self._target_position
@target_position.setter
def target_position(self, value: float) -> None:
"""Set the start position."""
self._target_position = value
# ------------------ alternative to using configure method --------------------- #
def configure(self, d: dict) -> dict:
"""Configure method from the device.
This method is usually used to set configuration parameters for the device.
Args:
d (dict): Dictionary with configuration parameters.
"""
if "target" in d:
self._target_position = d["target"]
del d["target"]
if "position" in d:
self._target_position = d["position"]
del d["position"]
return super().configure(d)
def pre_scan(self):
"""Perform pre-scan operation, e.g. move to start position."""
if self._start_position:
self.move(self._start_position, wait=True)
def kickoff(self) -> DeviceStatus:
"""Kickoff the scan.
The kickoff method should return a status object that is set to finish once the flyer flys, and is ready for the next actions.
I would consider the following implementation.
"""
self._start_position = float(self.position)
self.move(self._target_position, wait=False)
status = DeviceStatus(self)
status.set_finished()
return status
def complete(self) -> DeviceStatus:
"""Complete method of the scan.
This will be called in a fly scan after the kickoff, thus, the stage will be moving to it's target position.
It should
The complete method should return a status object that is set to finish once the flyer is done and the scan is complete.
I would consider the following implementation.
"""
threading.Thread(target=self._is_motor_moving, daemon=True).start()
status = DeviceStatus(self)
self.subscribe(status.set_finished, event_type=self.SUB_DONE, run=False)
return status
def stage(self) -> list[object]:
"""Stage the scan.
We add here in addition the setting of the _stopped flag to False for the thread.
"""
self._stopped = False
return super().stage()
def stop(self, success: bool = False) -> None:
"""Stop the scan.
If the device is stopped, the _stopped flag is set to True.
"""
self._stopped = True
super().stop(success=success)
@threadlocked
def _is_motor_moving(self):
"""Function to check if the motor is moving.
This function is used in a thread to check if the motor is moving.
It resolves by running"""
while self.motor_done_move.get():
if self._stopped:
self._done_moving(success=False)
return
time.sleep(0.1)
self._done_moving(success=True)
def _progress_update(self, value, **kwargs) -> None:
"""Progress update on the scan"""
if (self._start_position is None) or (self._target_position is None) or (not self.moving):
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=1,
max_value=1,
done=1,
)
return
progress = np.abs(
(value - self._start_position) / (self._target_position - self._start_position)
)
max_value = 100
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=int(100 * progress),
max_value=max_value,
done=int(np.isclose(max_value, progress, 1e-3)),
)