From 1b2eeccbb81b8f14833afeb7f2ed50945b4920ca Mon Sep 17 00:00:00 2001 From: appel_c Date: Mon, 8 Dec 2025 18:40:09 +0100 Subject: [PATCH] test(MockPv): improve MockPV, allow start value to be set --- ophyd_devices/tests/utils.py | 13 ++++++++----- tests/test_psi_motors.py | 5 ++--- tests/test_utils.py | 27 ++++++++++++++++++--------- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/ophyd_devices/tests/utils.py b/ophyd_devices/tests/utils.py index aac5a37..ca7898c 100644 --- a/ophyd_devices/tests/utils.py +++ b/ophyd_devices/tests/utils.py @@ -2,6 +2,7 @@ import threading from contextlib import contextmanager +from functools import partial from time import sleep from typing import TYPE_CHECKING, Callable, Generator, TypeVar from unittest import mock @@ -25,7 +26,9 @@ T = TypeVar("T", bound=Device) @contextmanager -def patched_device(device_type: type[T], *args, **kwargs) -> Generator[T, None, None]: +def patched_device( + device_type: type[T], *args, _mock_pv_initial_value=0, **kwargs +) -> Generator[T, None, None]: """Context manager to yield a patched ophyd device with certain initialisation args. *args and **kwargs are passed directly through to the device constructor. @@ -37,7 +40,7 @@ def patched_device(device_type: type[T], *args, **kwargs) -> Generator[T, None, """ with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV + mock_cl.get_pv = partial(MockPV, _mock_pv_initial_value=_mock_pv_initial_value) mock_cl.thread_class = threading.Thread device = device_type(*args, **kwargs) patch_dual_pvs(device) @@ -137,8 +140,6 @@ class MockPV: """ - DEFAULT_VALUE = 0 - _fmtsca = "" _fmtarr = "" _fields = ( @@ -173,6 +174,8 @@ class MockPV: def __init__( self, pvname, + *, + _mock_pv_initial_value=0, callback=None, form="time", verbose=False, @@ -202,7 +205,7 @@ class MockPV: self._args["access"] = "unknown" self._args["status"] = 0 self.connection_callbacks = [] - self._mock_data = self.DEFAULT_VALUE + self._mock_data = _mock_pv_initial_value if connection_callback is not None: self.connection_callbacks = [connection_callback] diff --git a/tests/test_psi_motors.py b/tests/test_psi_motors.py index 1a0a50c..d86dea6 100644 --- a/tests/test_psi_motors.py +++ b/tests/test_psi_motors.py @@ -9,8 +9,7 @@ from unittest import mock import ophyd import pytest -from ophyd_devices.devices.epics_motor_ex import EpicsMotorEx -from ophyd_devices.devices.psi_motor import EpicsMotor, EpicsMotorEC, EpicsUserMotors, SpmgStates +from ophyd_devices.devices.psi_motor import EpicsMotor, EpicsMotorEC, EpicsUserMotorVME, SpmgStates from ophyd_devices.tests.utils import MockPV, patched_device @@ -169,7 +168,7 @@ def test_epics_vme_user_motor(): with mock.patch.object(ophyd, "cl") as mock_cl: mock_cl.get_pv = MockPV mock_cl.thread_class = threading.Thread - device = EpicsUserMotors(name="test_motor_ex", prefix="SIM:MOTOR:EX") + device = EpicsUserMotorVME(name="test_motor_ex", prefix="SIM:MOTOR:EX") # Should raise with pytest.raises(RuntimeError): device.wait_for_connection(all_signals=True) diff --git a/tests/test_utils.py b/tests/test_utils.py index e835fb8..aa40ea5 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -12,15 +12,8 @@ from ophyd import Device, EpicsSignalRO, Signal from ophyd.status import WaitTimeoutError from typeguard import TypeCheckError -from ophyd_devices import ( - AndStatus, - DeviceStatus, - MoveStatus, - Status, - StatusBase, - SubscriptionStatus, -) -from ophyd_devices.tests.utils import MockPV +from ophyd_devices.devices.psi_motor import EpicsMotor +from ophyd_devices.tests.utils import MockPV, patched_device from ophyd_devices.utils.bec_signals import ( AsyncMultiSignal, AsyncSignal, @@ -31,8 +24,13 @@ from ophyd_devices.utils.bec_signals import ( ProgressSignal, ) from ophyd_devices.utils.psi_device_base_utils import ( + AndStatus, CompareStatus, + DeviceStatus, FileHandler, + MoveStatus, + StatusBase, + SubscriptionStatus, TaskHandler, TaskKilledError, TaskState, @@ -1021,3 +1019,14 @@ def test_patched_status_objects(): move_st.wait(timeout=10) assert move_st.done is True assert move_st.success is False + + +@pytest.fixture(scope="function") +def mock_device_with_initial_value(): + with patched_device(EpicsMotor, _mock_pv_initial_value=2, name="motor") as mtr: + yield mtr + + +def test_mock_device_initial_value(mock_device_with_initial_value: EpicsMotor): + mtr = mock_device_with_initial_value + assert mtr.velocity.get() == 2