mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-07-12 19:51:52 +02:00
feat(#118): resolve from put completion + test
This commit is contained in:
@ -1,10 +1,11 @@
|
||||
import threading
|
||||
from time import sleep
|
||||
from unittest.mock import ANY, MagicMock, patch
|
||||
|
||||
import ophyd
|
||||
import pytest
|
||||
from ophyd.device import Component as Cpt
|
||||
from ophyd.signal import EpicsSignal, Kind, Signal
|
||||
from ophyd.signal import EpicsSignal, EpicsSignalRO, Kind, Signal
|
||||
from ophyd.sim import FakeEpicsSignal, FakeEpicsSignalRO
|
||||
|
||||
from ophyd_devices.devices.simple_positioner import PSISimplePositioner
|
||||
@ -59,6 +60,7 @@ def mock_psi_positioner():
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = PSISimplePositioner(name=name, prefix=prefix, deadband=0.0013)
|
||||
dev.wait_for_connection()
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
@ -133,6 +135,7 @@ def mock_readback_positioner():
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = ReadbackPositioner(name=name, prefix=prefix, deadband=0.0013)
|
||||
patch_dual_pvs(dev)
|
||||
dev.wait_for_connection()
|
||||
dev._set_position(0)
|
||||
yield dev
|
||||
|
||||
@ -160,3 +163,25 @@ def test_done_move_based_on_readback(mock_readback_positioner, setpoint, move_po
|
||||
|
||||
mock_readback_positioner.user_readback.sim_put(final_pos)
|
||||
assert st.done == completes
|
||||
|
||||
|
||||
def test_put_complete_positioner():
|
||||
class PsiTestPosPutComplete(PSISimplePositionerBase):
|
||||
user_setpoint: EpicsSignal = Cpt(EpicsSignal, ".VAL", auto_monitor=True)
|
||||
user_readback = Cpt(EpicsSignalRO, ".RBV", kind="hinted", auto_monitor=True)
|
||||
|
||||
with patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = PsiTestPosPutComplete("prefix:", name="test", use_put_completion=True, deadband=0.001)
|
||||
patch_dual_pvs(dev)
|
||||
dev.wait_for_connection()
|
||||
dev.user_setpoint._read_pv._put_complete_event = threading.Event()
|
||||
dev._set_position(0)
|
||||
|
||||
st = dev.move(6, wait=False)
|
||||
assert dev.user_setpoint.get() == 6
|
||||
assert not st.done
|
||||
dev.user_setpoint._read_pv._put_complete_event.set()
|
||||
sleep(1)
|
||||
assert st.done
|
||||
|
Reference in New Issue
Block a user