feat(#118): Resolve move status from readback val

This commit is contained in:
2025-07-03 16:35:37 +02:00
committed by David Perl
parent a2d8c4932d
commit 8d37c77ee8
2 changed files with 72 additions and 7 deletions

View File

@ -69,7 +69,7 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
user_setpoint: EpicsSignalBase = _OPTIONAL_SIGNAL
velocity: EpicsSignalBase = _OPTIONAL_SIGNAL
motor_stop: EpicsSignalBase = _OPTIONAL_SIGNAL
motor_done_move: EpicsSignalBase = _REQUIRED_SIGNAL
motor_done_move: EpicsSignalBase = _OPTIONAL_SIGNAL
stop_value = 1 # The value to put to the stop PV (if set) to make the motor stop
done_value = 1 # The value expected to be reported by motor_done_move when the move is done
@ -94,7 +94,7 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
Args:
name (str): (required) the name of the device
limits (list | tuple | None): If given, a length-2 sequence within the range of which movemnt is allowed.
deadband (float | None): If given, set a soft deadband of this absolute value, within which positioner moves will return immediately.
deadband (float | None): If given, set a soft deadband of this absolute value, within which positioner moves will return immediately. If the positioner has no motor_done_move signal, you must provide this.
override_suffixes (dict[str, str]): a dictionary of signal_name: pv_suffix which will replace the values in the signal classvar.
"""
super().__init__(
@ -116,6 +116,11 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
if (missing := self._remaining_defaults(_REQUIRED_SIGNAL)) != set():
raise RequiredSignalNotSpecified(f"Signal(s) {missing} must be defined in a subclass")
if self.user_readback is _OPTIONAL_SIGNAL and self.motor_done_move is _OPTIONAL_SIGNAL:
raise ValueError(
"Positioner must have at least one of user_readback and motor_done_move"
)
not_implemented = self._remaining_defaults(_OPTIONAL_SIGNAL)
for signal_name, pv_suffix in override_suffixes.items():
if signal_name not in not_implemented:
@ -131,8 +136,12 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
f"{self.__class__} does not implement overridden signal {signal_name}"
)
self.user_readback.subscribe(self._pos_changed)
self.motor_done_move.subscribe(self._move_changed)
if self.user_readback is not _OPTIONAL_SIGNAL:
self.user_readback.subscribe(self._pos_changed)
if self.motor_done_move is not _OPTIONAL_SIGNAL:
self.motor_done_move.subscribe(self._move_changed)
elif deadband is None:
raise ValueError("Deadband must not be None for a device with no done signal")
# Make the default alias for the user_readback the name of the motor itself
# for compatibility with EpicsMotor
@ -232,14 +241,27 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
self.log.debug(
f"[ts={fmt_time(timestamp)}] {self.name} moving: {self._moving} (value={value})"
)
if started:
self._run_subs(sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs)
if self.motor_done_move is _OPTIONAL_SIGNAL:
if not self._moving:
# we got a position update within the deadband of the setpoint, close out move statuses.
self._run_subs(
sub_type=self._SUB_REQ_DONE, timestamp=timestamp, value=value, **kwargs
)
@required_for_connection
def _pos_changed(self, timestamp=None, value=None, **kwargs):
"""Callback from EPICS, indicating a change in position"""
self._set_position(value)
if self.motor_done_move is _OPTIONAL_SIGNAL:
# No DMOV, so we compare to the setpoint to see if we are done
self._move_changed(
timestamp=timestamp,
value=(abs(self.user_setpoint.get() - value) < self._deadband),
**kwargs,
)
def stop(self, *, success=False):
if self.motor_stop is not _OPTIONAL_SIGNAL:

View File

@ -51,8 +51,8 @@ def test_override_suffixes_warns_on_nonimplemented(ophyd_logger):
)
@pytest.fixture(scope="function")
def mock_psi_positioner() -> PSISimplePositioner:
@pytest.fixture()
def mock_psi_positioner():
name = "positioner"
prefix = "SIM:MOTOR"
with patch.object(ophyd, "cl") as mock_cl:
@ -117,3 +117,46 @@ def test_psi_positioner_soft_limits():
assert isinstance(device.high_limit_travel, Signal)
assert device.low_limit_travel.get() == -1.5
assert device.high_limit_travel.get() == 1.5
class ReadbackPositioner(PSISimplePositionerBase):
user_readback = Cpt(FakeEpicsSignalRO, "R")
user_setpoint = Cpt(FakeEpicsSignal, "S")
@pytest.fixture()
def mock_readback_positioner():
name = "positioner"
prefix = "SIM:MOTOR"
with patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = ReadbackPositioner(name=name, prefix=prefix, deadband=0.0013)
patch_dual_pvs(dev)
dev._set_position(0)
yield dev
@pytest.mark.parametrize(
"setpoint,move_positions,completes",
[
(5, [2, 4, 5], True),
(-5, [-2, -4, -4.9986], False),
(-5, [-2, -4, -4.9988], True),
(2, [2], True),
(2, [0.2, 0.3, 0.4, 0.5], False),
],
)
def test_done_move_based_on_readback(mock_readback_positioner, setpoint, move_positions, completes):
mock_readback_positioner.wait_for_connection()
st = mock_readback_positioner.move(setpoint, wait=False)
final_pos = move_positions.pop()
assert mock_readback_positioner.user_setpoint.get() == setpoint
assert not st.done
for pos in move_positions:
mock_readback_positioner.user_readback.sim_put(pos)
assert not st.done
mock_readback_positioner.user_readback.sim_put(final_pos)
assert st.done == completes