From 5bb0df2ddf680e752fb464d77cd239800bf4686d Mon Sep 17 00:00:00 2001 From: gac-x01da Date: Fri, 9 May 2025 08:36:26 +0200 Subject: [PATCH 1/6] refactor(mo1-bragg-positioner): remove move_type, cleanupt of angle pvs in mo1_bragg_positioner --- debye_bec/devices/mo1_bragg/mo1_bragg.py | 27 +++-------- .../devices/mo1_bragg/mo1_bragg_devices.py | 48 ++++--------------- debye_bec/scans/mono_bragg_scans.py | 5 +- 3 files changed, 17 insertions(+), 63 deletions(-) diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg.py b/debye_bec/devices/mo1_bragg/mo1_bragg.py index e6172db..18e41bb 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg.py @@ -81,7 +81,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: """ - USER_ACCESS = ["set_advanced_xas_settings"] + USER_ACCESS = ["set_advanced_xas_settings", "set_xtal"] def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): # type: ignore """ @@ -315,13 +315,8 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): high (float): High energy/angle value of the scan scan_time (float): Time for a half oscillation """ - move_type = self.move_type.get() - if move_type == MoveType.ENERGY: - self.scan_settings.s_scan_energy_lo.put(low) - self.scan_settings.s_scan_energy_hi.put(high) - else: - self.scan_settings.s_scan_angle_lo.put(low) - self.scan_settings.s_scan_angle_hi.put(high) + self.scan_settings.s_scan_energy_lo.put(low) + self.scan_settings.s_scan_energy_hi.put(high) self.scan_settings.s_scan_scantime.put(scan_time) def wait_for_signal(self, signal: Cpt, value: Any, timeout: float | None = None) -> None: @@ -381,18 +376,10 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): p_kink (float): Position of kink in % e_kink (float): Energy of kink in eV """ - # TODO Add fallback solution for automatic testing, otherwise test will fail - # because no monochromator will calculate the angle - # Unsure how to implement this - - move_type = self.move_type.get() - if move_type == MoveType.ENERGY: - e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink) - # Angle and Energy are inverse proportional! - high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low) - low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high) - else: - raise Mo1BraggError("MoveType Angle not implemented for advanced scans, use Energy") + e_kink_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=e_kink) + # Angle and Energy are inverse proportional! + high_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=low) + low_deg = self.convert_angle_energy(mode="EnergyToAngle", inp=high) pos, vel, dt = compute_spline( low_deg=low_deg, diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py b/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py index e7b8546..fada768 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py @@ -189,13 +189,13 @@ class Mo1BraggScanControl(Device): class Mo1BraggPositioner(Device, PositionerBase): """ - Positioner implementation of the MO1 Bragg positioner. + Positioner implementation with readback energy of the MO1 Bragg positioner. The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: This soft IOC connects to the NI motor and its control loop. """ - USER_ACCESS = ["set_advanced_xas_settings"] + USER_ACCESS = ["set_xtal"] ####### Sub-components ######## # Namespace is cleaner and easier to maintain @@ -207,10 +207,6 @@ class Mo1BraggPositioner(Device, PositionerBase): scan_control = Cpt(Mo1BraggScanControl, "") status = Cpt(Mo1BraggStatus, "") - ############# switch between energy and angle ############# - # TODO should be removed/replaced once decision about pseudo motor is made - move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config") - ############# Energy PVs ############# readback = Cpt( @@ -226,22 +222,6 @@ class Mo1BraggPositioner(Device, PositionerBase): high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True) velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True) - ########### Angle PVs ############# - - # TODO Pseudo motor for angle? - feedback_pos_angle = Cpt( - EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True - ) - setpoint_abs_angle = Cpt( - EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True - ) - low_limit_angle = Cpt( - EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True - ) - high_limit_angle = Cpt( - EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True - ) - ########## Move Command PVs ########## move_abs = Cpt(EpicsSignal, suffix="move_abs", kind="config", put_complete=True) @@ -290,9 +270,7 @@ class Mo1BraggPositioner(Device, PositionerBase): @property def limits(self) -> tuple: """Return limits of the Bragg positioner""" - if self.move_type.get() == MoveType.ENERGY: - return (self.low_lim.get(), self.high_lim.get()) - return (self.low_limit_angle.get(), self.high_limit_angle.get()) + return (self.low_lim.get(), self.high_lim.get()) @property def low_limit(self) -> float: @@ -307,16 +285,12 @@ class Mo1BraggPositioner(Device, PositionerBase): @property def egu(self) -> str: """Return the engineering units of the positioner""" - if self.move_type.get() == MoveType.ENERGY: - return "eV" - return "deg" + return "eV" @property def position(self) -> float: """Return the current position of Mo1Bragg, considering the move type""" - move_type = self.move_type.get() - move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle - return move_cpt.get() + return self.readback.get() # pylint: disable=arguments-differ def check_value(self, value: float) -> None: @@ -332,7 +306,7 @@ class Mo1BraggPositioner(Device, PositionerBase): raise LimitError(f"position={value} not within limits {self.limits}") def _move_and_finish( - self, target_pos: float, move_cpt: Cpt, status: DeviceStatus, update_frequency: float = 0.1 + self, target_pos: float, status: DeviceStatus, update_frequency: float = 0.1 ) -> None: """ Method to be called in the move thread to move the Bragg positioner @@ -351,7 +325,7 @@ class Mo1BraggPositioner(Device, PositionerBase): """ try: # Set the target position on IOC - move_cpt.put(target_pos) + self.setpoint.put(target_pos) self.move_abs.put(1) # Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced time.sleep(0.5) @@ -367,7 +341,7 @@ class Mo1BraggPositioner(Device, PositionerBase): logger.error(f"Error in move thread of device {self.name}: {content}") status.set_exception(exc=exc) - def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus: + def move(self, value: float, **kwargs) -> DeviceStatus: """ Move the Bragg positioner to the specified value, allows to switch between move types angle and energy. @@ -381,16 +355,12 @@ class Mo1BraggPositioner(Device, PositionerBase): DeviceStatus : status object to track the motion """ self._stopped = False - if move_type is not None: - self.move_type.put(move_type) - move_type = self.move_type.get() - move_cpt = self.setpoint if move_type == MoveType.ENERGY else self.setpoint_abs_angle self.check_value(value) status = DeviceStatus(device=self) self._move_thread = threading.Thread( - target=self._move_and_finish, args=(value, move_cpt, status, 0.1) + target=self._move_and_finish, args=(value, status, 0.1) ) self._move_thread.start() return status diff --git a/debye_bec/scans/mono_bragg_scans.py b/debye_bec/scans/mono_bragg_scans.py index 57ed477..e1ac5b6 100644 --- a/debye_bec/scans/mono_bragg_scans.py +++ b/debye_bec/scans/mono_bragg_scans.py @@ -72,11 +72,8 @@ class XASSimpleScan(AsyncFlyScanBase): yield None def pre_scan(self): - """Pre Scan action. Ensure the motor movetype is set to energy, then check - limits for start/end energy. - #TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor. + """Pre Scan action. """ - yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy") self._check_limits() # Ensure parent class pre_scan actions to be called. -- 2.49.1 From c782324065d49d1a6726f496099afad25ac4db36 Mon Sep 17 00:00:00 2001 From: gac-x01da Date: Fri, 9 May 2025 08:41:44 +0200 Subject: [PATCH 2/6] feat(mo1-bragg-angle): add Mo1BraggAngle positioner class; closes #14 --- .../devices/mo1_bragg/mo1_bragg_angle.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 debye_bec/devices/mo1_bragg/mo1_bragg_angle.py diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py b/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py new file mode 100644 index 0000000..dbc4f51 --- /dev/null +++ b/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py @@ -0,0 +1,19 @@ +from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner +from ophyd import Component as Cpt +from ophyd import EpicsSignalWithRBV, EpicsSignalRO + +class Mo1BraggAngle(Mo1BraggPositioner): + """ Positioner implementation with readback angle of the MO1 Bragg positioner.""" + + readback = Cpt( + EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True + ) + setpoint = Cpt( + EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True + ) + low_lim = Cpt( + EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True + ) + high_lim = Cpt( + EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True + ) \ No newline at end of file -- 2.49.1 From 4d9a062b8c56f5f0490cf33eff435434c15fb85b Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 9 May 2025 16:00:07 +0200 Subject: [PATCH 3/6] tests: fix tests and formatting --- .../devices/mo1_bragg/mo1_bragg_angle.py | 31 ++++++------ debye_bec/scans/mono_bragg_scans.py | 3 +- tests/tests_devices/test_mo1_bragg.py | 21 -------- tests/tests_scans/test_mono_bragg_scans.py | 48 ------------------- 4 files changed, 17 insertions(+), 86 deletions(-) diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py b/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py index dbc4f51..46a095a 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg_angle.py @@ -1,19 +1,20 @@ -from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner +"""Positioner implementation with readback angle of the MO1 Bragg positioner.""" + from ophyd import Component as Cpt -from ophyd import EpicsSignalWithRBV, EpicsSignalRO +from ophyd import EpicsSignalRO, EpicsSignalWithRBV + +from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggPositioner + class Mo1BraggAngle(Mo1BraggPositioner): - """ Positioner implementation with readback angle of the MO1 Bragg positioner.""" + """Positioner implementation with readback angle of the MO1 Bragg positioner.""" - readback = Cpt( - EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True - ) - setpoint = Cpt( - EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True - ) - low_lim = Cpt( - EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True - ) - high_lim = Cpt( - EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True - ) \ No newline at end of file + readback = Cpt(EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True) + setpoint = Cpt(EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True) + low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True) + high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True) + + @property + def egu(self) -> str: + """Return the engineering unit of the positioner.""" + return "deg" diff --git a/debye_bec/scans/mono_bragg_scans.py b/debye_bec/scans/mono_bragg_scans.py index e1ac5b6..1dc131b 100644 --- a/debye_bec/scans/mono_bragg_scans.py +++ b/debye_bec/scans/mono_bragg_scans.py @@ -72,8 +72,7 @@ class XASSimpleScan(AsyncFlyScanBase): yield None def pre_scan(self): - """Pre Scan action. - """ + """Pre Scan action.""" self._check_limits() # Ensure parent class pre_scan actions to be called. diff --git a/tests/tests_devices/test_mo1_bragg.py b/tests/tests_devices/test_mo1_bragg.py index 534bead..933ad05 100644 --- a/tests/tests_devices/test_mo1_bragg.py +++ b/tests/tests_devices/test_mo1_bragg.py @@ -52,7 +52,6 @@ def test_init(mock_bragg): dev = mock_bragg assert dev.name == "bragg" assert dev.prefix == "X01DA-OP-MO1:BRAGG:" - assert dev.move_type.get() == MoveType.ENERGY assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV" assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs" @@ -61,29 +60,15 @@ def test_check_value(mock_bragg): dev = mock_bragg dev.low_lim._read_pv.mock_data = 0 dev.high_lim._read_pv.mock_data = 1 - dev.low_limit_angle._read_pv.mock_data = 10 - dev.high_limit_angle._read_pv.mock_data = 20 - # Check that limits are taken correctly from angle or energy - # Energy first - move_type = MoveType.ENERGY - dev.move_type.set(move_type) # nothing happens dev.check_value(0.5) with pytest.raises(LimitError): dev.check_value(15) - # Angle next - move_type = MoveType.ANGLE - dev.move_type.set(move_type) - dev.check_value(15) - with pytest.raises(LimitError): - dev.check_value(0.5) def test_egu(mock_bragg): dev = mock_bragg assert dev.egu == "eV" - dev.move_type.set(MoveType.ANGLE) - assert dev.egu == "deg" def test_move_succeeds(mock_bragg): @@ -135,16 +120,10 @@ def test_set_xtal(mock_bragg): def test_set_xas_settings(mock_bragg): dev = mock_bragg - dev.move_type.set(MoveType.ENERGY) dev.set_xas_settings(low=0.5, high=1, scan_time=0.1) assert dev.scan_settings.s_scan_energy_lo.get() == 0.5 assert dev.scan_settings.s_scan_energy_hi.get() == 1 assert dev.scan_settings.s_scan_scantime.get() == 0.1 - dev.move_type.set(MoveType.ANGLE) - dev.set_xas_settings(low=10, high=20, scan_time=1) - assert dev.scan_settings.s_scan_angle_lo.get() == 10 - assert dev.scan_settings.s_scan_angle_hi.get() == 20 - assert dev.scan_settings.s_scan_scantime.get() == 1 def test_set_trig_settings(mock_bragg): diff --git a/tests/tests_scans/test_mono_bragg_scans.py b/tests/tests_scans/test_mono_bragg_scans.py index 51ad94f..52dec76 100644 --- a/tests/tests_scans/test_mono_bragg_scans.py +++ b/tests/tests_scans/test_mono_bragg_scans.py @@ -92,18 +92,6 @@ def test_xas_simple_scan(scan_assembler, ScanStubStatusMock): action="read", parameter={}, ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="rpc", - parameter={ - "device": "mo1_bragg", - "func": "move_type.set", - "rpc_id": "my_test_rpc_id", - "args": ("energy",), - "kwargs": {}, - }, - ), DeviceInstructionMessage( metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], @@ -205,18 +193,6 @@ def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock): action="read", parameter={}, ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="rpc", - parameter={ - "device": "mo1_bragg", - "func": "move_type.set", - "rpc_id": "my_test_rpc_id", - "args": ("energy",), - "kwargs": {}, - }, - ), DeviceInstructionMessage( metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], @@ -312,18 +288,6 @@ def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock): action="read", parameter={}, ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="rpc", - parameter={ - "device": "mo1_bragg", - "func": "move_type.set", - "rpc_id": "my_test_rpc_id", - "args": ("energy",), - "kwargs": {}, - }, - ), DeviceInstructionMessage( metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], @@ -427,18 +391,6 @@ def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock): action="read", parameter={}, ), - DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, - device="mo1_bragg", - action="rpc", - parameter={ - "device": "mo1_bragg", - "func": "move_type.set", - "rpc_id": "my_test_rpc_id", - "args": ("energy",), - "kwargs": {}, - }, - ), DeviceInstructionMessage( metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, device=["bpm4i", "eiger", "mo1_bragg", "nidaq", "samx"], -- 2.49.1 From 87ea95e975700b9c6dc8ac7c9bf8251b2d39aa45 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 9 May 2025 16:25:41 +0200 Subject: [PATCH 4/6] fix: improve move method of mo1_bragg_devices --- debye_bec/devices/mo1_bragg/mo1_bragg_devices.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py b/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py index fada768..37cd9ce 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg_devices.py @@ -251,9 +251,7 @@ class Mo1BraggPositioner(Device, PositionerBase): success (bool) : Flag to indicate if the motion was successful """ self.move_stop.put(1) - if self._move_thread is not None: - self._move_thread.join() - self._move_thread = None + self._stopped = True super().stop(success=success) def stop_scan(self) -> None: @@ -323,12 +321,14 @@ class Mo1BraggPositioner(Device, PositionerBase): update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s """ + motor_name = None try: # Set the target position on IOC self.setpoint.put(target_pos) self.move_abs.put(1) # Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced time.sleep(0.5) + motor_name = self.name while self.motor_is_moving.get() == 0: if self.stopped: raise Mo1BraggStoppedError(f"Device {self.name} was stopped") @@ -338,7 +338,9 @@ class Mo1BraggPositioner(Device, PositionerBase): # pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() - logger.error(f"Error in move thread of device {self.name}: {content}") + logger.error( + f"Error in move thread of device {motor_name if motor_name else ''}: {content}" + ) status.set_exception(exc=exc) def move(self, value: float, **kwargs) -> DeviceStatus: -- 2.49.1 From b3f63f4f765ad42e77d12fa9ca1a596b057f4185 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 9 May 2025 16:26:00 +0200 Subject: [PATCH 5/6] tests(mo1-bragg-angle): add tests for mo1_bragg_angle --- tests/tests_devices/test_mo1_bragg_angle.py | 88 +++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 tests/tests_devices/test_mo1_bragg_angle.py diff --git a/tests/tests_devices/test_mo1_bragg_angle.py b/tests/tests_devices/test_mo1_bragg_angle.py new file mode 100644 index 0000000..c09c055 --- /dev/null +++ b/tests/tests_devices/test_mo1_bragg_angle.py @@ -0,0 +1,88 @@ +"""Tests for the Mo1BraggAngle class.""" + +import threading +from unittest import mock + +import ophyd +import pytest +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + +from debye_bec.devices.mo1_bragg.mo1_bragg_angle import Mo1BraggAngle +from debye_bec.devices.mo1_bragg.mo1_bragg_devices import Mo1BraggStoppedError + +# pylint: disable=protected-access + + +@pytest.fixture(scope="function") +def mock_bragg() -> Mo1BraggAngle: + """Fixture for the Mo1BraggAngle device.""" + name = "bragg" + prefix = "X01DA-OP-MO1:BRAGG:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = Mo1BraggAngle(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev + + +def test_mo1_bragg_angle_init(mock_bragg): + """Test the initialization of the Mo1BraggAngle device.""" + assert mock_bragg.name == "bragg" + assert mock_bragg.prefix == "X01DA-OP-MO1:BRAGG:" + assert isinstance(mock_bragg.readback, ophyd.EpicsSignalRO) + assert isinstance(mock_bragg.setpoint, ophyd.EpicsSignalWithRBV) + assert isinstance(mock_bragg.low_lim, ophyd.EpicsSignalRO) + assert isinstance(mock_bragg.high_lim, ophyd.EpicsSignalRO) + + +def test_mo1_bragg_angle_egu(mock_bragg): + """Test the engineering unit of the Mo1BraggAngle device.""" + assert mock_bragg.egu == "deg" + + +def test_mo1_bragg_angle_limits(mock_bragg): + """Test the limits of the Mo1BraggAngle device.""" + mock_bragg.low_lim._read_pv.mock_data = -10 + mock_bragg.high_lim._read_pv.mock_data = 10 + + assert mock_bragg.limits == (-10, 10) + + +def test_mo1_bragg_angle_move(mock_bragg): + """Test the move method of the Mo1BraggAngle device.""" + mock_bragg.setpoint.put(0) + mock_bragg.readback._read_pv.mock_data = 0 # EpicsSignalRO + + # Change PV for motor is moving before starting the move + mock_bragg.motor_is_moving._read_pv.mock_data = 0 # EpicsSignalRO + status = mock_bragg.move(5) + assert status.done is False + + # Check setpoint is set correctly + assert mock_bragg.setpoint.get() == 5 + + # Update the motor is moving PV to simulate that the move is done + mock_bragg.motor_is_moving._read_pv.mock_data = 1 + assert mock_bragg.motor_is_moving.get() == 1 + + status.wait(timeout=5) # If the status does not resolve after 5 seconds, something is wrong + assert status.done is True + + +def test_mo1_bragg_angle_stop(mock_bragg): + """Test the stop method of the Mo1BraggAngle device.""" + assert mock_bragg.stopped is False + + mock_bragg.stop() + assert mock_bragg.stopped is True + + status = mock_bragg.move(5) + assert status.done is False + + # stopped should be resetted + assert mock_bragg.stopped is False + + with pytest.raises(Mo1BraggStoppedError): + mock_bragg.stop() + status.wait(timeout=5) # This should raise before due to stop() call -- 2.49.1 From 0cdad97d00ea6f6da2af4658cc63a459cd79650f Mon Sep 17 00:00:00 2001 From: gac-x01da Date: Fri, 16 May 2025 15:02:28 +0200 Subject: [PATCH 6/6] refactor: add mo1_bragg_angle to x01da_test_config --- .../device_configs/x01da_test_config.yaml | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/debye_bec/device_configs/x01da_test_config.yaml b/debye_bec/device_configs/x01da_test_config.yaml index 97b856e..13def75 100644 --- a/debye_bec/device_configs/x01da_test_config.yaml +++ b/debye_bec/device_configs/x01da_test_config.yaml @@ -219,6 +219,15 @@ dummy_pv: onFailure: retry enabled: true softwareTrigger: false +mo1_bragg_angle: + readoutPriority: baseline + description: Positioner for the Monochromator + deviceClass: debye_bec.devices.mo1_bragg.mo1_bragg_angle.Mo1BraggAngle + deviceConfig: + prefix: "X01DA-OP-MO1:BRAGG:" + onFailure: retry + enabled: true + softwareTrigger: false # NIDAQ nidaq: @@ -337,15 +346,15 @@ reffoilchanger: # enabled: true # softwareTrigger: false -# xray_eye: -# readoutPriority: async -# description: X-ray eye -# deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam -# deviceConfig: -# prefix: "X01DA-ES-XRAYEYE:" -# onFailure: retry -# enabled: true -# softwareTrigger: false +xray_eye: + readoutPriority: async + description: X-ray eye + deviceClass: debye_bec.devices.cameras.basler_cam.BaslerCam + deviceConfig: + prefix: "X01DA-ES-XRAYEYE:" + onFailure: retry + enabled: true + softwareTrigger: false # Pilatus Curtain # pilatus_curtain: -- 2.49.1