feat(#118): resolve from put completion + test

This commit is contained in:
2025-07-07 09:57:19 +02:00
parent b3e1ee74dd
commit 3a6e2dd9bd
3 changed files with 73 additions and 4 deletions

View File

@ -29,6 +29,8 @@ class OptionalSignalNotSpecified(PSIPositionerException): ...
class SimplePositionerSignals(TypedDict, total=False):
"""The list of all the signals in the PSISimplePositionerBase"""
user_readback: str
user_setpoint: str
motor_done_move: str
@ -40,6 +42,9 @@ _SIMPLE_SIGNAL_NAMES = SimplePositionerSignals.__optional_keys__
class PositionerSignals(SimplePositionerSignals, total=False):
"""The list of all the signals in the PSIPositionerBase. See that class for
documentation of signal functionality."""
user_offset: str
user_offset_dir: str
offset_freeze_switch: str
@ -82,6 +87,7 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
name,
limits: list[float] | tuple[float, ...] | None = None,
deadband: float | None = None,
use_put_completion: bool | None = None,
read_attrs=None,
configuration_attrs=None,
parent=None,
@ -93,8 +99,9 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
Args:
name (str): (required) the name of the device
limits (list | tuple | None): If given, a length-2 sequence within the range of which movemnt is allowed.
limits (list | tuple | None): If given, a length-2 sequence within the range of which movement is allowed.
deadband (float | None): If given, set a soft deadband of this absolute value, within which positioner moves will return immediately. If the positioner has no motor_done_move signal, you must provide this.
use_put_completion (bool | None): If given, use put completion on the setpoint signal to resolve the move status.
override_suffixes (dict[str, str]): a dictionary of signal_name: pv_suffix which will replace the values in the signal classvar.
"""
super().__init__(
@ -111,6 +118,8 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
else:
self._limits = None
self._deadband = deadband
if use_put_completion is not None:
self.use_put_complete = use_put_completion
self._egu = kwargs.get("egu") or ""
if (missing := self._remaining_defaults(_REQUIRED_SIGNAL)) != set():
@ -181,7 +190,12 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
def _setup_move(self, position):
"""Move and do not wait until motion is complete (asynchronous)"""
self.log.debug(f"{self.name}.user_setpoint = {position}")
self.user_setpoint.put(position, wait=True)
if not self.use_put_complete:
self.user_setpoint.put(position, wait=True)
else:
self.user_setpoint.put(
position, wait=False, callback=lambda *_: self._done_moving(success=True)
)
def move(self, position, wait=True, timeout=None, moved_cb=None):
"""Move to a specified position, optionally waiting for motion to
@ -245,6 +259,8 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
self._run_subs(sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs)
if self.motor_done_move is _OPTIONAL_SIGNAL:
# if there is no motor_done_move, we came here from self._pos_changed with a value
# based on whether whe are within the deadband
if not self._moving:
# we got a position update within the deadband of the setpoint, close out move statuses.
self._run_subs(
@ -281,6 +297,10 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase):
else:
return self.user_setpoint.limits
@property
def egu(self):
return self._egu
def _repr_info(self):
yield from super()._repr_info()
@ -299,8 +319,10 @@ class PSIPositionerBase(PSISimplePositionerBase):
SIGNAL_NAMES = _SIGNAL_NAMES
# calibration dial <-> user
# https://epics.anl.gov/bcda/synApps/motor/motorRecord.html#Fields_calib
user_offset = _OPTIONAL_SIGNAL
user_offset_dir = _OPTIONAL_SIGNAL
# Fix the difference between the user and dial positions
offset_freeze_switch = _OPTIONAL_SIGNAL
set_use_switch = _OPTIONAL_SIGNAL
@ -353,3 +375,9 @@ class PSIPositionerBase(PSISimplePositionerBase):
# If the limit signals are defined as soft signals, propagate the limits there
if sig is not _OPTIONAL_SIGNAL and type(sig) is Signal:
sig.put(lim)
@property
def egu(self):
if self.motor_egu is not _OPTIONAL_SIGNAL:
return self.motor_egu.get()
return self._egu

View File

@ -1,5 +1,7 @@
"""Utilities to mock and test devices."""
import threading
from time import sleep
from typing import TYPE_CHECKING
from unittest import mock
@ -183,6 +185,7 @@ class MockPV:
self.callbacks = {}
self._put_complete = None
self._put_complete_event: threading.Event | None = None
self._monref = None # holder of data returned from create_subscription
self._monref_mask = None
self._conn_started = False
@ -228,9 +231,22 @@ class MockPV:
self, value, wait=False, timeout=None, use_complete=False, callback=None, callback_data=None
):
"""MOCK PV, put function"""
def put_complete():
while True:
if self._put_complete_event.is_set():
self._put_complete_event.clear()
callback()
break
sleep(0.2)
self.mock_data = value
if callback is not None:
callback()
if not self._put_complete_event:
callback()
else:
threading.Thread(target=put_complete, daemon=True).start()
# pylint: disable=unused-argument
def add_callback(self, callback=None, index=None, run_now=False, with_ctrlvars=True, **kw):

View File

@ -1,10 +1,11 @@
import threading
from time import sleep
from unittest.mock import ANY, MagicMock, patch
import ophyd
import pytest
from ophyd.device import Component as Cpt
from ophyd.signal import EpicsSignal, Kind, Signal
from ophyd.signal import EpicsSignal, EpicsSignalRO, Kind, Signal
from ophyd.sim import FakeEpicsSignal, FakeEpicsSignalRO
from ophyd_devices.devices.simple_positioner import PSISimplePositioner
@ -59,6 +60,7 @@ def mock_psi_positioner():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = PSISimplePositioner(name=name, prefix=prefix, deadband=0.0013)
dev.wait_for_connection()
patch_dual_pvs(dev)
yield dev
@ -133,6 +135,7 @@ def mock_readback_positioner():
mock_cl.thread_class = threading.Thread
dev = ReadbackPositioner(name=name, prefix=prefix, deadband=0.0013)
patch_dual_pvs(dev)
dev.wait_for_connection()
dev._set_position(0)
yield dev
@ -160,3 +163,25 @@ def test_done_move_based_on_readback(mock_readback_positioner, setpoint, move_po
mock_readback_positioner.user_readback.sim_put(final_pos)
assert st.done == completes
def test_put_complete_positioner():
class PsiTestPosPutComplete(PSISimplePositionerBase):
user_setpoint: EpicsSignal = Cpt(EpicsSignal, ".VAL", auto_monitor=True)
user_readback = Cpt(EpicsSignalRO, ".RBV", kind="hinted", auto_monitor=True)
with patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = PsiTestPosPutComplete("prefix:", name="test", use_put_completion=True, deadband=0.001)
patch_dual_pvs(dev)
dev.wait_for_connection()
dev.user_setpoint._read_pv._put_complete_event = threading.Event()
dev._set_position(0)
st = dev.move(6, wait=False)
assert dev.user_setpoint.get() == 6
assert not st.done
dev.user_setpoint._read_pv._put_complete_event.set()
sleep(1)
assert st.done