mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-23 19:27:59 +02:00
wip feat: make simple positioner with config signals
This commit is contained in:
9
ophyd_devices/devices/test_pv_pos.py
Normal file
9
ophyd_devices/devices/test_pv_pos.py
Normal file
@ -0,0 +1,9 @@
|
||||
from ophyd import EpicsSignal, PVPositioner
|
||||
from ophyd.device import Component as Cpt
|
||||
|
||||
|
||||
class PvTestPositioner(PVPositioner):
|
||||
setpoint = Cpt(EpicsSignal, suffix="SET")
|
||||
readback = Cpt(EpicsSignal, suffix="RBV")
|
||||
stop_signal = Cpt(EpicsSignal, suffix="STOP")
|
||||
done = Cpt(EpicsSignal, suffix="DONE")
|
@ -1,11 +1,17 @@
|
||||
from abc import ABC
|
||||
from typing import NotRequired, TypedDict
|
||||
|
||||
from ophyd.device import Device
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device
|
||||
from ophyd.positioner import PositionerBase
|
||||
from ophyd.signal import EpicsSignalBase
|
||||
|
||||
_OPTIONAL_SIGNAL = object()
|
||||
_REQUIRED_SIGNAL = object()
|
||||
|
||||
class _SignalSentinel(object): ...
|
||||
|
||||
|
||||
_OPTIONAL_SIGNAL = _SignalSentinel()
|
||||
_REQUIRED_SIGNAL = _SignalSentinel()
|
||||
_SIGNAL_NOT_AVAILABLE = "Signal not available"
|
||||
|
||||
|
||||
@ -18,37 +24,92 @@ class RequiredSignalNotSpecified(PSIPositionerException): ...
|
||||
class OptionalSignalNotSpecified(PSIPositionerException): ...
|
||||
|
||||
|
||||
_SIGNAL_NAMES = {
|
||||
"user_readback",
|
||||
"user_setpoint",
|
||||
"user_offset",
|
||||
"user_offset_dir",
|
||||
"offset_freeze_switch",
|
||||
"set_use_switch",
|
||||
"velocity",
|
||||
"acceleration",
|
||||
"motor_egu",
|
||||
"motor_is_moving",
|
||||
"motor_done_move",
|
||||
"high_limit_switch",
|
||||
"low_limit_switch",
|
||||
"high_limit_travel",
|
||||
"low_limit_travel",
|
||||
"direction_of_travel",
|
||||
"motor_stop",
|
||||
"home_forward",
|
||||
"home_reverse",
|
||||
"tolerated_alarm",
|
||||
}
|
||||
class SimplePositionerSignals(TypedDict):
|
||||
user_readback: NotRequired[str]
|
||||
user_setpoint: NotRequired[str]
|
||||
motor_done_move: NotRequired[str]
|
||||
velocity: NotRequired[str]
|
||||
motor_stop: NotRequired[str]
|
||||
|
||||
|
||||
class PSIPositionerBase(ABC, Device, PositionerBase):
|
||||
_SIMPLE_SIGNAL_NAMES = SimplePositionerSignals.__optional_keys__
|
||||
|
||||
|
||||
class PositionerSignals(SimplePositionerSignals):
|
||||
user_offset: NotRequired[str]
|
||||
user_offset_dir: NotRequired[str]
|
||||
offset_freeze_switch: NotRequired[str]
|
||||
set_use_switch: NotRequired[str]
|
||||
acceleration: NotRequired[str]
|
||||
motor_egu: NotRequired[str]
|
||||
motor_is_moving: NotRequired[str]
|
||||
high_limit_switch: NotRequired[str]
|
||||
low_limit_switch: NotRequired[str]
|
||||
high_limit_travel: NotRequired[str]
|
||||
low_limit_travel: NotRequired[str]
|
||||
direction_of_travel: NotRequired[str]
|
||||
home_forward: NotRequired[str]
|
||||
home_reverse: NotRequired[str]
|
||||
tolerated_alarm: NotRequired[str]
|
||||
|
||||
|
||||
_SIGNAL_NAMES = PositionerSignals.__optional_keys__
|
||||
|
||||
|
||||
class PSISimplePositionerBase(ABC, Device, PositionerBase):
|
||||
"""Base class for simple positioners."""
|
||||
|
||||
SIGNAL_NAMES = _SIMPLE_SIGNAL_NAMES
|
||||
|
||||
user_readback: EpicsSignalBase = _REQUIRED_SIGNAL
|
||||
user_setpoint: EpicsSignalBase = _OPTIONAL_SIGNAL
|
||||
velocity: EpicsSignalBase = _OPTIONAL_SIGNAL
|
||||
motor_stop: EpicsSignalBase = _OPTIONAL_SIGNAL
|
||||
motor_done_move: EpicsSignalBase = _OPTIONAL_SIGNAL
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name,
|
||||
read_attrs=None,
|
||||
configuration_attrs=None,
|
||||
parent=None,
|
||||
override_suffixes: SimplePositionerSignals = {},
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(
|
||||
prefix=prefix,
|
||||
read_attrs=read_attrs,
|
||||
configuration_attrs=configuration_attrs,
|
||||
name=name,
|
||||
parent=parent,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
if (missing := self._remaining_defaults(_REQUIRED_SIGNAL)) != set():
|
||||
raise RequiredSignalNotSpecified(f"Signal(s) {missing} must be defined in a subclass")
|
||||
|
||||
not_implemented = self._remaining_defaults(_OPTIONAL_SIGNAL)
|
||||
for signal_name, pv_suffix in override_suffixes.items():
|
||||
if signal_name not in not_implemented:
|
||||
component: Cpt[EpicsSignalBase] = getattr(self.__class__, signal_name)
|
||||
signal: EpicsSignalBase = getattr(self, signal_name)
|
||||
signal.__init__(prefix + pv_suffix, **component.kwargs)
|
||||
|
||||
# Make the default alias for the user_readback the name of the motor itself
|
||||
# for compatibility with EpicsMotor
|
||||
self.user_readback.name = self.name
|
||||
|
||||
def _remaining_defaults(self, attr: _SignalSentinel) -> set[str]:
|
||||
return set(filter(lambda s: getattr(self, s) is attr, self.SIGNAL_NAMES))
|
||||
|
||||
|
||||
class PSIPositionerBase(PSISimplePositionerBase):
|
||||
"""Base class for positioners which are similar to a motor but do not implement
|
||||
all the required signals for an EpicsMotor or have different PV suffices."""
|
||||
|
||||
# position
|
||||
user_readback = _REQUIRED_SIGNAL
|
||||
user_setpoint = _REQUIRED_SIGNAL
|
||||
SIGNAL_NAMES = _SIGNAL_NAMES
|
||||
|
||||
# calibration dial <-> user
|
||||
user_offset = _OPTIONAL_SIGNAL
|
||||
@ -57,13 +118,11 @@ class PSIPositionerBase(ABC, Device, PositionerBase):
|
||||
set_use_switch = _OPTIONAL_SIGNAL
|
||||
|
||||
# configuration
|
||||
velocity = _OPTIONAL_SIGNAL
|
||||
acceleration = _OPTIONAL_SIGNAL
|
||||
motor_egu = _OPTIONAL_SIGNAL
|
||||
|
||||
# motor status
|
||||
motor_is_moving = _OPTIONAL_SIGNAL
|
||||
motor_done_move = _OPTIONAL_SIGNAL
|
||||
high_limit_switch = _OPTIONAL_SIGNAL
|
||||
low_limit_switch = _OPTIONAL_SIGNAL
|
||||
high_limit_travel = _OPTIONAL_SIGNAL
|
||||
@ -71,7 +130,6 @@ class PSIPositionerBase(ABC, Device, PositionerBase):
|
||||
direction_of_travel = _OPTIONAL_SIGNAL
|
||||
|
||||
# commands
|
||||
motor_stop = _OPTIONAL_SIGNAL
|
||||
home_forward = _OPTIONAL_SIGNAL
|
||||
home_reverse = _OPTIONAL_SIGNAL
|
||||
|
||||
@ -102,14 +160,3 @@ class PSIPositionerBase(ABC, Device, PositionerBase):
|
||||
connection_timeout=connection_timeout,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
not_implemented = {
|
||||
signal for signal in _SIGNAL_NAMES if getattr(self, signal) is _REQUIRED_SIGNAL
|
||||
}
|
||||
if not_implemented != set():
|
||||
raise RequiredSignalNotSpecified(
|
||||
f"Signal(s) {not_implemented} must be defined in a subclass"
|
||||
)
|
||||
# Make the default alias for the user_readback the name of the motor itself
|
||||
# for compatibility with EpicsMotor
|
||||
self.user_readback.name = self.name
|
||||
|
19
tests/positioner_test_ioc.py
Normal file
19
tests/positioner_test_ioc.py
Normal file
@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env python3
|
||||
from textwrap import dedent
|
||||
|
||||
from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run
|
||||
|
||||
|
||||
class PositionerIOCTest(PVGroup):
|
||||
""""""
|
||||
|
||||
SETPOINT = pvproperty(value=2.0, doc="A float")
|
||||
READBACK = pvproperty(value=2.0, doc="A float")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ioc_options, run_options = ioc_arg_parser(
|
||||
default_prefix="SIM:", desc=dedent(PositionerIOCTest.__doc__)
|
||||
)
|
||||
ioc = PositionerIOCTest(**ioc_options)
|
||||
run(ioc.pvdb, **run_options)
|
16
tests/test_psi_positioner.py
Normal file
16
tests/test_psi_positioner.py
Normal file
@ -0,0 +1,16 @@
|
||||
import caproto.ioc_examples.simple
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignal
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_positioner_base import PSISimplePositionerBase
|
||||
|
||||
|
||||
class PvTestPositioner(PSISimplePositionerBase):
|
||||
user_readback = Cpt(EpicsSignal, suffix="READ")
|
||||
|
||||
|
||||
def test_simple_positioner_init():
|
||||
device = PvTestPositioner("SIM:", name="test", override_suffixes={"user_readback": "READBACK"})
|
||||
device.user_readback.wait_for_connection()
|
||||
reading = device.user_readback.read()
|
||||
assert reading
|
Reference in New Issue
Block a user