From f9dea63b1b2b89195d8812c0c211f687c43f2dc4 Mon Sep 17 00:00:00 2001 From: David Perl Date: Mon, 23 Jun 2025 15:44:46 +0200 Subject: [PATCH] wip feat: make simple positioner with config signals --- ophyd_devices/devices/test_pv_pos.py | 9 ++ .../base_classes/psi_positioner_base.py | 133 ++++++++++++------ tests/positioner_test_ioc.py | 19 +++ tests/test_psi_positioner.py | 16 +++ 4 files changed, 134 insertions(+), 43 deletions(-) create mode 100644 ophyd_devices/devices/test_pv_pos.py create mode 100644 tests/positioner_test_ioc.py create mode 100644 tests/test_psi_positioner.py diff --git a/ophyd_devices/devices/test_pv_pos.py b/ophyd_devices/devices/test_pv_pos.py new file mode 100644 index 0000000..248bbb3 --- /dev/null +++ b/ophyd_devices/devices/test_pv_pos.py @@ -0,0 +1,9 @@ +from ophyd import EpicsSignal, PVPositioner +from ophyd.device import Component as Cpt + + +class PvTestPositioner(PVPositioner): + setpoint = Cpt(EpicsSignal, suffix="SET") + readback = Cpt(EpicsSignal, suffix="RBV") + stop_signal = Cpt(EpicsSignal, suffix="STOP") + done = Cpt(EpicsSignal, suffix="DONE") diff --git a/ophyd_devices/interfaces/base_classes/psi_positioner_base.py b/ophyd_devices/interfaces/base_classes/psi_positioner_base.py index cb3818c..a0cea0f 100644 --- a/ophyd_devices/interfaces/base_classes/psi_positioner_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_positioner_base.py @@ -1,11 +1,17 @@ from abc import ABC +from typing import NotRequired, TypedDict -from ophyd.device import Device +from ophyd import Component as Cpt +from ophyd import Device from ophyd.positioner import PositionerBase from ophyd.signal import EpicsSignalBase -_OPTIONAL_SIGNAL = object() -_REQUIRED_SIGNAL = object() + +class _SignalSentinel(object): ... + + +_OPTIONAL_SIGNAL = _SignalSentinel() +_REQUIRED_SIGNAL = _SignalSentinel() _SIGNAL_NOT_AVAILABLE = "Signal not available" @@ -18,37 +24,92 @@ class RequiredSignalNotSpecified(PSIPositionerException): ... class OptionalSignalNotSpecified(PSIPositionerException): ... -_SIGNAL_NAMES = { - "user_readback", - "user_setpoint", - "user_offset", - "user_offset_dir", - "offset_freeze_switch", - "set_use_switch", - "velocity", - "acceleration", - "motor_egu", - "motor_is_moving", - "motor_done_move", - "high_limit_switch", - "low_limit_switch", - "high_limit_travel", - "low_limit_travel", - "direction_of_travel", - "motor_stop", - "home_forward", - "home_reverse", - "tolerated_alarm", -} +class SimplePositionerSignals(TypedDict): + user_readback: NotRequired[str] + user_setpoint: NotRequired[str] + motor_done_move: NotRequired[str] + velocity: NotRequired[str] + motor_stop: NotRequired[str] -class PSIPositionerBase(ABC, Device, PositionerBase): +_SIMPLE_SIGNAL_NAMES = SimplePositionerSignals.__optional_keys__ + + +class PositionerSignals(SimplePositionerSignals): + user_offset: NotRequired[str] + user_offset_dir: NotRequired[str] + offset_freeze_switch: NotRequired[str] + set_use_switch: NotRequired[str] + acceleration: NotRequired[str] + motor_egu: NotRequired[str] + motor_is_moving: NotRequired[str] + high_limit_switch: NotRequired[str] + low_limit_switch: NotRequired[str] + high_limit_travel: NotRequired[str] + low_limit_travel: NotRequired[str] + direction_of_travel: NotRequired[str] + home_forward: NotRequired[str] + home_reverse: NotRequired[str] + tolerated_alarm: NotRequired[str] + + +_SIGNAL_NAMES = PositionerSignals.__optional_keys__ + + +class PSISimplePositionerBase(ABC, Device, PositionerBase): + """Base class for simple positioners.""" + + SIGNAL_NAMES = _SIMPLE_SIGNAL_NAMES + + user_readback: EpicsSignalBase = _REQUIRED_SIGNAL + user_setpoint: EpicsSignalBase = _OPTIONAL_SIGNAL + velocity: EpicsSignalBase = _OPTIONAL_SIGNAL + motor_stop: EpicsSignalBase = _OPTIONAL_SIGNAL + motor_done_move: EpicsSignalBase = _OPTIONAL_SIGNAL + + def __init__( + self, + prefix="", + *, + name, + read_attrs=None, + configuration_attrs=None, + parent=None, + override_suffixes: SimplePositionerSignals = {}, + **kwargs, + ): + super().__init__( + prefix=prefix, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + name=name, + parent=parent, + **kwargs, + ) + + if (missing := self._remaining_defaults(_REQUIRED_SIGNAL)) != set(): + raise RequiredSignalNotSpecified(f"Signal(s) {missing} must be defined in a subclass") + + not_implemented = self._remaining_defaults(_OPTIONAL_SIGNAL) + for signal_name, pv_suffix in override_suffixes.items(): + if signal_name not in not_implemented: + component: Cpt[EpicsSignalBase] = getattr(self.__class__, signal_name) + signal: EpicsSignalBase = getattr(self, signal_name) + signal.__init__(prefix + pv_suffix, **component.kwargs) + + # Make the default alias for the user_readback the name of the motor itself + # for compatibility with EpicsMotor + self.user_readback.name = self.name + + def _remaining_defaults(self, attr: _SignalSentinel) -> set[str]: + return set(filter(lambda s: getattr(self, s) is attr, self.SIGNAL_NAMES)) + + +class PSIPositionerBase(PSISimplePositionerBase): """Base class for positioners which are similar to a motor but do not implement all the required signals for an EpicsMotor or have different PV suffices.""" - # position - user_readback = _REQUIRED_SIGNAL - user_setpoint = _REQUIRED_SIGNAL + SIGNAL_NAMES = _SIGNAL_NAMES # calibration dial <-> user user_offset = _OPTIONAL_SIGNAL @@ -57,13 +118,11 @@ class PSIPositionerBase(ABC, Device, PositionerBase): set_use_switch = _OPTIONAL_SIGNAL # configuration - velocity = _OPTIONAL_SIGNAL acceleration = _OPTIONAL_SIGNAL motor_egu = _OPTIONAL_SIGNAL # motor status motor_is_moving = _OPTIONAL_SIGNAL - motor_done_move = _OPTIONAL_SIGNAL high_limit_switch = _OPTIONAL_SIGNAL low_limit_switch = _OPTIONAL_SIGNAL high_limit_travel = _OPTIONAL_SIGNAL @@ -71,7 +130,6 @@ class PSIPositionerBase(ABC, Device, PositionerBase): direction_of_travel = _OPTIONAL_SIGNAL # commands - motor_stop = _OPTIONAL_SIGNAL home_forward = _OPTIONAL_SIGNAL home_reverse = _OPTIONAL_SIGNAL @@ -102,14 +160,3 @@ class PSIPositionerBase(ABC, Device, PositionerBase): connection_timeout=connection_timeout, **kwargs, ) - - not_implemented = { - signal for signal in _SIGNAL_NAMES if getattr(self, signal) is _REQUIRED_SIGNAL - } - if not_implemented != set(): - raise RequiredSignalNotSpecified( - f"Signal(s) {not_implemented} must be defined in a subclass" - ) - # Make the default alias for the user_readback the name of the motor itself - # for compatibility with EpicsMotor - self.user_readback.name = self.name diff --git a/tests/positioner_test_ioc.py b/tests/positioner_test_ioc.py new file mode 100644 index 0000000..f4d0f6d --- /dev/null +++ b/tests/positioner_test_ioc.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +from textwrap import dedent + +from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run + + +class PositionerIOCTest(PVGroup): + """""" + + SETPOINT = pvproperty(value=2.0, doc="A float") + READBACK = pvproperty(value=2.0, doc="A float") + + +if __name__ == "__main__": + ioc_options, run_options = ioc_arg_parser( + default_prefix="SIM:", desc=dedent(PositionerIOCTest.__doc__) + ) + ioc = PositionerIOCTest(**ioc_options) + run(ioc.pvdb, **run_options) diff --git a/tests/test_psi_positioner.py b/tests/test_psi_positioner.py new file mode 100644 index 0000000..5c9817a --- /dev/null +++ b/tests/test_psi_positioner.py @@ -0,0 +1,16 @@ +import caproto.ioc_examples.simple +from ophyd import Component as Cpt +from ophyd import EpicsSignal + +from ophyd_devices.interfaces.base_classes.psi_positioner_base import PSISimplePositionerBase + + +class PvTestPositioner(PSISimplePositionerBase): + user_readback = Cpt(EpicsSignal, suffix="READ") + + +def test_simple_positioner_init(): + device = PvTestPositioner("SIM:", name="test", override_suffixes={"user_readback": "READBACK"}) + device.user_readback.wait_for_connection() + reading = device.user_readback.read() + assert reading