diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index 4019d9b..c07e4b7 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -35,7 +35,7 @@ logger = bec_logger.logger class ScanControlScanStatus(int, enum.Enum): - """Enum class for the scan status of Bragg positioner""" + """Enum class for the scan status of the Bragg positioner""" PARAMETER_WRONG = 0 VALIDATION_PENDING = 1 @@ -44,7 +44,7 @@ class ScanControlScanStatus(int, enum.Enum): class ScanControlLoadMessage(int, enum.Enum): - """Enum for validating messages for load message of Bragg positioner""" + """Enum for validating messages for load message of the Bragg positioner""" PENDING = 0 STARTED = 1 @@ -194,7 +194,10 @@ class Mo1BraggScanControl(Device): @dataclass class ScanParameter: - """Dataclass to store the scan parameters for the Mo1 Bragg positioner""" + """Dataclass to store the scan parameters for the Mo1 Bragg positioner. + This needs to be in sync with the kwargs of the MO1 Bragg scans from Debye, to + ensure that the scan parameters are correctly set. Any changes in the scan kwargs, + i.e. renaming or adding new parameters, need to be represented here as well.""" scan_time: float = None scan_duration: float = None @@ -227,7 +230,6 @@ class Mo1Bragg(Device, PositionerBase): # signal to indicate the move type 'energy' or 'angle' move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config") - ################ Motor PVs ################ # Energy PVs readback = Cpt( EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind="hinted", auto_monitor=True @@ -312,7 +314,6 @@ class Mo1Bragg(Device, PositionerBase): value (int) : current progress value """ max_value = 100 - # logger.info(f"Progress at {value}") self._run_subs( sub_type=self.SUB_PROGRESS, value=value, @@ -336,9 +337,7 @@ class Mo1Bragg(Device, PositionerBase): Args: success (bool) : Flag to indicate if the motion was successful """ - # Stop any motion on the device self.move_stop.put(1) - # Stop the move thread self._stopped = True if self._move_thread is not None: self._move_thread.join() @@ -412,33 +411,24 @@ class Mo1Bragg(Device, PositionerBase): update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s """ success = True - exception = None try: # Set the target position on IOC move_cpt.put(target_pos) - # Start motion self.move_abs.put(1) # Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced time.sleep(0.5) while self.motor_is_moving.get() == 0: - val = read_cpt.get() - self._run_subs(sub_type=self.SUB_READBACK, value=val) if self.stopped: success = False break time.sleep(update_frequency) - + # pylint: disable=protected-access + status._finished(success=success) # pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() logger.error(f"Error in move thread of device {self.name}: {content}") - exception = exc - finally: - if exception: - status.set_exception(exc=exception) - else: - # pylint: disable=protected-access - status._finished(success=success) + status.set_exception(exc=exc) def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus: """Move the Bragg positioner to the specified value, allows to switch between move types angle and energy. @@ -579,6 +569,7 @@ class Mo1Bragg(Device, PositionerBase): def kickoff(self): """Kickoff the device, called from BEC.""" scan_duration = self.scan_control.scan_duration.get() + # TODO implement better logic for infinite scans, at least bring it up with Debye start_func = ( self.scan_control.scan_start_infinite.put if scan_duration < 0.1 @@ -617,21 +608,22 @@ class Mo1Bragg(Device, PositionerBase): """ state = self.scan_control.scan_msg.get() if state != target_state: - logger.warning(f"Resetting scan validation in stage for state: {ScanControlLoadMessage(state)}, " \ - f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s") + logger.warning( + f"Resetting scan validation in stage for state: {ScanControlLoadMessage(state)}, " + f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s" + ) self.scan_control.scan_val_reset.put(1) # Sleep to ensure the reset is done time.sleep(1) - if not self.wait_for_signals( signal_conditions=[(self.scan_control.scan_msg.get, target_state)], timeout=self.timeout_for_pvwait, check_stopped=True, ): raise TimeoutError( - f"Timeout after {self.timeout_for_pvwait} while waiting for scan status," \ - f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}" + f"Timeout after {self.timeout_for_pvwait} while waiting for scan status," + f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}" ) def on_stage(self) -> None: @@ -681,7 +673,9 @@ class Mo1Bragg(Device, PositionerBase): mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration ) else: - raise Mo1BraggError(f"Scan mode {scan_name} not implemented for device {self.name}") + raise Mo1BraggError( + f"Scan mode {scan_name} not implemented for scan_type={self.scaninfo.scan_type} on device {self.name}" + ) # Load the scan parameters to the controller self.scan_control.scan_load.put(1) # Wait for params to be checked from controller @@ -728,14 +722,15 @@ class Mo1Bragg(Device, PositionerBase): return super().unstage() def on_unstage(self) -> None: - """Actions to be executed when the device is unstaged.""" + """Actions to be executed when the device is unstaged. + The checks here ensure that the controller resets the Scan_msg to PENDING state.""" if self.wait_for_signals( signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)], timeout=self.timeout_for_pvwait, check_stopped=True, ): return - + self.scan_control.scan_val_reset.put(1) if not self.wait_for_signals( signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)], @@ -746,8 +741,6 @@ class Mo1Bragg(Device, PositionerBase): f"Timeout after {self.timeout_for_pvwait} while waiting for scan validation" ) - - # -------------- End Flyer Interface methods -----------------# # -------------- Utility methods -----------------# @@ -850,7 +843,8 @@ class Mo1Bragg(Device, PositionerBase): result = self.wait_for_signals( signal_conditions, timeout, check_stopped, interval, all_signals ) - if result: + if result is True: + # pylint: disable=protected-access status.set_finished() else: status.set_exception(exception_on_timeout) diff --git a/tests/tests_devices/test_mo1_bragg.py b/tests/tests_devices/test_mo1_bragg.py index 5d2c85f..afc7920 100644 --- a/tests/tests_devices/test_mo1_bragg.py +++ b/tests/tests_devices/test_mo1_bragg.py @@ -7,7 +7,11 @@ from unittest import mock import ophyd import pytest -from bec_lib.messages import ScanStatusMessage +from bec_lib.messages import ScanQueueMessage, ScanStatusMessage +from bec_server.scan_server.scan_assembler import ScanAssembler +from bec_server.scan_server.scan_queue import RequestBlock +from bec_server.scan_server.scan_worker import ScanWorker +from bec_server.scan_server.tests.fixtures import scan_server_mock from ophyd.utils import LimitError from ophyd_devices.tests.utils import MockPV @@ -23,8 +27,7 @@ from debye_bec.devices.mo1_bragg import ( # TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories from debye_bec.devices.test_utils.utils import patch_dual_pvs -from bec_server.scan_server.tests.fixtures import scan_server_mock -from bec_server.scan_server.scan_worker import ScanWorker + @pytest.fixture(scope="function") def scan_worker_mock(scan_server_mock): @@ -271,97 +274,163 @@ def test_unstage(mock_bragg): assert mock_put.call_count == 1 -def test_stage_(mock_bragg): - # Test unknown scan type first - mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS - msg = ScanStatusMessage( - scan_id="my_scan_id", - status="closed", - info={ - "kwargs": { - "start": 0, - "stop": 5, - "scan_time": 1, - "scan_duration": 10, - "xrd_enable_low": True, - "xrd_enable_high": False, - "num_trigger_low": 1, - "num_trigger_high": 7, - "exp_time_low": 1, - "exp_time_high": 3, - "cycle_low": 1, - "cycle_high": 5, - } - }, - metadata={}, - ) - mock_bragg.scaninfo.scan_msg = msg - mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "unknown_fly_scan"}) - with ( - mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata, - mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg, - ): - with pytest.raises(Mo1BraggError): - mock_bragg.stage() - assert mock_check_scan_msg.call_count == 1 - assert mock_load_scan_metadata.call_count == 1 - # Test simple XAS scan - mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "xas_simple_scan"}) +@pytest.mark.parametrize( + "msg", + [ + ScanQueueMessage( + scan_type="monitor_scan", + parameter={ + "args": {}, + "kwargs": {"device": "mo1_bragg", "start": 0, "stop": 10, "relative": True}, + "num_points": 100, + }, + queue="primary", + metadata={"RID": "test1234"}, + ), + ScanQueueMessage( + scan_type="xas_simple_scan", + parameter={ + "args": {}, + "kwargs": { + "motor": "mo1_bragg", + "start": 0, + "stop": 10, + "scan_time": 1, + "scan_duration": 10, + }, + "num_points": 100, + }, + queue="primary", + metadata={"RID": "test1234"}, + ), + ScanQueueMessage( + scan_type="xas_simple_scan_with_xrd", + parameter={ + "args": {}, + "kwargs": { + "motor": "mo1_bragg", + "start": 0, + "stop": 10, + "scan_time": 1, + "scan_duration": 10, + "xrd_enable_low": True, + "xrd_enable_high": False, + "num_trigger_low": 1, + "num_trigger_high": 7, + "exp_time_low": 1, + "exp_time_high": 3, + "cycle_low": 1, + "cycle_high": 5, + }, + "num_points": 10, + }, + queue="primary", + metadata={"RID": "test1234"}, + ), + ], +) +def test_stage(mock_bragg, scan_worker_mock, msg): + """This test is important to check that the stage method of the device is working correctly. + Changing the kwargs names in the scans is tightly linked to the logic on the device, thus + it is important to check that the stage method is working correctly for the current implementation. + Therefor, this test creates a scaninfo message using the scan.open_scan() method to always check + agains the currently implemented scans vs. the logic on the device""" + # Create a scaninfo message using scans the ScanQueueMessages above, 3 cases of fly scan; for the general case the procedure is not defined yet + worker = scan_worker_mock + scan_server = worker.parent + rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server)) + with mock.patch.object(worker, "current_instruction_queue_item"): + worker.scan_motors = [] + worker.readout_priority = { + "monitored": [], + "baseline": [], + "async": [], + "continuous": [], + "on_request": [], + } + open_scan_msg = list(rb.scan.open_scan())[0] + worker._initialize_scan_info(rb, open_scan_msg, msg.content["parameter"].get("num_points")) + scan_status_msg = ScanStatusMessage( + scan_id="test1234", status="closed", info=worker.current_scan_info, metadata={} + ) + mock_bragg.scaninfo.scan_msg = scan_status_msg + + # Ensure that ScanControlLoadMessage is set to SUCCESS + mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS with ( - mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings, - mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings, - mock.patch.object( - mock_bragg, "set_scan_control_settings" - ) as mock_set_scan_control_settings, + mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata, + mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg, + mock.patch.object(mock_bragg, "on_unstage"), ): - mock_bragg.stage() - assert mock_xas_settings.call_args == mock.call( - low=msg.content["info"]["kwargs"]["start"], - high=msg.content["info"]["kwargs"]["stop"], - scan_time=msg.content["info"]["kwargs"]["scan_time"], - ) - assert mock_xrd_settings.call_args == mock.call( - enable_low=False, - enable_high=False, - num_trigger_low=0, - num_trigger_high=0, - exp_time_low=0, - exp_time_high=0, - cycle_low=0, - cycle_high=0, - ) - assert mock_set_scan_control_settings.call_args == mock.call( - mode=ScanControlMode.SIMPLE, - scan_duration=msg.content["info"]["kwargs"]["scan_duration"], - ) - mock_bragg.scaninfo.scan_msg.content["info"].update( - {"scan_name": "xas_simple_scan_with_xrd"} - ) - # Unstage mock_bragg to reset _staged - mock_bragg._staged= ophyd.Staged.no - time.sleep(1) - # Test simple XAS scan with XRD - mock_bragg.stage() - assert mock_xas_settings.call_args == mock.call( - low=msg.content["info"]["kwargs"]["start"], - high=msg.content["info"]["kwargs"]["stop"], - scan_time=msg.content["info"]["kwargs"]["scan_time"], - ) - assert mock_xrd_settings.call_args == mock.call( - enable_low=msg.content["info"]["kwargs"]["xrd_enable_low"], - enable_high=msg.content["info"]["kwargs"]["xrd_enable_high"], - num_trigger_low=msg.content["info"]["kwargs"]["num_trigger_low"], - num_trigger_high=msg.content["info"]["kwargs"]["num_trigger_high"], - exp_time_low=msg.content["info"]["kwargs"]["exp_time_low"], - exp_time_high=msg.content["info"]["kwargs"]["exp_time_high"], - cycle_low=msg.content["info"]["kwargs"]["cycle_low"], - cycle_high=msg.content["info"]["kwargs"]["cycle_high"], - ) - assert mock_set_scan_control_settings.call_args == mock.call( - mode=ScanControlMode.SIMPLE, - scan_duration=msg.content["info"]["kwargs"]["scan_duration"], - ) - # Test redundant staging - with pytest.raises(ophyd.utils.errors.RedundantStaging): - mock_bragg.stage() + scan_name = scan_status_msg.content["info"].get("scan_name", "") + # Chek the not implemented fly scan first, should raise Mo1BraggError + if scan_name not in ["xas_simple_scan", "xas_simple_scan_with_xrd"]: + with pytest.raises(Mo1BraggError): + mock_bragg.stage() + assert mock_check_scan_msg.call_count == 1 + assert mock_load_scan_metadata.call_count == 1 + else: + with ( + mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings, + mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings, + mock.patch.object( + mock_bragg, "set_scan_control_settings" + ) as mock_set_scan_control_settings, + ): + # Check xas_simple_scan + if scan_name == "xas_simple_scan": + mock_bragg.stage() + assert mock_xas_settings.call_args == mock.call( + low=scan_status_msg.content["info"]["kwargs"]["start"], + high=scan_status_msg.content["info"]["kwargs"]["stop"], + scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], + ) + assert mock_xrd_settings.call_args == mock.call( + enable_low=False, + enable_high=False, + num_trigger_low=0, + num_trigger_high=0, + exp_time_low=0, + exp_time_high=0, + cycle_low=0, + cycle_high=0, + ) + assert mock_set_scan_control_settings.call_args == mock.call( + mode=ScanControlMode.SIMPLE, + scan_duration=scan_status_msg.content["info"]["kwargs"][ + "scan_duration" + ], + ) + # Check xas_simple_scan_with_xrd + elif scan_name == "xas_simple_scan_with_xrd": + mock_bragg.stage() + assert mock_xas_settings.call_args == mock.call( + low=scan_status_msg.content["info"]["kwargs"]["start"], + high=scan_status_msg.content["info"]["kwargs"]["stop"], + scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], + ) + assert mock_xrd_settings.call_args == mock.call( + enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"], + enable_high=scan_status_msg.content["info"]["kwargs"][ + "xrd_enable_high" + ], + num_trigger_low=scan_status_msg.content["info"]["kwargs"][ + "num_trigger_low" + ], + num_trigger_high=scan_status_msg.content["info"]["kwargs"][ + "num_trigger_high" + ], + exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"], + exp_time_high=scan_status_msg.content["info"]["kwargs"][ + "exp_time_high" + ], + cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"], + cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"], + ) + assert mock_set_scan_control_settings.call_args == mock.call( + mode=ScanControlMode.SIMPLE, + scan_duration=scan_status_msg.content["info"]["kwargs"][ + "scan_duration" + ], + )