mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-07-10 02:38:04 +02:00
refactor: cleanup aerotech, fix packaging for release
This commit is contained in:
@ -1,40 +1,17 @@
|
|||||||
from ophyd import Device, Component, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind, DerivedSignal
|
|
||||||
from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus
|
|
||||||
from ophyd.flyers import FlyerInterface
|
|
||||||
from time import sleep
|
|
||||||
import warnings
|
|
||||||
import numpy as np
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
try:
|
|
||||||
from .AerotechAutomation1Enums import *
|
|
||||||
from .AerotechAutomation1Enums import (
|
|
||||||
DataCollectionMode,
|
|
||||||
DataCollectionFrequency,
|
|
||||||
AxisDataSignal,
|
|
||||||
PsoWindowInput,
|
|
||||||
DriveDataCaptureInput,
|
|
||||||
DriveDataCaptureTrigger,
|
|
||||||
TaskDataSignal,
|
|
||||||
SystemDataSignal,
|
|
||||||
TomcatSequencerState,
|
|
||||||
)
|
|
||||||
except:
|
|
||||||
from AerotechAutomation1Enums import *
|
|
||||||
from AerotechAutomation1Enums import (
|
|
||||||
DataCollectionMode,
|
|
||||||
DataCollectionFrequency,
|
|
||||||
AxisDataSignal,
|
|
||||||
PsoWindowInput,
|
|
||||||
DriveDataCaptureInput,
|
|
||||||
DriveDataCaptureTrigger,
|
|
||||||
TaskDataSignal,
|
|
||||||
SystemDataSignal,
|
|
||||||
TomcatSequencerState,
|
|
||||||
)
|
|
||||||
|
|
||||||
from typing import Union
|
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from ophyd import Component, Device, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind
|
||||||
|
from ophyd.status import DeviceStatus, Status, StatusBase, SubscriptionStatus
|
||||||
|
|
||||||
|
from ophyd_devices.epics.devices.aerotech.AerotechAutomation1Enums import (
|
||||||
|
DataCollectionFrequency,
|
||||||
|
DataCollectionMode,
|
||||||
|
DriveDataCaptureInput,
|
||||||
|
DriveDataCaptureTrigger,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class EpicsMotorX(EpicsMotor):
|
class EpicsMotorX(EpicsMotor):
|
||||||
@ -1148,32 +1125,32 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
|
|||||||
print("PSO kicked off")
|
print("PSO kicked off")
|
||||||
return status
|
return status
|
||||||
|
|
||||||
def complete(self) -> DeviceStatus:
|
# def complete(self) -> DeviceStatus:
|
||||||
"""Bluesky flyer interface"""
|
# """Bluesky flyer interface"""
|
||||||
# Array mode waits until the buffer is empty
|
# # Array mode waits until the buffer is empty
|
||||||
if hasattr(self, "_distanceValue") and isinstance(
|
# if hasattr(self, "_distanceValue") and isinstance(
|
||||||
self._distanceValue, (np.ndarray, list, tuple)
|
# self._distanceValue, (np.ndarray, list, tuple)
|
||||||
):
|
# ):
|
||||||
# Define wait until the busy flag goes down (excluding initial update)
|
# # Define wait until the busy flag goes down (excluding initial update)
|
||||||
timestamp_ = 0
|
# timestamp_ = 0
|
||||||
|
|
||||||
def notRunning(*args, old_value, value, timestamp, **kwargs):
|
# def notRunning(*args, old_value, value, timestamp, **kwargs):
|
||||||
nonlocal timestamp_
|
# nonlocal timestamp_
|
||||||
result = False if (timestamp_ == 0) else bool(int(value) & 0x1000)
|
# result = False if (timestamp_ == 0) else bool(int(value) & 0x1000)
|
||||||
print(f"Old {old_value}\tNew: {value}\tResult: {result}")
|
# print(f"Old {old_value}\tNew: {value}\tResult: {result}")
|
||||||
timestamp_ = timestamp
|
# timestamp_ = timestamp
|
||||||
return result
|
# return result
|
||||||
|
|
||||||
# Subscribe and wait for update
|
# # Subscribe and wait for update
|
||||||
# status = SubscriptionStatus(self.status, notRunning, settle_time=0.5)
|
# # status = SubscriptionStatus(self.status, notRunning, settle_time=0.5)
|
||||||
# Data capture can be stopped any time
|
# # Data capture can be stopped any time
|
||||||
status = DeviceStatus(self)
|
# status = DeviceStatus(self)
|
||||||
status.set_finished()
|
# status.set_finished()
|
||||||
else:
|
# else:
|
||||||
# In distance trigger mode there's no specific goal
|
# # In distance trigger mode there's no specific goal
|
||||||
status = DeviceStatus(self)
|
# status = DeviceStatus(self)
|
||||||
status.set_finished()
|
# status.set_finished()
|
||||||
return status
|
# return status
|
||||||
|
|
||||||
def describe_collect(self) -> OrderedDict:
|
def describe_collect(self) -> OrderedDict:
|
||||||
ret = OrderedDict()
|
ret = OrderedDict()
|
||||||
@ -1434,10 +1411,8 @@ class aa1AxisDriveDataCollection(Device):
|
|||||||
|
|
||||||
# Bluesky step scanning interface
|
# Bluesky step scanning interface
|
||||||
def stage(self, settle_time=0.1):
|
def stage(self, settle_time=0.1):
|
||||||
super().stage()
|
|
||||||
self._switch.set("Start", settle_time=0.5).wait()
|
self._switch.set("Start", settle_time=0.5).wait()
|
||||||
status = Status(timeout=0.1, settle_time=settle_time).set_finished()
|
return super().stage()
|
||||||
return status
|
|
||||||
|
|
||||||
def unstage(self, settle_time=0.1):
|
def unstage(self, settle_time=0.1):
|
||||||
self._switch.set("Stop", settle_time=settle_time).wait()
|
self._switch.set("Stop", settle_time=settle_time).wait()
|
||||||
|
@ -1,499 +0,0 @@
|
|||||||
""" This module provides a range of protocols that describe the expected interface for different types of devices.
|
|
||||||
|
|
||||||
The protocols below can be used as teamplates for functionality to be implemeted by different type of devices.
|
|
||||||
They further facilitate runtime checks on devices and provide a minimum set of properties required for a device to be loadable by BEC.
|
|
||||||
|
|
||||||
The protocols are:
|
|
||||||
- BECDeviceProtocol: Protocol for devices in BEC. All devices must at least implement this protocol.
|
|
||||||
- BECSignalProtocol: Protocol for signals.
|
|
||||||
- BECScanProtocol: Protocol for the scan interface.
|
|
||||||
- BECMixinProtocol: Protocol for utilities in particular relevant for detector implementations.
|
|
||||||
- BECPositionerProtocol: Protocol for positioners.
|
|
||||||
- BECFlyerProtocol: Protocol with for flyers.
|
|
||||||
|
|
||||||
Keep in mind, that a device of type flyer should generally also implement the BECScanProtocol that provides the required functionality for scans.
|
|
||||||
Flyers in addition, also implement the BECFlyerProtocol. Similarly, positioners should also implement the BECScanProtocol and BECPositionerProtocol.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
from typing import Protocol, runtime_checkable
|
|
||||||
|
|
||||||
from bec_lib.file_utils import FileWriterMixin
|
|
||||||
from ophyd import Component, DeviceStatus, Kind, Staged
|
|
||||||
|
|
||||||
from ophyd_devices.utils import bec_scaninfo_mixin
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECDeviceProtocol(Protocol):
|
|
||||||
"""Protocol for ophyd objects with zero functionality."""
|
|
||||||
|
|
||||||
_destroyed: bool
|
|
||||||
|
|
||||||
@property
|
|
||||||
def name(self) -> str:
|
|
||||||
"""name property"""
|
|
||||||
|
|
||||||
@name.setter
|
|
||||||
def name(self, value: str) -> None:
|
|
||||||
"""name setter"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def kind(self) -> Kind:
|
|
||||||
"""kind property"""
|
|
||||||
|
|
||||||
@kind.setter
|
|
||||||
def kind(self, value: Kind):
|
|
||||||
"""kind setter"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def parent(self):
|
|
||||||
"""Property to find the parent device"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def root(self):
|
|
||||||
"""Property to fint the root device"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def hints(self) -> dict:
|
|
||||||
"""hints property"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def connected(self) -> bool:
|
|
||||||
"""connected property.
|
|
||||||
Check if signals are connected
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: True if connected, False otherwise
|
|
||||||
"""
|
|
||||||
|
|
||||||
@connected.setter
|
|
||||||
def connected(self, value: bool):
|
|
||||||
"""connected setter"""
|
|
||||||
|
|
||||||
def read(self) -> dict:
|
|
||||||
"""read method
|
|
||||||
|
|
||||||
Override by child class with read method
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: Dictionary with nested dictionary of signals with kind.normal or kind.hinted:
|
|
||||||
{'signal_name' : {'value' : .., "timestamp" : ..}, ...}
|
|
||||||
"""
|
|
||||||
|
|
||||||
def read_configuration(self) -> dict:
|
|
||||||
"""read_configuration method
|
|
||||||
|
|
||||||
Override by child class with read_configuration method
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: Dictionary with nested dictionary of signals with kind.config:
|
|
||||||
{'signal_name' : {'value' : .., "timestamp" : ..}, ...}
|
|
||||||
"""
|
|
||||||
|
|
||||||
def describe(self) -> dict:
|
|
||||||
"""describe method
|
|
||||||
|
|
||||||
Override by child class with describe method
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape')
|
|
||||||
"""
|
|
||||||
|
|
||||||
def describe_configuration(self) -> dict:
|
|
||||||
"""describe method
|
|
||||||
|
|
||||||
Includes all signals of type Kind.config.
|
|
||||||
Override by child class with describe_configuration method
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: Dictionary with dictionaries with signal descriptions ('source', 'dtype', 'shape')
|
|
||||||
"""
|
|
||||||
|
|
||||||
def destroy(self) -> None:
|
|
||||||
"""Destroy method.
|
|
||||||
|
|
||||||
_destroyed must be set to True after calling destroy.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECSignalProtocol(Protocol):
|
|
||||||
"""Protocol for BEC signals with zero functionality.
|
|
||||||
|
|
||||||
This protocol adds the specific implementation for a signal.
|
|
||||||
Please be aware that a signal must also implement BECDeviceProtocol.
|
|
||||||
|
|
||||||
Note: Currently the implementation of the protocol is not taking into account the
|
|
||||||
event_model from ophyd, i.e. _run_sbus
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def limits(self) -> tuple[float, float]:
|
|
||||||
"""Limits property for signals.
|
|
||||||
If low_limit == high_limit, it is equivalent to NO limits!
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
tuple: Tuple with lower and upper limits
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def high_limit(self) -> float:
|
|
||||||
"""High limit property for signals.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: Upper limit
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def low_limit(self) -> float:
|
|
||||||
"""Low limit property for signals.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: Lower limit
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def write_access(self) -> bool:
|
|
||||||
"""Write access method for signals.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: True if write access is allowed, False otherwise
|
|
||||||
"""
|
|
||||||
|
|
||||||
def check_value(self, value: float):
|
|
||||||
"""Check whether value is within limits
|
|
||||||
|
|
||||||
Args:
|
|
||||||
value: value to check
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
LimitError in case the requested motion is not inside of limits.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def trigger(self) -> DeviceStatus:
|
|
||||||
"""Trigger method for signals.
|
|
||||||
This method can be used to trigger the signal readout.
|
|
||||||
For EpicsSignal, the readback value is typically set to auto_monitor=True,
|
|
||||||
which means their readback value is updated automatically from the IOC.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
DeviceStatus: DeviceStatus object
|
|
||||||
"""
|
|
||||||
|
|
||||||
def put(self, value: any, force: bool = False, timeout: float = None):
|
|
||||||
"""Put method for signals.
|
|
||||||
This method should resolve immediately and not block.
|
|
||||||
If not force, the method checks if the value is within limits using check_value.
|
|
||||||
|
|
||||||
|
|
||||||
Args:
|
|
||||||
value (any) : value to put
|
|
||||||
force (bool) : Flag to force the put and ignore limits
|
|
||||||
timeout (float) : Timeout for the put
|
|
||||||
"""
|
|
||||||
|
|
||||||
def set(self, value: any, timeout: float = None) -> DeviceStatus:
|
|
||||||
"""Set method for signals.
|
|
||||||
This method should be blocking until the set is completed.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
value (any) : value to set
|
|
||||||
timeout (float) : Timeout for the set
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
DeviceStatus : DeviceStatus object that will finish upon return
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECScanProtocol(BECDeviceProtocol, Protocol):
|
|
||||||
"""Protocol for devices offering an Protocol with all relevant functionality for scans.
|
|
||||||
|
|
||||||
In BEC, scans typically follow the order of stage, (pre_scan), trigger, unstage.
|
|
||||||
Stop should be used to interrupt a scan. Be aware that pre_scan is optional and therefor
|
|
||||||
part of the BECMixinProtocol, typically useful for more complex devices such as detectors.
|
|
||||||
|
|
||||||
This protocol allows to perform runtime checks on devices of ophyd.
|
|
||||||
It is the minimum set of properties required for a device to be loadable by BEC.
|
|
||||||
"""
|
|
||||||
|
|
||||||
_staged: Staged
|
|
||||||
"""Staged property to indicate if the device is staged."""
|
|
||||||
|
|
||||||
def stage(self) -> list[object]:
|
|
||||||
"""Stage method to prepare the device for an upcoming acquistion.
|
|
||||||
|
|
||||||
This prepares a device for an upcoming acquisition, i.e. it is the first
|
|
||||||
method for which the scan parameters are known and the device can be configured.
|
|
||||||
|
|
||||||
It can be used to move scan_motors to their start position
|
|
||||||
or also prepare DAQ systems for the upcoming measurement.
|
|
||||||
We can further publish the file location for DAQ systems
|
|
||||||
to BEC and inform BEC's file writer where data will be written to.
|
|
||||||
|
|
||||||
Stagin is not idempoent. If called twice without an unstage it should raise.
|
|
||||||
For ophyd devices, one may used self._staged = True to check if the device is staged.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: List of objects that were staged, i.e. [self]
|
|
||||||
For devices with inheritance from ophyd, return
|
|
||||||
return super().stage() in the child class.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def unstage(self) -> list[object]:
|
|
||||||
"""Unstage method to cleanup after the acquisition.
|
|
||||||
|
|
||||||
It can also be used to implement checks whether the acquisition was successful,
|
|
||||||
inform BEC that the file has been succesfully written, or raise upon receiving
|
|
||||||
feedback that the scan did not finish successful.
|
|
||||||
|
|
||||||
Unstaging is not idempotent. If called twice without a stage it should raise.
|
|
||||||
It is recommended to return super().unstage() in the child class, if
|
|
||||||
the child class also inherits from ophyd repository.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def stop(self, success: bool) -> None:
|
|
||||||
"""Stop method to stop the device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
success: Flag to indicate if the scan was successful or not.
|
|
||||||
|
|
||||||
This method should be called to stop the device. It is recommended to call
|
|
||||||
super().stop(success=success) if class inherits from ophyd repository.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def trigger(self) -> DeviceStatus:
|
|
||||||
"""Trigger method on the device
|
|
||||||
|
|
||||||
Returns ophyd DeviceStatus object, which is used to track the status of the trigger.
|
|
||||||
It can also be blocking until the trigger is completed, and return the status object
|
|
||||||
with set_finished() method called on the DeviceStatus.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECMixinProtocol(Protocol):
|
|
||||||
"""Protocol that offers BEC specific utility functionality for detectors."""
|
|
||||||
|
|
||||||
USER_ACCESS: list[str]
|
|
||||||
"""
|
|
||||||
List of methods/properties that will be exposed to the client interface in addition
|
|
||||||
to the the already exposed signals, methods and properties.
|
|
||||||
"""
|
|
||||||
|
|
||||||
scaninfo: bec_scaninfo_mixin
|
|
||||||
"""
|
|
||||||
BEC scan info mixin class that provides an transparent Protocol to scan parameter
|
|
||||||
as provided by BEC. It is recommended to use this Protocol to retrieve scaninfo from Redis.
|
|
||||||
"""
|
|
||||||
|
|
||||||
stopped: bool
|
|
||||||
"""
|
|
||||||
Flag to indicate if the device is stopped.
|
|
||||||
|
|
||||||
The stop method should set this flag to True, and i.e. stage to set it to False.
|
|
||||||
"""
|
|
||||||
|
|
||||||
filewriter: FileWriterMixin
|
|
||||||
"""
|
|
||||||
The file writer mixin main purpose is to unify and centralize the creation of
|
|
||||||
file paths within BEC. Therefore, we recommend devices to use the same mixin for creation of paths.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def pre_scan(self):
|
|
||||||
"""Pre-scan method is called from BEC right before executing scancore, thus
|
|
||||||
right before the start of an acquisition.
|
|
||||||
|
|
||||||
It can be used to trigger time critical functions from the device, which
|
|
||||||
are prone to run into timeouts in case called too early.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECPositionerProtocol(BECScanProtocol, Protocol):
|
|
||||||
"""Protocol with functionality specific for positioners in BEC."""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def limits(self) -> tuple[float, float]:
|
|
||||||
"""Limits property for positioners.
|
|
||||||
For an EpicsMotor, BEC will automatically recover the limits from the IOC.
|
|
||||||
|
|
||||||
If not set, it returns (0,0).
|
|
||||||
Note, low_limit = high_limit is equivalent to NO limits!
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
tuple: Tuple with lower and upper limits
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def low_limit(self) -> float:
|
|
||||||
"""Low limit property for positioners.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: Lower limit
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def high_limit(self) -> float:
|
|
||||||
"""High limit property for positioners.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: Upper limit
|
|
||||||
"""
|
|
||||||
|
|
||||||
def check_value(self, value: float):
|
|
||||||
"""Check whether value is within limits
|
|
||||||
|
|
||||||
Args:
|
|
||||||
value: value to check
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
LimitError in case the requested motion is not inside of limits.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def move(self, position: float) -> DeviceStatus:
|
|
||||||
"""Move method for positioners.
|
|
||||||
The returned DeviceStatus is marked as done once the positioner has reached the target position.
|
|
||||||
DeviceStatus.wait() can be used to block until the move is completed.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
position: position to move to
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
DeviceStatus: DeviceStatus object
|
|
||||||
"""
|
|
||||||
|
|
||||||
def stop(self, success: bool) -> None:
|
|
||||||
"""Stop method for positioners.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
success: Flag to indicate if the scan was successful or not.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECFlyerProtocol(BECScanProtocol, Protocol):
|
|
||||||
"""Protocol with functionality specific for flyers in BEC."""
|
|
||||||
|
|
||||||
def configure(self, d: dict):
|
|
||||||
"""Configure method of the flyer.
|
|
||||||
It is an optional method, but does not need to be implemented by a flyer.
|
|
||||||
Instead, stage can be used to prepare time critical operations on the device in preparation of a scan.
|
|
||||||
|
|
||||||
Method to configure the flyer in preparation of a scan.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
d (dict): Dictionary with configuration parameters, i.e. key value pairs of signal_name : value
|
|
||||||
"""
|
|
||||||
|
|
||||||
def kickoff(self) -> DeviceStatus:
|
|
||||||
"""Kickoff method for flyers.
|
|
||||||
|
|
||||||
The returned DeviceStatus is marked as done once the flyer start flying,
|
|
||||||
i.e. is ready to be triggered.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
DeviceStatus: DeviceStatus object
|
|
||||||
"""
|
|
||||||
|
|
||||||
def complete(self) -> DeviceStatus:
|
|
||||||
"""Complete method for flyers.
|
|
||||||
|
|
||||||
The returned DeviceStatus is marked as done once the flyer has completed.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
DeviceStatus: DeviceStatus object
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECRotationProtocol(Protocol):
|
|
||||||
"""Protocol which defines functionality for a tomography stage for ophyd devices"""
|
|
||||||
|
|
||||||
allow_mod360: Component
|
|
||||||
"""Signal to define whether mod360 operations are allowed. """
|
|
||||||
|
|
||||||
@property
|
|
||||||
def has_mod360(self) -> bool:
|
|
||||||
"""Property to check if the motor has mod360 option
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: True if mod360 is possible on device, False otherwise
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def has_freerun(self) -> bool:
|
|
||||||
"""Property to check if the motor has freerun option
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: True if freerun is allowed, False otherwise
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def valid_rotation_modes(self) -> list[str]:
|
|
||||||
"""Method to get the valid rotation modes for the implemented motor.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: List of strings with valid rotation modes
|
|
||||||
"""
|
|
||||||
|
|
||||||
def apply_mod360(self) -> None:
|
|
||||||
"""Method to apply the modulus 360 operation on the specific device.
|
|
||||||
|
|
||||||
Childrens should override this method
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
|
||||||
class BECEventProtocol(Protocol):
|
|
||||||
"""Protocol for events in BEC.
|
|
||||||
|
|
||||||
This is a first draft for the event protocol introduced throughout BEC.
|
|
||||||
It needs to be review and extended before it can be used in production.
|
|
||||||
"""
|
|
||||||
|
|
||||||
_callbacks: dict[dict]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def event_types(self) -> tuple[str]:
|
|
||||||
"""Event types property"""
|
|
||||||
|
|
||||||
def _run_subs(self, sub_type: str, **kwargs):
|
|
||||||
"""Run subscriptions for the event.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
sub_type: Subscription type
|
|
||||||
kwargs: Keyword arguments
|
|
||||||
"""
|
|
||||||
|
|
||||||
def subscribe(self, callback: callable, event_type: str = None, run: bool = True):
|
|
||||||
"""Subscribe to the event.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
callback (callable) : Callback function
|
|
||||||
The expected callback structure is:
|
|
||||||
def cb(*args, obj:OphydObject, sub_type:str, **kwargs) -> None:
|
|
||||||
pass
|
|
||||||
event_type (str) : Event type, if None it defaults to obj._default_sub
|
|
||||||
This maps to sub_type in _run_subs
|
|
||||||
run (bool) : If true, run the callback directly.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
cid (int): Callback id
|
|
||||||
"""
|
|
||||||
|
|
||||||
def clear_sub(self, cb: callable, event_type: str = None):
|
|
||||||
"""Clear subscription, given the origianl callback fucntion
|
|
||||||
|
|
||||||
Args:
|
|
||||||
cb (callable) : Callback
|
|
||||||
event_type (str): Event type, if None it will be remove from all event_types
|
|
||||||
"""
|
|
||||||
|
|
||||||
def unsubscribe(self, cid: int):
|
|
||||||
"""Unsubscribe from the event.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
cid (int): Callback id
|
|
||||||
"""
|
|
@ -1,107 +0,0 @@
|
|||||||
from abc import ABC, abstractmethod
|
|
||||||
|
|
||||||
from bec_lib import bec_logger
|
|
||||||
from ophyd import Component as Cpt
|
|
||||||
from ophyd import EpicsMotor
|
|
||||||
from typeguard import typechecked
|
|
||||||
|
|
||||||
from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import (
|
|
||||||
BECRotationProtocol,
|
|
||||||
)
|
|
||||||
from ophyd_devices.utils.bec_utils import ConfigSignal
|
|
||||||
|
|
||||||
logger = bec_logger.logger
|
|
||||||
|
|
||||||
|
|
||||||
class OphtyRotationBaseError(Exception):
|
|
||||||
"""Exception specific for implmenetation of rotation stages."""
|
|
||||||
|
|
||||||
|
|
||||||
class OphydRotationBase(BECRotationProtocol, ABC):
|
|
||||||
|
|
||||||
allow_mod360 = Cpt(ConfigSignal, name="allow_mod360", value=False, kind="config")
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
"""
|
|
||||||
Base class to implement functionality specific for rotation devices.
|
|
||||||
|
|
||||||
Childrens should override the instance attributes:
|
|
||||||
- has_mod360
|
|
||||||
- has_freerun
|
|
||||||
- valid_rotation_modes
|
|
||||||
|
|
||||||
"""
|
|
||||||
# pylint: disable=protected-access
|
|
||||||
self._has_mod360 = False
|
|
||||||
self._has_freerun = False
|
|
||||||
self._valid_rotation_modes = []
|
|
||||||
if "allow_mod360" in kwargs:
|
|
||||||
if not isinstance(kwargs["allow_mod360"], bool):
|
|
||||||
raise ValueError("allow_mod360 must be a boolean")
|
|
||||||
self.allow_mod360.put(kwargs["allow_mod360"])
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def apply_mod360(self) -> None:
|
|
||||||
"""Method to apply the modulus 360 operation on the specific device.
|
|
||||||
|
|
||||||
Childrens should override this method
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def has_mod360(self) -> bool:
|
|
||||||
"""Property to check if the device has mod360 operation.
|
|
||||||
|
|
||||||
ReadOnly property, childrens should override this method.
|
|
||||||
"""
|
|
||||||
return self._has_mod360
|
|
||||||
|
|
||||||
@property
|
|
||||||
def has_freerun(self) -> bool:
|
|
||||||
"""Property to check if the device has freerun operation.
|
|
||||||
|
|
||||||
ReadOnly property, childrens should override this method.
|
|
||||||
"""
|
|
||||||
return self._has_freerun
|
|
||||||
|
|
||||||
@property
|
|
||||||
def valid_rotation_modes(self) -> list:
|
|
||||||
"""Method to get the valid rotation modes for the specific device."""
|
|
||||||
return self._valid_rotation_modes
|
|
||||||
|
|
||||||
@typechecked
|
|
||||||
@valid_rotation_modes.setter
|
|
||||||
def valid_rotation_modes(self, value: list[str]):
|
|
||||||
"""Method to set the valid rotation modes for the specific device."""
|
|
||||||
self._valid_rotation_modes = value
|
|
||||||
return self._valid_rotation_modes
|
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=too-many-ancestors
|
|
||||||
class EpicsRotationBase(OphydRotationBase, EpicsMotor):
|
|
||||||
"""Class for Epics rotation devices."""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self._has_freerun = True
|
|
||||||
self._has_freerun = True
|
|
||||||
self._valid_rotation_modes = ["target", "radiography"]
|
|
||||||
|
|
||||||
def apply_mod360(self) -> None:
|
|
||||||
"""Apply modulos 360 operation for EpicsMotorRecord.
|
|
||||||
|
|
||||||
EpicsMotor has the function "set_current_position" which can be used for this purpose.
|
|
||||||
In addition, there is a check if mod360 is allowed and available.
|
|
||||||
"""
|
|
||||||
if self.has_mod360 and self.allow_mod360.get():
|
|
||||||
cur_val = self.user_readback.get()
|
|
||||||
new_val = cur_val % 360
|
|
||||||
try:
|
|
||||||
self.set_current_position(new_val)
|
|
||||||
except Exception as exc:
|
|
||||||
error_msg = f"Failed to set new position {new_val} from {cur_val} on device {self.name} with error {exc}"
|
|
||||||
raise OphtyRotationBaseError(error_msg) from exc
|
|
||||||
return
|
|
||||||
logger.info(
|
|
||||||
f"Did not apply mod360 for device {self.name} with has_mod={self.has_mod360} and allow_mod={self.allow_mod360.get()}"
|
|
||||||
)
|
|
@ -1,181 +0,0 @@
|
|||||||
""" Module for Tomcat rotation motors.
|
|
||||||
|
|
||||||
The following classes implement the rotation motors for:
|
|
||||||
|
|
||||||
- AerotechAutomation1 (Tomcat), based on EpicsMotorIOC.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
from bec_lib import threadlocked
|
|
||||||
from ophyd import DeviceStatus
|
|
||||||
|
|
||||||
from ophyd_devices.epics.devices.ophyd_base_devices.bec_protocols import (
|
|
||||||
BECFlyerProtocol,
|
|
||||||
BECScanProtocol,
|
|
||||||
)
|
|
||||||
from ophyd_devices.epics.devices.ophyd_base_devices.ophyd_rotation_base import (
|
|
||||||
EpicsRotationBase,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TomcatAerotechRotation(EpicsRotationBase, BECFlyerProtocol, BECScanProtocol):
|
|
||||||
"""Special motor class that provides flyer interface and progress bar."""
|
|
||||||
|
|
||||||
SUB_PROGRESS = "progress"
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
prefix="",
|
|
||||||
*,
|
|
||||||
name,
|
|
||||||
kind=None,
|
|
||||||
read_attrs=None,
|
|
||||||
configuration_attrs=None,
|
|
||||||
parent=None,
|
|
||||||
**kwargs,
|
|
||||||
):
|
|
||||||
"""Implementation of the Tomcat AerotechAutomation 1 rotation motor class.
|
|
||||||
|
|
||||||
This motor class is based on EpicsRotationBase and provides in addition the flyer interface for BEC
|
|
||||||
and a progress update.
|
|
||||||
"""
|
|
||||||
super().__init__(
|
|
||||||
prefix=prefix,
|
|
||||||
name=name,
|
|
||||||
kind=kind,
|
|
||||||
read_attrs=read_attrs,
|
|
||||||
configuration_attrs=configuration_attrs,
|
|
||||||
parent=parent,
|
|
||||||
**kwargs,
|
|
||||||
)
|
|
||||||
self._start_position = None
|
|
||||||
self._target_position = None
|
|
||||||
self._stopped = False
|
|
||||||
self._rlock = threading.RLock()
|
|
||||||
self.subscribe(self._progress_update, run=False)
|
|
||||||
|
|
||||||
# ------------------ alternative to using configure method --------------------- #
|
|
||||||
@property
|
|
||||||
def start_position(self) -> float:
|
|
||||||
"""Get the start position."""
|
|
||||||
return self._start_position
|
|
||||||
|
|
||||||
@start_position.setter
|
|
||||||
def start_position(self, value: float) -> None:
|
|
||||||
"""Set the start position."""
|
|
||||||
self._start_position = value
|
|
||||||
|
|
||||||
@property
|
|
||||||
def target_position(self) -> float:
|
|
||||||
"""Get the start position."""
|
|
||||||
return self._target_position
|
|
||||||
|
|
||||||
@target_position.setter
|
|
||||||
def target_position(self, value: float) -> None:
|
|
||||||
"""Set the start position."""
|
|
||||||
self._target_position = value
|
|
||||||
|
|
||||||
# ------------------ alternative to using configure method --------------------- #
|
|
||||||
|
|
||||||
def configure(self, d: dict) -> dict:
|
|
||||||
"""Configure method from the device.
|
|
||||||
|
|
||||||
This method is usually used to set configuration parameters for the device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
d (dict): Dictionary with configuration parameters.
|
|
||||||
|
|
||||||
"""
|
|
||||||
if "target" in d:
|
|
||||||
self._target_position = d["target"]
|
|
||||||
del d["target"]
|
|
||||||
if "position" in d:
|
|
||||||
self._target_position = d["position"]
|
|
||||||
del d["position"]
|
|
||||||
return super().configure(d)
|
|
||||||
|
|
||||||
def pre_scan(self):
|
|
||||||
"""Perform pre-scan operation, e.g. move to start position."""
|
|
||||||
if self._start_position:
|
|
||||||
self.move(self._start_position, wait=True)
|
|
||||||
|
|
||||||
def kickoff(self) -> DeviceStatus:
|
|
||||||
"""Kickoff the scan.
|
|
||||||
|
|
||||||
The kickoff method should return a status object that is set to finish once the flyer flys, and is ready for the next actions.
|
|
||||||
I would consider the following implementation.
|
|
||||||
"""
|
|
||||||
self._start_position = float(self.position)
|
|
||||||
self.move(self._target_position, wait=False)
|
|
||||||
status = DeviceStatus(self)
|
|
||||||
status.set_finished()
|
|
||||||
return status
|
|
||||||
|
|
||||||
def complete(self) -> DeviceStatus:
|
|
||||||
"""Complete method of the scan.
|
|
||||||
|
|
||||||
This will be called in a fly scan after the kickoff, thus, the stage will be moving to it's target position.
|
|
||||||
It should
|
|
||||||
|
|
||||||
The complete method should return a status object that is set to finish once the flyer is done and the scan is complete.
|
|
||||||
I would consider the following implementation.
|
|
||||||
"""
|
|
||||||
threading.Thread(target=self._is_motor_moving, daemon=True).start()
|
|
||||||
status = DeviceStatus(self)
|
|
||||||
self.subscribe(status.set_finished, event_type=self.SUB_DONE, run=False)
|
|
||||||
return status
|
|
||||||
|
|
||||||
def stage(self) -> list[object]:
|
|
||||||
"""Stage the scan.
|
|
||||||
|
|
||||||
We add here in addition the setting of the _stopped flag to False for the thread.
|
|
||||||
"""
|
|
||||||
self._stopped = False
|
|
||||||
return super().stage()
|
|
||||||
|
|
||||||
def stop(self, success: bool = False) -> None:
|
|
||||||
"""Stop the scan.
|
|
||||||
|
|
||||||
If the device is stopped, the _stopped flag is set to True.
|
|
||||||
"""
|
|
||||||
self._stopped = True
|
|
||||||
super().stop(success=success)
|
|
||||||
|
|
||||||
@threadlocked
|
|
||||||
def _is_motor_moving(self):
|
|
||||||
"""Function to check if the motor is moving.
|
|
||||||
|
|
||||||
This function is used in a thread to check if the motor is moving.
|
|
||||||
It resolves by running"""
|
|
||||||
while self.motor_done_move.get():
|
|
||||||
if self._stopped:
|
|
||||||
self._done_moving(success=False)
|
|
||||||
return
|
|
||||||
time.sleep(0.1)
|
|
||||||
self._done_moving(success=True)
|
|
||||||
|
|
||||||
def _progress_update(self, value, **kwargs) -> None:
|
|
||||||
"""Progress update on the scan"""
|
|
||||||
if (self._start_position is None) or (self._target_position is None) or (not self.moving):
|
|
||||||
self._run_subs(
|
|
||||||
sub_type=self.SUB_PROGRESS,
|
|
||||||
value=1,
|
|
||||||
max_value=1,
|
|
||||||
done=1,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
progress = np.abs(
|
|
||||||
(value - self._start_position) / (self._target_position - self._start_position)
|
|
||||||
)
|
|
||||||
max_value = 100
|
|
||||||
self._run_subs(
|
|
||||||
sub_type=self.SUB_PROGRESS,
|
|
||||||
value=int(100 * progress),
|
|
||||||
max_value=max_value,
|
|
||||||
done=int(np.isclose(max_value, progress, 1e-3)),
|
|
||||||
)
|
|
Reference in New Issue
Block a user