tests: add tests for Mo1Bragg
This commit is contained in:
@@ -34,7 +34,7 @@ class MoveType(str, enum.Enum):
|
||||
ANGLE = "angle"
|
||||
|
||||
|
||||
class Mo1BraggCrystal:
|
||||
class Mo1BraggCrystal(Device):
|
||||
"""Class to set the crystal parameters of the Bragg positioner"""
|
||||
|
||||
offset_si111 = Cpt(
|
||||
@@ -81,7 +81,7 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
crystal = Cpt(Mo1BraggCrystal, "")
|
||||
|
||||
# Introduced new signal to be able to switch between motion in energy or angle
|
||||
move_type = Cpt(MoveTypeSignal, value="energy", kind=Kind.normal)
|
||||
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind=Kind.normal)
|
||||
|
||||
# Motor PVs
|
||||
feedback_pos_energy = Cpt(
|
||||
@@ -140,7 +140,7 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
@property
|
||||
def limits(self) -> tuple:
|
||||
"""Return limits of the Bragg positioner"""
|
||||
if self.move_type.get() == "energy":
|
||||
if self.move_type.get() == MoveType.ENERGY:
|
||||
return (self.low_limit_energy.get(), self.high_limit_energy.get())
|
||||
return (self.low_limit_angle.get(), self.high_limit_angle.get())
|
||||
|
||||
@@ -157,7 +157,7 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
@property
|
||||
def egu(self) -> str:
|
||||
"""Return the engineering units of the positioner"""
|
||||
if self.move_type.get() == "energy":
|
||||
if self.move_type.get() == MoveType.ENERGY:
|
||||
return "eV"
|
||||
return "deg"
|
||||
|
||||
@@ -191,7 +191,12 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
super().stop(success=success)
|
||||
|
||||
def _move_and_finish(
|
||||
self, target_pos: float, move_cpt: Cpt, status: DeviceStatus, update_frequency: float = 0.1
|
||||
self,
|
||||
target_pos: float,
|
||||
move_cpt: Cpt,
|
||||
read_cpt: Cpt,
|
||||
status: DeviceStatus,
|
||||
update_frequency: float = 0.1,
|
||||
) -> None:
|
||||
"""Move the simulated device and finish the motion.
|
||||
|
||||
@@ -209,13 +214,12 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
# Loop until the motion is done and run the subscriptions
|
||||
while self.move_abs_done.get() == 0:
|
||||
# Is this needed since BEC is subscribed to the feedback_pos_angle due to the auto_monitor=True
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=self.position)
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=read_cpt.get())
|
||||
if self._stopped:
|
||||
success = False
|
||||
break
|
||||
time.sleep(update_frequency)
|
||||
self._done_moving(success=success)
|
||||
status.set_finished()
|
||||
# pylint: disable=broad-except
|
||||
except Exception as exc:
|
||||
content = traceback.format_exc()
|
||||
@@ -244,8 +248,11 @@ class Mo1Bragg(Device, PositionerBase):
|
||||
move_cpt = (
|
||||
self.setpoint_abs_energy if move_type == MoveType.ENERGY else self.setpoint_abs_angle
|
||||
)
|
||||
read_cpt = (
|
||||
self.feedback_pos_energy if move_type == MoveType.ENERGY else self.feedback_pos_angle
|
||||
)
|
||||
self._move_thread = threading.Thread(
|
||||
target=self._move_and_finish, args=(value, move_cpt, status, 0.1)
|
||||
target=self._move_and_finish, args=(value, move_cpt, read_cpt, status, 0.1)
|
||||
)
|
||||
self._move_thread.start()
|
||||
return status
|
||||
|
||||
9
debye_bec/devices/test_utils/utils.py
Normal file
9
debye_bec/devices/test_utils/utils.py
Normal file
@@ -0,0 +1,9 @@
|
||||
def patch_dual_pvs(device):
|
||||
device.wait_for_connection(all_signals=True)
|
||||
for walk in device.walk_signals():
|
||||
if not hasattr(walk.item, "_read_pv"):
|
||||
continue
|
||||
if not hasattr(walk.item, "_write_pv"):
|
||||
continue
|
||||
if walk.item._read_pv.pvname.endswith("_RBV"):
|
||||
walk.item._read_pv = walk.item._write_pv
|
||||
113
tests/tests_devices/test_mo1_bragg.py
Normal file
113
tests/tests_devices/test_mo1_bragg.py
Normal file
@@ -0,0 +1,113 @@
|
||||
# pylint: skip-file
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
import ophyd
|
||||
import pytest
|
||||
from ophyd.utils import LimitError
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
|
||||
# from bec_server.device_server.tests.utils import DMMock
|
||||
from debye_bec.devices.mo1_bragg import Mo1Bragg, MoveType
|
||||
|
||||
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
|
||||
from debye_bec.devices.test_utils.utils import patch_dual_pvs
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_bragg():
|
||||
name = "bragg"
|
||||
prefix = "X01DA-OP-MO1:BRAGG:"
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = Mo1Bragg(name=name, prefix=prefix)
|
||||
patch_dual_pvs(dev)
|
||||
yield dev
|
||||
|
||||
|
||||
def test_init(mock_bragg):
|
||||
dev = mock_bragg
|
||||
assert dev.name == "bragg"
|
||||
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
|
||||
assert dev.move_type.get() == MoveType.ENERGY
|
||||
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
|
||||
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs"
|
||||
|
||||
|
||||
def test_check_value(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.low_limit_energy._read_pv.mock_data = 0
|
||||
dev.high_limit_energy._read_pv.mock_data = 1
|
||||
dev.low_limit_angle._read_pv.mock_data = 10
|
||||
dev.high_limit_angle._read_pv.mock_data = 20
|
||||
# Check that limits are taken correctly from angle or energy
|
||||
# Energy first
|
||||
move_type = MoveType.ENERGY
|
||||
dev.move_type.set(move_type)
|
||||
# nothing happens
|
||||
dev.check_value(0.5)
|
||||
with pytest.raises(LimitError):
|
||||
dev.check_value(15)
|
||||
# Angle next
|
||||
move_type = MoveType.ANGLE
|
||||
dev.move_type.set(move_type)
|
||||
dev.check_value(15)
|
||||
with pytest.raises(LimitError):
|
||||
dev.check_value(0.5)
|
||||
|
||||
|
||||
def test_egu(mock_bragg):
|
||||
dev = mock_bragg
|
||||
assert dev.egu == "eV"
|
||||
dev.move_type.set(MoveType.ANGLE)
|
||||
assert dev.egu == "deg"
|
||||
|
||||
|
||||
def test_move_succeeds(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.move_abs._read_pv.mock_data = 0
|
||||
# Move succeeds
|
||||
with mock.patch.object(dev.move_abs_done._read_pv, "mock_data", side_effect=[0, 1]):
|
||||
status = dev.move(0.5)
|
||||
# Sleep needed for while loop in _move_and_finish
|
||||
time.sleep(0.5)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
assert dev.setpoint_abs_energy.get() == 0.5
|
||||
assert dev.move_abs.get() == 1
|
||||
|
||||
|
||||
def test_stop_move(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.move_abs._read_pv.mock_data = 0
|
||||
dev.move_abs_done._read_pv.mock_data = 0
|
||||
# Move fails
|
||||
status = dev.move(0.5)
|
||||
assert status.done is False
|
||||
time.sleep(0.5)
|
||||
assert dev._stopped == False
|
||||
dev.stop()
|
||||
time.sleep(0.5)
|
||||
assert dev._stopped == True
|
||||
assert status.done is True
|
||||
assert status.success is False
|
||||
|
||||
|
||||
def test_set_xtal(mock_bragg):
|
||||
dev = mock_bragg
|
||||
dev.set_xtal("111")
|
||||
# Default values for mock
|
||||
assert dev.crystal.offset_si111.get() == 0
|
||||
assert dev.crystal.offset_si311.get() == 0
|
||||
assert dev.crystal.d_spacing_si111.get() == 0
|
||||
assert dev.crystal.d_spacing_si311.get() == 0
|
||||
assert dev.crystal.xtal_enum.get() == 0
|
||||
dev.set_xtal("311", offset_si111=1, offset_si311=2, d_spacing_si111=3, d_spacing_si311=4)
|
||||
assert dev.crystal.offset_si111.get() == 1
|
||||
assert dev.crystal.offset_si311.get() == 2
|
||||
assert dev.crystal.d_spacing_si111.get() == 3
|
||||
assert dev.crystal.d_spacing_si311.get() == 4
|
||||
assert dev.crystal.xtal_enum.get() == 1
|
||||
Reference in New Issue
Block a user