# pylint: skip-file import os import threading import time from dataclasses import fields from unittest import mock import ophyd import pytest from bec_lib.messages import ScanQueueMessage, ScanStatusMessage from bec_server.scan_server.scan_assembler import ScanAssembler from bec_server.scan_server.scan_queue import RequestBlock from bec_server.scan_server.scan_worker import ScanWorker from bec_server.scan_server.tests.fixtures import scan_server_mock from ophyd.utils import LimitError from ophyd_devices.tests.utils import MockPV # from bec_server.device_server.tests.utils import DMMock from debye_bec.devices.mo1_bragg.mo1_bragg import ( Mo1Bragg, Mo1BraggError, ScanControlLoadMessage, ScanControlMode, ScanControlScanStatus, ) from debye_bec.devices.mo1_bragg.mo1_bragg_devices import MoveType # TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories from debye_bec.devices.test_utils.utils import patch_dual_pvs @pytest.fixture(scope="function") def scan_worker_mock(scan_server_mock): scan_server_mock.device_manager.connector = mock.MagicMock() scan_worker = ScanWorker(parent=scan_server_mock) yield scan_worker @pytest.fixture(scope="function") def mock_bragg(): name = "bragg" prefix = "X01DA-OP-MO1:BRAGG:" with mock.patch.object(ophyd, "cl") as mock_cl: mock_cl.get_pv = MockPV mock_cl.thread_class = threading.Thread dev = Mo1Bragg(name=name, prefix=prefix) patch_dual_pvs(dev) yield dev def test_init(mock_bragg): dev = mock_bragg assert dev.name == "bragg" assert dev.prefix == "X01DA-OP-MO1:BRAGG:" assert dev.move_type.get() == MoveType.ENERGY assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV" assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs" def test_check_value(mock_bragg): dev = mock_bragg dev.low_lim._read_pv.mock_data = 0 dev.high_lim._read_pv.mock_data = 1 dev.low_limit_angle._read_pv.mock_data = 10 dev.high_limit_angle._read_pv.mock_data = 20 # Check that limits are taken correctly from angle or energy # Energy first move_type = MoveType.ENERGY dev.move_type.set(move_type) # nothing happens dev.check_value(0.5) with pytest.raises(LimitError): dev.check_value(15) # Angle next move_type = MoveType.ANGLE dev.move_type.set(move_type) dev.check_value(15) with pytest.raises(LimitError): dev.check_value(0.5) def test_egu(mock_bragg): dev = mock_bragg assert dev.egu == "eV" dev.move_type.set(MoveType.ANGLE) assert dev.egu == "deg" def test_move_succeeds(mock_bragg): dev = mock_bragg dev.move_abs._read_pv.mock_data = 0 # Move succeeds with mock.patch.object(dev.motor_is_moving._read_pv, "mock_data", side_effect=[0, 1]): status = dev.move(0.5) # Sleep needed to allow thread to resolive in _move_and_finish, i.e. and the 0.25s sleep inside the function time.sleep(1) assert status.done is True assert status.success is True assert dev.setpoint.get() == 0.5 assert dev.move_abs.get() == 1 def test_stop_move(mock_bragg): dev = mock_bragg dev.move_abs._read_pv.mock_data = 0 dev.motor_is_moving._read_pv.mock_data = 0 # Move fails status = dev.move(0.5) time.sleep(1) assert status.done is False assert dev._stopped == False dev.stop() time.sleep(0.5) assert dev._stopped == True assert status.done is True assert status.success is False def test_set_xtal(mock_bragg): dev = mock_bragg dev.set_xtal("111") # Default values for mock assert dev.crystal.offset_si111.get() == 0 assert dev.crystal.offset_si311.get() == 0 assert dev.crystal.d_spacing_si111.get() == 0 assert dev.crystal.d_spacing_si311.get() == 0 assert dev.crystal.xtal_enum.get() == 0 dev.set_xtal("311", offset_si111=1, offset_si311=2, d_spacing_si111=3, d_spacing_si311=4) assert dev.crystal.offset_si111.get() == 1 assert dev.crystal.offset_si311.get() == 2 assert dev.crystal.d_spacing_si111.get() == 3 assert dev.crystal.d_spacing_si311.get() == 4 assert dev.crystal.xtal_enum.get() == 1 def test_set_xas_settings(mock_bragg): dev = mock_bragg dev.move_type.set(MoveType.ENERGY) dev.set_xas_settings(low=0.5, high=1, scan_time=0.1) assert dev.scan_settings.s_scan_energy_lo.get() == 0.5 assert dev.scan_settings.s_scan_energy_hi.get() == 1 assert dev.scan_settings.s_scan_scantime.get() == 0.1 dev.move_type.set(MoveType.ANGLE) dev.set_xas_settings(low=10, high=20, scan_time=1) assert dev.scan_settings.s_scan_angle_lo.get() == 10 assert dev.scan_settings.s_scan_angle_hi.get() == 20 assert dev.scan_settings.s_scan_scantime.get() == 1 def test_set_trig_settings(mock_bragg): dev = mock_bragg dev.set_trig_settings( enable_low=True, enable_high=False, exp_time_high=0.1, exp_time_low=0.01, cycle_low=1, cycle_high=3, ) assert dev.scan_settings.trig_ena_lo_enum.get() == True assert dev.scan_settings.trig_ena_hi_enum.get() == False assert dev.scan_settings.trig_every_n_lo.get() == 1 assert dev.scan_settings.trig_every_n_hi.get() == 3 assert dev.scan_settings.trig_time_lo.get() == 0.01 assert dev.scan_settings.trig_time_hi.get() == 0.1 def test_set_control_settings(mock_bragg): dev = mock_bragg dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10) assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.SIMPLE assert dev.scan_control.scan_duration.get() == 10 dev.set_scan_control_settings(mode=ScanControlMode.ADVANCED, scan_duration=5) assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.ADVANCED assert dev.scan_control.scan_duration.get() == 5 def test_update_scan_parameters(mock_bragg): dev = mock_bragg msg = ScanStatusMessage( scan_id="my_scan_id", status="closed", request_inputs={ "kwargs": { "start": 0, "stop": 5, "scan_time": 1, "scan_duration": 10, "xrd_enable_low": True, "xrd_enable_high": False, "num_trigger_low": 1, "num_trigger_high": 7, "exp_time_low": 1, "exp_time_high": 3, "cycle_low": 1, "cycle_high": 5, "p_kink": 50, "e_kink": 8000, } }, info={ "kwargs": { "start": 0, "stop": 5, "scan_time": 1, "scan_duration": 10, "xrd_enable_low": True, "xrd_enable_high": False, "num_trigger_low": 1, "num_trigger_high": 7, "exp_time_low": 1, "exp_time_high": 3, "cycle_low": 1, "cycle_high": 5, "p_kink": 50, "e_kink": 8000, } }, metadata={}, ) mock_bragg.scan_info.msg = msg scan_param = dev.scan_parameter.model_dump() for _, v in scan_param.items(): assert v == None dev._update_scan_parameter() scan_param = dev.scan_parameter.model_dump() for k, v in scan_param.items(): assert v == msg.content["request_inputs"]["kwargs"].get(k, None) def test_kickoff_scan(mock_bragg): dev = mock_bragg dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY dev.scan_control.scan_duration._read_pv.mock_data = 1 dev.scan_control.scan_start_timer._read_pv.mock_data = 0 dev.scan_control.scan_start_infinite._read_pv.mock_data = 0 status = dev.kickoff() assert status.done is False dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING time.sleep(0.2) assert status.done is True assert dev.scan_control.scan_start_timer.get() == 1 dev.scan_control.scan_duration._read_pv.mock_data = 0 dev.scan_control.scan_start_timer._read_pv.mock_data = 0 dev.scan_control.scan_start_infinite._read_pv.mock_data = 0 dev.scan_control.scan_start_infinite._read_pv.mock_data = 0 status = dev.kickoff() dev.kickoff() assert dev.scan_control.scan_start_infinite.get() == 1 def test_complete(mock_bragg): dev = mock_bragg dev.scan_control.scan_done._read_pv.mock_data = 0 # Normal case status = dev.complete() assert status.done is False assert status.success is False dev.scan_control.scan_done._read_pv.mock_data = 1 status.wait() # time.sleep(0.2) assert status.done is True assert status.success is True # Stop called case dev.scan_control.scan_done._read_pv.mock_data = 0 status = dev.complete() assert status.done is False assert status.success is False dev.stop() time.sleep(0.2) assert status.done is True assert status.success is False def test_unstage(mock_bragg): mock_bragg.timeout_for_pvwait = 0.5 mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0 mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put: status = mock_bragg.unstage() assert mock_put.call_count == 0 mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS with pytest.raises(TimeoutError): mock_bragg.unstage() assert mock_put.call_count == 1 # TODO reimplement the test for stage method # @pytest.mark.parametrize( # "msg", # [ # ScanQueueMessage( # scan_type="monitor_scan", # parameter={ # "args": {}, # "kwargs": { # "device": "mo1_bragg", # "start": 0, # "stop": 10, # "relative": True, # "system_config": {"file_suffix": None, "file_directory": None}, # }, # "num_points": 100, # }, # queue="primary", # metadata={"RID": "test1234"}, # ), # ScanQueueMessage( # scan_type="xas_simple_scan", # parameter={ # "args": {}, # "kwargs": { # "motor": "mo1_bragg", # "start": 0, # "stop": 10, # "scan_time": 1, # "scan_duration": 10, # "system_config": {"file_suffix": None, "file_directory": None}, # }, # "num_points": 100, # }, # queue="primary", # metadata={"RID": "test1234"}, # ), # ScanQueueMessage( # scan_type="xas_simple_scan_with_xrd", # parameter={ # "args": {}, # "kwargs": { # "motor": "mo1_bragg", # "start": 0, # "stop": 10, # "scan_time": 1, # "scan_duration": 10, # "xrd_enable_low": True, # "xrd_enable_high": False, # "num_trigger_low": 1, # "num_trigger_high": 7, # "exp_time_low": 1, # "exp_time_high": 3, # "cycle_low": 1, # "cycle_high": 5, # "system_config": {"file_suffix": None, "file_directory": None}, # }, # "num_points": 10, # }, # queue="primary", # metadata={"RID": "test1234"}, # ), # ScanQueueMessage( # scan_type="xas_advanced_scan", # parameter={ # "args": {}, # "kwargs": { # "motor": "mo1_bragg", # "start": 8000, # "stop": 9000, # "scan_time": 1, # "scan_duration": 10, # "p_kink": 50, # "e_kink": 8500, # "system_config": {"file_suffix": None, "file_directory": None}, # }, # "num_points": 100, # }, # queue="primary", # metadata={"RID": "test1234"}, # ), # ScanQueueMessage( # scan_type="xas_advanced_scan_with_xrd", # parameter={ # "args": {}, # "kwargs": { # "motor": "mo1_bragg", # "start": 8000, # "stop": 9000, # "scan_time": 1, # "scan_duration": 10, # "p_kink": 50, # "e_kink": 8500, # "xrd_enable_low": True, # "xrd_enable_high": False, # "num_trigger_low": 1, # "num_trigger_high": 7, # "exp_time_low": 1, # "exp_time_high": 3, # "cycle_low": 1, # "cycle_high": 5, # "system_config": {"file_suffix": None, "file_directory": None}, # }, # "num_points": 10, # }, # queue="primary", # metadata={"RID": "test1234"}, # ), # ], # ) # def test_stage(mock_bragg, scan_worker_mock, msg): # """This test is important to check that the stage method of the device is working correctly. # Changing the kwargs names in the scans is tightly linked to the logic on the device, thus # it is important to check that the stage method is working correctly for the current implementation. # Therefor, this test creates a scaninfo message using the scan.open_scan() method to always check # agains the currently implemented scans vs. the logic on the device""" # # Create a scaninfo message using scans the ScanQueueMessages above, 3 cases of fly scan; for the general case the procedure is not defined yet # worker = scan_worker_mock # scan_server = worker.parent # rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server)) # with mock.patch.object(worker, "current_instruction_queue_item"): # worker.scan_motors = [] # worker.readout_priority = { # "monitored": [], # "baseline": [], # "async": [], # "continuous": [], # "on_request": [], # } # open_scan_msg = list(rb.scan.open_scan())[0] # worker._initialize_scan_info( # rb, open_scan_msg, msg.content["parameter"].get("num_points", 1) # ) # # TODO find a better solution to this... # scan_status_msg = ScanStatusMessage( # scan_id=worker.current_scan_id, # status="open", # scan_name=worker.current_scan_info.get("scan_name"), # scan_number=worker.current_scan_info.get("scan_number"), # session_id=worker.current_scan_info.get("session_id"), # dataset_number=worker.current_scan_info.get("dataset_number"), # num_points=worker.current_scan_info.get("num_points"), # scan_type=worker.current_scan_info.get("scan_type"), # scan_report_devices=worker.current_scan_info.get("scan_report_devices"), # user_metadata=worker.current_scan_info.get("user_metadata"), # readout_priority=worker.current_scan_info.get("readout_priority"), # scan_parameters=worker.current_scan_info.get("scan_parameters"), # request_inputs=worker.current_scan_info.get("request_inputs"), # info=worker.current_scan_info, # ) # mock_bragg.scan_info.msg = scan_status_msg <<<<<<< Updated upstream # Ensure that ScanControlLoadMessage is set to SUCCESS mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS with ( mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata, mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg, mock.patch.object(mock_bragg, "on_unstage"), ): scan_name = scan_status_msg.content["info"].get("scan_name", "") # Chek the not implemented fly scan first, should raise Mo1BraggError if scan_name not in [ "xas_simple_scan", "xas_simple_scan_with_xrd", "xas_advanced_scan", "xas_advanced_scan_with_xrd", ]: with pytest.raises(Mo1BraggError): mock_bragg.stage() assert mock_check_scan_msg.call_count == 1 assert mock_load_scan_metadata.call_count == 1 else: with ( mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings, mock.patch.object( mock_bragg, "set_advanced_xas_settings" ) as mock_advanced_xas_settings, mock.patch.object(mock_bragg, "set_trig_settings") as mock_trig_settings, mock.patch.object( mock_bragg, "set_scan_control_settings" ) as mock_set_scan_control_settings, ): # Check xas_simple_scan if scan_name == "xas_simple_scan": mock_bragg.stage() assert mock_xas_settings.call_args == mock.call( low=scan_status_msg.content["info"]["kwargs"]["start"], high=scan_status_msg.content["info"]["kwargs"]["stop"], scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], ) assert mock_trig_settings.call_args == mock.call( enable_low=False, enable_high=False, exp_time_low=0, exp_time_high=0, cycle_low=0, cycle_high=0, ) assert mock_set_scan_control_settings.call_args == mock.call( mode=ScanControlMode.SIMPLE, scan_duration=scan_status_msg.content["info"]["kwargs"][ "scan_duration" ], ) # Check xas_simple_scan_with_xrd elif scan_name == "xas_simple_scan_with_xrd": mock_bragg.stage() assert mock_xas_settings.call_args == mock.call( low=scan_status_msg.content["info"]["kwargs"]["start"], high=scan_status_msg.content["info"]["kwargs"]["stop"], scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], ) assert mock_trig_settings.call_args == mock.call( enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"], enable_high=scan_status_msg.content["info"]["kwargs"][ "xrd_enable_high" ], exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"], exp_time_high=scan_status_msg.content["info"]["kwargs"][ "exp_time_high" ], cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"], cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"], ) assert mock_set_scan_control_settings.call_args == mock.call( mode=ScanControlMode.SIMPLE, scan_duration=scan_status_msg.content["info"]["kwargs"][ "scan_duration" ], ) # Check xas_advanced_scan elif scan_name == "xas_advanced_scan": mock_bragg.stage() assert mock_advanced_xas_settings.call_args == mock.call( low=scan_status_msg.content["info"]["kwargs"]["start"], high=scan_status_msg.content["info"]["kwargs"]["stop"], scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"], e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"], ) assert mock_trig_settings.call_args == mock.call( enable_low=False, enable_high=False, exp_time_low=0, exp_time_high=0, cycle_low=0, cycle_high=0, ) assert mock_set_scan_control_settings.call_args == mock.call( mode=ScanControlMode.ADVANCED, scan_duration=scan_status_msg.content["info"]["kwargs"][ "scan_duration" ], ) # Check xas_advanced_scan_with_xrd elif scan_name == "xas_advanced_scan_with_xrd": mock_bragg.stage() assert mock_advanced_xas_settings.call_args == mock.call( low=scan_status_msg.content["info"]["kwargs"]["start"], high=scan_status_msg.content["info"]["kwargs"]["stop"], scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"], e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"], ) assert mock_trig_settings.call_args == mock.call( enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"], enable_high=scan_status_msg.content["info"]["kwargs"][ "xrd_enable_high" ], exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"], exp_time_high=scan_status_msg.content["info"]["kwargs"][ "exp_time_high" ], cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"], cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"], ) assert mock_set_scan_control_settings.call_args == mock.call( mode=ScanControlMode.ADVANCED, scan_duration=scan_status_msg.content["info"]["kwargs"][ "scan_duration" ], ) ======= # # Ensure that ScanControlLoadMessage is set to SUCCESS # mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS # with ( # mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg, # mock.patch.object(mock_bragg, "on_unstage"), # ): # scan_name = scan_status_msg.content["info"].get("scan_name", "") # # Chek the not implemented fly scan first, should raise Mo1BraggError # if scan_name not in [ # "xas_simple_scan", # "xas_simple_scan_with_xrd", # "xas_advanced_scan", # "xas_advanced_scan_with_xrd", # ]: # with pytest.raises(Mo1BraggError): # mock_bragg.stage() # assert mock_check_scan_msg.call_count == 1 # else: # with ( # mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings, # mock.patch.object( # mock_bragg, "set_advanced_xas_settings" # ) as mock_advanced_xas_settings, # mock.patch.object(mock_bragg, "set_trig_settings") as mock_trig_settings, # mock.patch.object( # mock_bragg, "set_scan_control_settings" # ) as mock_set_scan_control_settings, # ): # # Check xas_simple_scan # if scan_name == "xas_simple_scan": # mock_bragg.stage() # assert mock_xas_settings.call_args == mock.call( # low=scan_status_msg.content["info"]["kwargs"]["start"], # high=scan_status_msg.content["info"]["kwargs"]["stop"], # scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], # ) # assert mock_trig_settings.call_args == mock.call( # enable_low=False, # enable_high=False, # exp_time_low=0, # exp_time_high=0, # cycle_low=0, # cycle_high=0, # ) # assert mock_set_scan_control_settings.call_args == mock.call( # mode=ScanControlMode.SIMPLE, # scan_duration=scan_status_msg.content["info"]["kwargs"][ # "scan_duration" # ], # ) # # Check xas_simple_scan_with_xrd # elif scan_name == "xas_simple_scan_with_xrd": # mock_bragg.stage() # assert mock_xas_settings.call_args == mock.call( # low=scan_status_msg.content["info"]["kwargs"]["start"], # high=scan_status_msg.content["info"]["kwargs"]["stop"], # scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], # ) # assert mock_trig_settings.call_args == mock.call( # enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"], # enable_high=scan_status_msg.content["info"]["kwargs"][ # "xrd_enable_high" # ], # exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"], # exp_time_high=scan_status_msg.content["info"]["kwargs"][ # "exp_time_high" # ], # cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"], # cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"], # ) # assert mock_set_scan_control_settings.call_args == mock.call( # mode=ScanControlMode.SIMPLE, # scan_duration=scan_status_msg.content["info"]["kwargs"][ # "scan_duration" # ], # ) # # Check xas_advanced_scan # elif scan_name == "xas_advanced_scan": # mock_bragg.stage() # assert mock_advanced_xas_settings.call_args == mock.call( # low=scan_status_msg.content["info"]["kwargs"]["start"], # high=scan_status_msg.content["info"]["kwargs"]["stop"], # scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], # p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"], # e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"], # ) # assert mock_trig_settings.call_args == mock.call( # enable_low=False, # enable_high=False, # exp_time_low=0, # exp_time_high=0, # cycle_low=0, # cycle_high=0, # ) # assert mock_set_scan_control_settings.call_args == mock.call( # mode=ScanControlMode.ADVANCED, # scan_duration=scan_status_msg.content["info"]["kwargs"][ # "scan_duration" # ], # ) # # Check xas_advanced_scan_with_xrd # elif scan_name == "xas_advanced_scan_with_xrd": # mock_bragg.stage() # assert mock_advanced_xas_settings.call_args == mock.call( # low=scan_status_msg.content["info"]["kwargs"]["start"], # high=scan_status_msg.content["info"]["kwargs"]["stop"], # scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], # p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"], # e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"], # ) # assert mock_trig_settings.call_args == mock.call( # enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"], # enable_high=scan_status_msg.content["info"]["kwargs"][ # "xrd_enable_high" # ], # exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"], # exp_time_high=scan_status_msg.content["info"]["kwargs"][ # "exp_time_high" # ], # cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"], # cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"], # ) # assert mock_set_scan_control_settings.call_args == mock.call( # mode=ScanControlMode.ADVANCED, # scan_duration=scan_status_msg.content["info"]["kwargs"][ # "scan_duration" # ], # ) >>>>>>> Stashed changes