diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index 0b820ea..3ffbd8d 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -34,7 +34,7 @@ class MoveType(str, enum.Enum): ANGLE = "angle" -class Mo1BraggCrystal: +class Mo1BraggCrystal(Device): """Class to set the crystal parameters of the Bragg positioner""" offset_si111 = Cpt( @@ -81,7 +81,7 @@ class Mo1Bragg(Device, PositionerBase): crystal = Cpt(Mo1BraggCrystal, "") # Introduced new signal to be able to switch between motion in energy or angle - move_type = Cpt(MoveTypeSignal, value="energy", kind=Kind.normal) + move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind=Kind.normal) # Motor PVs feedback_pos_energy = Cpt( @@ -140,7 +140,7 @@ class Mo1Bragg(Device, PositionerBase): @property def limits(self) -> tuple: """Return limits of the Bragg positioner""" - if self.move_type.get() == "energy": + if self.move_type.get() == MoveType.ENERGY: return (self.low_limit_energy.get(), self.high_limit_energy.get()) return (self.low_limit_angle.get(), self.high_limit_angle.get()) @@ -157,7 +157,7 @@ class Mo1Bragg(Device, PositionerBase): @property def egu(self) -> str: """Return the engineering units of the positioner""" - if self.move_type.get() == "energy": + if self.move_type.get() == MoveType.ENERGY: return "eV" return "deg" @@ -191,7 +191,12 @@ class Mo1Bragg(Device, PositionerBase): super().stop(success=success) def _move_and_finish( - self, target_pos: float, move_cpt: Cpt, status: DeviceStatus, update_frequency: float = 0.1 + self, + target_pos: float, + move_cpt: Cpt, + read_cpt: Cpt, + status: DeviceStatus, + update_frequency: float = 0.1, ) -> None: """Move the simulated device and finish the motion. @@ -209,13 +214,12 @@ class Mo1Bragg(Device, PositionerBase): # Loop until the motion is done and run the subscriptions while self.move_abs_done.get() == 0: # Is this needed since BEC is subscribed to the feedback_pos_angle due to the auto_monitor=True - self._run_subs(sub_type=self.SUB_READBACK, value=self.position) + self._run_subs(sub_type=self.SUB_READBACK, value=read_cpt.get()) if self._stopped: success = False break time.sleep(update_frequency) self._done_moving(success=success) - status.set_finished() # pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() @@ -244,8 +248,11 @@ class Mo1Bragg(Device, PositionerBase): move_cpt = ( self.setpoint_abs_energy if move_type == MoveType.ENERGY else self.setpoint_abs_angle ) + read_cpt = ( + self.feedback_pos_energy if move_type == MoveType.ENERGY else self.feedback_pos_angle + ) self._move_thread = threading.Thread( - target=self._move_and_finish, args=(value, move_cpt, status, 0.1) + target=self._move_and_finish, args=(value, move_cpt, read_cpt, status, 0.1) ) self._move_thread.start() return status diff --git a/debye_bec/devices/test_utils/utils.py b/debye_bec/devices/test_utils/utils.py new file mode 100644 index 0000000..e4843cb --- /dev/null +++ b/debye_bec/devices/test_utils/utils.py @@ -0,0 +1,9 @@ +def patch_dual_pvs(device): + device.wait_for_connection(all_signals=True) + for walk in device.walk_signals(): + if not hasattr(walk.item, "_read_pv"): + continue + if not hasattr(walk.item, "_write_pv"): + continue + if walk.item._read_pv.pvname.endswith("_RBV"): + walk.item._read_pv = walk.item._write_pv diff --git a/tests/tests_devices/test_mo1_bragg.py b/tests/tests_devices/test_mo1_bragg.py new file mode 100644 index 0000000..8d93ec3 --- /dev/null +++ b/tests/tests_devices/test_mo1_bragg.py @@ -0,0 +1,113 @@ +# pylint: skip-file +import os +import threading +import time +from unittest import mock + +import ophyd +import pytest +from ophyd.utils import LimitError +from ophyd_devices.tests.utils import MockPV + +# from bec_server.device_server.tests.utils import DMMock +from debye_bec.devices.mo1_bragg import Mo1Bragg, MoveType + +# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories +from debye_bec.devices.test_utils.utils import patch_dual_pvs + + +@pytest.fixture(scope="function") +def mock_bragg(): + name = "bragg" + prefix = "X01DA-OP-MO1:BRAGG:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = Mo1Bragg(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev + + +def test_init(mock_bragg): + dev = mock_bragg + assert dev.name == "bragg" + assert dev.prefix == "X01DA-OP-MO1:BRAGG:" + assert dev.move_type.get() == MoveType.ENERGY + assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV" + assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs" + + +def test_check_value(mock_bragg): + dev = mock_bragg + dev.low_limit_energy._read_pv.mock_data = 0 + dev.high_limit_energy._read_pv.mock_data = 1 + dev.low_limit_angle._read_pv.mock_data = 10 + dev.high_limit_angle._read_pv.mock_data = 20 + # Check that limits are taken correctly from angle or energy + # Energy first + move_type = MoveType.ENERGY + dev.move_type.set(move_type) + # nothing happens + dev.check_value(0.5) + with pytest.raises(LimitError): + dev.check_value(15) + # Angle next + move_type = MoveType.ANGLE + dev.move_type.set(move_type) + dev.check_value(15) + with pytest.raises(LimitError): + dev.check_value(0.5) + + +def test_egu(mock_bragg): + dev = mock_bragg + assert dev.egu == "eV" + dev.move_type.set(MoveType.ANGLE) + assert dev.egu == "deg" + + +def test_move_succeeds(mock_bragg): + dev = mock_bragg + dev.move_abs._read_pv.mock_data = 0 + # Move succeeds + with mock.patch.object(dev.move_abs_done._read_pv, "mock_data", side_effect=[0, 1]): + status = dev.move(0.5) + # Sleep needed for while loop in _move_and_finish + time.sleep(0.5) + assert status.done is True + assert status.success is True + assert dev.setpoint_abs_energy.get() == 0.5 + assert dev.move_abs.get() == 1 + + +def test_stop_move(mock_bragg): + dev = mock_bragg + dev.move_abs._read_pv.mock_data = 0 + dev.move_abs_done._read_pv.mock_data = 0 + # Move fails + status = dev.move(0.5) + assert status.done is False + time.sleep(0.5) + assert dev._stopped == False + dev.stop() + time.sleep(0.5) + assert dev._stopped == True + assert status.done is True + assert status.success is False + + +def test_set_xtal(mock_bragg): + dev = mock_bragg + dev.set_xtal("111") + # Default values for mock + assert dev.crystal.offset_si111.get() == 0 + assert dev.crystal.offset_si311.get() == 0 + assert dev.crystal.d_spacing_si111.get() == 0 + assert dev.crystal.d_spacing_si311.get() == 0 + assert dev.crystal.xtal_enum.get() == 0 + dev.set_xtal("311", offset_si111=1, offset_si311=2, d_spacing_si111=3, d_spacing_si311=4) + assert dev.crystal.offset_si111.get() == 1 + assert dev.crystal.offset_si311.get() == 2 + assert dev.crystal.d_spacing_si111.get() == 3 + assert dev.crystal.d_spacing_si311.get() == 4 + assert dev.crystal.xtal_enum.get() == 1