feat: add pseudo motor

This commit is contained in:
2026-03-12 12:47:14 +01:00
committed by Christian Appel
parent 2c17e75e3f
commit fdc91e4aff
3 changed files with 517 additions and 0 deletions
+1
View File
@@ -20,6 +20,7 @@ from .devices.psi_motor import EpicsMotor, EpicsMotorEC, EpicsUserMotorVME
from .devices.softpositioner import SoftPositioner
from .interfaces.base_classes.psi_device_base import PSIDeviceBase
from .utils.bec_device_base import BECDeviceBase
from .utils.bec_processed_signal import *
from .utils.bec_signals import *
from .utils.dynamic_pseudo import ComputedSignal
from .utils.psi_device_base_utils import *
+276
View File
@@ -0,0 +1,276 @@
"""Module for virtual slit center implementation."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ophyd_devices.interfaces.base_classes.psi_pseudo_motor_base import PSIPseudoMotorBase
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import DeviceManagerBase
from ophyd import PositionerBase, Signal
class VirtualSlitCenter(PSIPseudoMotorBase):
"""
Virtual slit center implementation. It expects the left and right slit positioner names to be passed
as arguments. The named devices must be positioners and available in the device_manager. In addition,
it must have a readback (user_readback), setpoint (user_setpoint) and motor_is_moving signal.
Args:
name (str): The name of the virtual slit center.
left_slit (str): The name of the left slit positioner in the device manager.
right_slit (str): The name of the right slit positioner in the device manager.
device_manager (DeviceManagerBase): The device manager to use for connecting to the positioners.
"""
def __init__(
self,
name: str,
left_slit: str,
right_slit: str,
device_manager: DeviceManagerBase,
offset: float = 0,
**kwargs,
):
positioners = self.get_positioner_objects(
name=name,
positioners={"left": left_slit, "right": right_slit},
device_manager=device_manager,
)
self._offset = offset
super().__init__(
name=name, device_manager=device_manager, positioners=positioners, **kwargs
)
def _get_pos_motor(self, motor: PositionerBase) -> float:
"""
Helper method to get the position of a motor.
Args:
motor (PositionerBase): The motor to get the position of.
"""
return motor.read()[motor.name]["value"]
# pylint: disable=arguments-differ
def forward_calculation(self, left: Signal, right: Signal) -> float:
"""
Forward calculation to compute the value for the pseudo motor readback
and setpoint based on the position of the left and right slit.
Args:
left (Signal): The left slit positioner signal.
right (Signal): The right slit positioner signal.
Returns:
float: The center position of the slit.
"""
left_pos = left.get()
right_pos = right.get()
center = (left_pos + right_pos) / 2 + self._offset
return float(center)
def inverse_calculation(self, position: float, left: Signal, right: Signal) -> dict[str, float]:
"""
Inverse calculation to compute the position of the left and right slit based on the center position.
Args:
center (float): The center position of the slit.
left (Signal): The left slit positioner signal.
right (Signal): The right slit positioner signal.
Returns:
dict[str, float]: The positions of the left and right slit.
"""
position_with_offset = position - self._offset
# To access position, run read on the root (PositionerBase) of the signal
left_pos = self._get_pos_motor(left.root)
right_pos = self._get_pos_motor(right.root)
width = right_pos - left_pos
new_left_pos = position_with_offset - width / 2
new_right_pos = position_with_offset + width / 2
return {"left": new_left_pos, "right": new_right_pos}
def motors_are_moving(self, left: Signal, right: Signal) -> int:
"""
Calculate whether the motors are moving based on the motor_is_moving signal of the left and right slit.
Args:
left (Signal): The left slit positioner signal.
right (Signal): The right slit positioner signal.
Returns:
int: 1 if either motor is moving, 0 otherwise.
"""
left_moving = left.get()
right_moving = right.get()
return int(left_moving or right_moving)
class VirtualSlitWidth(PSIPseudoMotorBase):
def __init__(
self,
name: str,
left_slit: str,
right_slit: str,
device_manager: DeviceManagerBase,
**kwargs,
):
positioners = self.get_positioner_objects(
name=name,
positioners={"left": left_slit, "right": right_slit},
device_manager=device_manager,
)
super().__init__(
name=name, device_manager=device_manager, positioners=positioners, **kwargs
)
def _get_pos_motor(self, motor: PositionerBase) -> float:
"""
Helper method to get the position of a motor.
Args:
motor (PositionerBase): The motor to get the position of.
"""
return motor.read()[motor.name]["value"]
# pylint: disable=arguments-differ
def forward_calculation(self, left: Signal, right: Signal) -> float:
"""
Forward calculation to compute the value for the pseudo motor readback
and setpoint based on the position of the left and right slit.
Args:
left (Signal): The left slit positioner signal.
right (Signal): The right slit positioner signal.
Returns:
float: The center position of the slit.
"""
left_pos = left.get()
right_pos = right.get()
width = right_pos - left_pos
return float(width)
def inverse_calculation(self, position: float, left: Signal, right: Signal) -> dict[str, float]:
"""
Inverse calculation to compute the position of the left and right slit based on the center position.
Args:
position (float): The center position of the slit.
Returns:
dict[str, float]: The positions of the left and right slit.
"""
left_pos = self._get_pos_motor(left.root)
right_pos = self._get_pos_motor(right.root)
center = (left_pos + right_pos) / 2
width = right_pos - left_pos
new_right_pos = center + width / 2
new_left_pos = center - width / 2
return {"left": new_left_pos, "right": new_right_pos}
def motors_are_moving(self, left: Signal, right: Signal) -> int:
"""
Calculate whether the motors are moving based on the motor_is_moving signal of the left and right slit.
Args:
left (Signal): The left slit positioner signal.
right (Signal): The right slit positioner signal.
Returns:
int: 1 if either motor is moving, 0 otherwise.
"""
left_moving = left.get()
right_moving = right.get()
return int(left_moving or right_moving)
if __name__ == "__main__": # pragma: no cover
from ophyd import Component as Cpt
from ophyd_devices.sim.sim_positioner import SimPositioner
class TestPseudoMotor(PSIPseudoMotorBase):
motor_a = Cpt(SimPositioner, name="motor_a")
motor_b = Cpt(SimPositioner, name="motor_b")
def __init__(self, name: str, device_manager: DeviceManagerBase, **kwargs):
super().__init__(name=name, device_manager=device_manager, **kwargs)
positioners = {"a": self.motor_a, "b": self.motor_b}
self.set_positioner_objects(positioners)
def _get_pos_motor(self, motor: PositionerBase) -> float:
return motor.readback.get()
def forward_calculation(self, a: Signal, b: Signal) -> float:
return float(a.get() + b.get())
def inverse_calculation(self, value: float, a: Signal, b: Signal) -> dict[str, float]:
a_val = self._get_pos_motor(a.root)
b_val = value - a_val
return {"a": a_val, "b": b_val}
def motors_are_moving(self, a: Signal, b: Signal) -> int:
return int(a.get() or b.get())
import time
from bec_server.device_server.tests.utils import DMMock
dm = DMMock()
samx = SimPositioner(name="samx")
samx.velocity.set(0.5)
samy = SimPositioner(name="samy")
samy.velocity.set(0.5)
dm.devices._add_device("samx", samx)
dm.devices._add_device("samy", samy)
# Fix the current session config to include the needs for the slit center device
setattr(
dm, "current_session", {"devices": [{"name": "slit_center", "needs": ["samx", "samy"]}]}
)
slit_center = VirtualSlitCenter(
name="slit_center", device_manager=dm, left_slit="samx", right_slit="samy"
)
test = TestPseudoMotor(name="test_pseudo", device_manager=dm)
for dev in [samx, samy, slit_center, test]:
dev.wait_for_connection()
test.motor_a.move(5).wait()
test.motor_b.move(-5).wait()
print(test.read())
test.move(2).wait()
print(test.read())
samx.tolerance.put(0.01)
samy.tolerance.put(0.01)
samx.move(1).wait()
samy.move(3).wait()
samx.velocity.set(1)
samy.velocity.set(1)
print(slit_center.read())
print(samx.read())
print(samy.read())
st = slit_center.move(3)
start_time = time.time()
while not st.done:
if (time.time() - start_time) > 10:
break
time.sleep(0.3)
print(f"Running move, status done: {st.done}")
print("Move completed or timed out.")
print(slit_center.read())
print(samx.read())
print(samy.read())
print("Done!!")
@@ -0,0 +1,240 @@
""" """
from __future__ import annotations
import inspect
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable
from ophyd import Component as Cpt
from ophyd import Kind, PositionerBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal, ProcessedSignalModel
from ophyd_devices.utils.psi_device_base_utils import AndStatus, StatusBase
if TYPE_CHECKING: # pragma: no cover
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
class PSIPseudoMotorBase(ABC, PSIDeviceBase, PositionerBase):
"""
Base class for PSI pseudo motors.
Args:
name (str): The name of the pseudo motor.
device_manager (DeviceManagerDS): The device manager to use for connecting to the positioners.
positioners (dict[str, str]): A dictionary mapping positioner names to device names in the device manager.
Keys of this dictionary must match the arguments of the forward_calculation,
inverse_calculation and motors_are_moving methods. The values must be the names
of the devices in the device manager that correspond to the positioners.
"""
readback = Cpt(BECProcessedSignal, name="readback", model={}, kind=Kind.hinted)
setpoint = Cpt(BECProcessedSignal, name="setpoint", model={}, kind=Kind.normal)
motor_is_moving = Cpt(BECProcessedSignal, name="motor_is_moving", model={}, kind=Kind.omitted)
def __init__(
self,
name: str,
device_manager: DeviceManagerDS,
positioners: dict[str, PositionerBase] | None = None,
egu: str = "",
**kwargs,
):
self.positioner_objects = positioners or {}
self._positioner_move_kwargs: dict[str, dict[str, Any]] = {}
self._egu = egu
super().__init__(name=name, device_manager=device_manager, **kwargs)
@property
def egu(self) -> str:
"""Engineering units for the pseudo motor. This can be set during initialization or by setting the egu attribute."""
return self._egu
def set_positioner_objects(self, positioners: dict[str, PositionerBase]) -> None:
"""
Method to set the positioner objects after initialization. This can be used if the positioner objects are not available at the time of initialization.
Args:
positioners (dict[str, PositionerBase]): A dictionary mapping positioner names to positioner objects.
"""
self.positioner_objects = positioners
def wait_for_connection(self, *args, **kwargs) -> None:
"""Connect to relevant positioners, setup processed signals."""
if not self.positioner_objects:
raise ConnectionError(
f"No positioners specified for pseudo motor {self.name}. Please use 'set_positioner_objects' or pass positioner objects during initialization."
)
# Check if all methods have the required signature that matches the positioner_objects keys
self._check_method_signatures()
self._setup_pseudo_signal(
"readback", ["readback", "user_readback"], self.forward_calculation, return_type=float
)
self._setup_pseudo_signal(
"setpoint", ["setpoint", "user_setpoint"], self.forward_calculation, return_type=float
)
self._setup_pseudo_signal(
"motor_is_moving", ["motor_is_moving"], self.motors_are_moving, return_type=int
)
return super().wait_for_connection(*args, **kwargs)
def _check_method_signatures(self) -> None:
"""
Method to check that the forward_calculation, inverse_calculation and motors_are_moving methods
have the required signature that matches the positioner_objects keys.
"""
input_names = set(self.positioner_objects.keys())
for method in [self.forward_calculation, self.inverse_calculation, self.motors_are_moving]:
signature = inspect.signature(method)
method_param_names = set(signature.parameters.keys())
for param in input_names:
if param not in method_param_names:
raise TypeError(
f"Method '{method.__name__}' has parameter '{param}' that does not match any of the positioner names specified in positioner_objects: {method_param_names}."
)
def _setup_pseudo_signal(
self,
pseudo_attr: str,
allowed_attributes: list[str],
compute_method: Callable[..., float],
return_type: type,
):
"""
Setup a pseudo signal with the given compute method and return type. The compute method will be called with
the values of the positioners as arguments. The allowed_attributes are used to determine which signal of the
positioner to use as input for the compute method. The first attribute that is found in the positioner will be used.
Args:
pseudo_attr (str): The attribute of the pseudo motor to set the model for.
allowed_attributes (list[str]): The attributes of the positioner to look for as input for the compute method.
compute_method (Callable[..., float]): The method to compute the value of the pseudo signal.
return_type (type): The return type of the compute method.
"""
device_objects = {}
dotted_names = {}
pseudo_attr_obj: BECProcessedSignal = getattr(self, pseudo_attr)
for name, positioner in self.positioner_objects.items():
obj = None
device_name = positioner.name
for attr in allowed_attributes:
if hasattr(positioner, attr):
obj = getattr(positioner, attr)
break
if obj is None:
raise AttributeError(
f"Positioner '{name}' does not have any of the allowed attributes: {allowed_attributes}."
)
dotted_names[name] = f"{device_name}.{obj.name}"
device_objects[name] = obj
model = ProcessedSignalModel(
devices=dotted_names, compute_method=compute_method, return_type=return_type
)
pseudo_attr_obj.set_device_object(device_objects)
pseudo_attr_obj.set_model(model)
pseudo_attr_obj.wait_for_connection()
def get_positioner_objects(
self, name: str, positioners: dict[str, str], device_manager: DeviceManagerDS
) -> dict[str, PositionerBase]:
"""
Helper method to get the positioner objects from the device manager based on a positioner dictionary.
Args:
name (str): The name of the pseudo motor device to look for in the device manager config.
positioners (dict[str, str]): A dictionary mapping positioner names to device names in the device manager.
device_manager (DeviceManagerDS): The device manager to use for connecting to the positioners.
Returns:
dict[str, PositionerBase]: A dictionary mapping positioner names to positioner objects.
"""
positioner_objs = {}
# First we check that the device config of this device specifies
# the relevant positioners as needs
config = self._find_device_config_in_session(name, device_manager)
needs = config.get("needs", [])
for name, device_name in positioners.items():
if device_name not in needs:
raise ConnectionError(
f"Device '{name}' requires positioner device '{device_name}' to be specified in list of 'needs' in the device config."
)
try:
device = device_manager.devices[device_name]
except KeyError:
raise ConnectionError(f"Device '{device_name}' not found in device manager.")
if not hasattr(device, "move"):
raise AttributeError(f"Device '{device_name}' does not have a 'move' method.")
required_attrs = [
("readback", "user_readback"),
("setpoint", "user_setpoint"),
("motor_is_moving",),
]
if not all(
any([hasattr(device, attr) for attr in attr_tuple]) for attr_tuple in required_attrs
):
raise AttributeError(
f"Device '{device_name}' must have at least one argument for each tuple in the following list of tuples: {required_attrs}."
)
positioner_objs[name] = device
move_signature = inspect.signature(device.move)
if "wait" in move_signature.parameters:
self._positioner_move_kwargs[name] = {"wait": False}
return positioner_objs
def _find_device_config_in_session(
self, device_name: str, device_manager: DeviceManagerDS
) -> dict[str, Any]:
"""
Helper method to find the device config for a given device name in the current session config of the device manager.
"""
configs = device_manager.current_session["devices"]
config = None
for conf in configs:
if conf["name"] == device_name:
config = conf
break
if config is None:
raise ConnectionError(f"Device '{device_name}' not found in current session config.")
return config
@abstractmethod
def forward_calculation(self, *args) -> float:
"""Calculate the pseudo motor value based on the positioner values."""
@abstractmethod
def inverse_calculation(self, position: float, **positioner_objects) -> dict[str, float]:
"""Calculate the positioner values based on the pseudo motor value."""
@abstractmethod
def motors_are_moving(self, *args) -> int:
"""Calculate whether the motors are moving based on the positioner values. Should be 0 or 1."""
# pylint: disable=arguments-differ
def move(self, position: float, **kwargs) -> StatusBase:
"""
Move method for the pseudo motor. This currently only supports moving all positioners
at once. If children want to implement more complex move logic, they can override this method
based on this implementation and the inverse_calculation method.
Args:
position (float): The position to move the pseudo motor to.
kwargs: The kwargs to pass to the move method of the positioners.
"""
self.check_value(position)
status = None
motor_positions = self.inverse_calculation(position, **self.positioner_objects)
for name, pos in motor_positions.items():
positioner = self.positioner_objects[name]
move_kwargs = self._positioner_move_kwargs.get(name, {})
move_kwargs.update(kwargs)
st = positioner.move(pos, **move_kwargs)
if status is None:
status = st
else:
status = AndStatus(status, st) # Combine Status objects
return status