diff --git a/ophyd_devices/interfaces/base_classes/psi_positioner_base.py b/ophyd_devices/interfaces/base_classes/psi_positioner_base.py index 9be5501..70eae6c 100644 --- a/ophyd_devices/interfaces/base_classes/psi_positioner_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_positioner_base.py @@ -69,7 +69,7 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase): user_setpoint: EpicsSignalBase = _OPTIONAL_SIGNAL velocity: EpicsSignalBase = _OPTIONAL_SIGNAL motor_stop: EpicsSignalBase = _OPTIONAL_SIGNAL - motor_done_move: EpicsSignalBase = _REQUIRED_SIGNAL + motor_done_move: EpicsSignalBase = _OPTIONAL_SIGNAL stop_value = 1 # The value to put to the stop PV (if set) to make the motor stop done_value = 1 # The value expected to be reported by motor_done_move when the move is done @@ -94,7 +94,7 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase): Args: name (str): (required) the name of the device limits (list | tuple | None): If given, a length-2 sequence within the range of which movemnt is allowed. - deadband (float | None): If given, set a soft deadband of this absolute value, within which positioner moves will return immediately. + deadband (float | None): If given, set a soft deadband of this absolute value, within which positioner moves will return immediately. If the positioner has no motor_done_move signal, you must provide this. override_suffixes (dict[str, str]): a dictionary of signal_name: pv_suffix which will replace the values in the signal classvar. """ super().__init__( @@ -116,6 +116,11 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase): if (missing := self._remaining_defaults(_REQUIRED_SIGNAL)) != set(): raise RequiredSignalNotSpecified(f"Signal(s) {missing} must be defined in a subclass") + if self.user_readback is _OPTIONAL_SIGNAL and self.motor_done_move is _OPTIONAL_SIGNAL: + raise ValueError( + "Positioner must have at least one of user_readback and motor_done_move" + ) + not_implemented = self._remaining_defaults(_OPTIONAL_SIGNAL) for signal_name, pv_suffix in override_suffixes.items(): if signal_name not in not_implemented: @@ -131,8 +136,12 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase): f"{self.__class__} does not implement overridden signal {signal_name}" ) - self.user_readback.subscribe(self._pos_changed) - self.motor_done_move.subscribe(self._move_changed) + if self.user_readback is not _OPTIONAL_SIGNAL: + self.user_readback.subscribe(self._pos_changed) + if self.motor_done_move is not _OPTIONAL_SIGNAL: + self.motor_done_move.subscribe(self._move_changed) + elif deadband is None: + raise ValueError("Deadband must not be None for a device with no done signal") # Make the default alias for the user_readback the name of the motor itself # for compatibility with EpicsMotor @@ -232,14 +241,27 @@ class PSISimplePositionerBase(ABC, Device, PositionerBase): self.log.debug( f"[ts={fmt_time(timestamp)}] {self.name} moving: {self._moving} (value={value})" ) - if started: self._run_subs(sub_type=self.SUB_START, timestamp=timestamp, value=value, **kwargs) + if self.motor_done_move is _OPTIONAL_SIGNAL: + if not self._moving: + # we got a position update within the deadband of the setpoint, close out move statuses. + self._run_subs( + sub_type=self._SUB_REQ_DONE, timestamp=timestamp, value=value, **kwargs + ) + @required_for_connection def _pos_changed(self, timestamp=None, value=None, **kwargs): """Callback from EPICS, indicating a change in position""" self._set_position(value) + if self.motor_done_move is _OPTIONAL_SIGNAL: + # No DMOV, so we compare to the setpoint to see if we are done + self._move_changed( + timestamp=timestamp, + value=(abs(self.user_setpoint.get() - value) < self._deadband), + **kwargs, + ) def stop(self, *, success=False): if self.motor_stop is not _OPTIONAL_SIGNAL: diff --git a/tests/test_psi_positioner.py b/tests/test_psi_positioner.py index 03029b4..e44bef5 100644 --- a/tests/test_psi_positioner.py +++ b/tests/test_psi_positioner.py @@ -51,8 +51,8 @@ def test_override_suffixes_warns_on_nonimplemented(ophyd_logger): ) -@pytest.fixture(scope="function") -def mock_psi_positioner() -> PSISimplePositioner: +@pytest.fixture() +def mock_psi_positioner(): name = "positioner" prefix = "SIM:MOTOR" with patch.object(ophyd, "cl") as mock_cl: @@ -117,3 +117,46 @@ def test_psi_positioner_soft_limits(): assert isinstance(device.high_limit_travel, Signal) assert device.low_limit_travel.get() == -1.5 assert device.high_limit_travel.get() == 1.5 + + +class ReadbackPositioner(PSISimplePositionerBase): + user_readback = Cpt(FakeEpicsSignalRO, "R") + user_setpoint = Cpt(FakeEpicsSignal, "S") + + +@pytest.fixture() +def mock_readback_positioner(): + name = "positioner" + prefix = "SIM:MOTOR" + with patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = ReadbackPositioner(name=name, prefix=prefix, deadband=0.0013) + patch_dual_pvs(dev) + dev._set_position(0) + yield dev + + +@pytest.mark.parametrize( + "setpoint,move_positions,completes", + [ + (5, [2, 4, 5], True), + (-5, [-2, -4, -4.9986], False), + (-5, [-2, -4, -4.9988], True), + (2, [2], True), + (2, [0.2, 0.3, 0.4, 0.5], False), + ], +) +def test_done_move_based_on_readback(mock_readback_positioner, setpoint, move_positions, completes): + mock_readback_positioner.wait_for_connection() + st = mock_readback_positioner.move(setpoint, wait=False) + final_pos = move_positions.pop() + assert mock_readback_positioner.user_setpoint.get() == setpoint + assert not st.done + + for pos in move_positions: + mock_readback_positioner.user_readback.sim_put(pos) + assert not st.done + + mock_readback_positioner.user_readback.sim_put(final_pos) + assert st.done == completes