# pylint: skip-file
import threading
from typing import Generator
from unittest import mock

import ophyd
import pytest
from bec_server.scan_server.scan_worker import ScanWorker
from ophyd.status import WaitTimeoutError
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
from ophyd_devices.tests.utils import MockPV

# from bec_server.device_server.tests.utils import DMMock
from debye_bec.devices.nidaq.nidaq import Nidaq, NidaqError

# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and
# needed for other plugin repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs


@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
    """Provide a ScanWorker attached to the mocked scan server.

    The device manager's connector is replaced by a MagicMock before the
    worker is created.
    """
    scan_server_mock.device_manager.connector = mock.MagicMock()
    yield ScanWorker(parent=scan_server_mock)


@pytest.fixture(scope="function")
def mock_nidaq() -> Generator[Nidaq, None, None]:
    """Provide a Nidaq device whose PVs are backed by MockPV.

    ``ophyd.cl`` is patched for the lifetime of the fixture, so the device is
    both created and used while the mocked control layer is active.
    """
    with mock.patch.object(ophyd, "cl") as control_layer:
        control_layer.get_pv = MockPV
        control_layer.thread_class = threading.Thread
        nidaq = Nidaq(name="nidaq", prefix="nidaq:prefix_test:")
        patch_dual_pvs(nidaq)
        yield nidaq


def test_init(mock_nidaq):
    """Check name, prefix and the list of valid scan names after construction."""
    nidaq = mock_nidaq
    expected_scan_names = [
        "xas_simple_scan",
        "xas_simple_scan_with_xrd",
        "xas_advanced_scan",
        "xas_advanced_scan_with_xrd",
        "nidaq_continuous_scan",
    ]
    assert nidaq.name == "nidaq"
    assert nidaq.prefix == "nidaq:prefix_test:"
    assert nidaq.valid_scan_names == expected_scan_names


def test_check_if_scan_name_is_valid(mock_nidaq):
    """Check that _check_if_scan_name_is_valid accepts a known name and rejects an unknown one."""
    nidaq = mock_nidaq

    nidaq.scan_info.msg.scan_name = "xas_simple_scan"
    assert nidaq._check_if_scan_name_is_valid()

    nidaq.scan_info.msg.scan_name = "invalid_scan_name"
    assert not nidaq._check_if_scan_name_is_valid()


def test_set_config(mock_nidaq):
    """Placeholder for the set_config test; currently only requests the fixture."""
    nidaq = mock_nidaq
    # TODO #21 Add test logic for set_config, issue created


def test_on_connected(mock_nidaq):
    """Check that on_connected times out with MockPV but still sets the power PV."""
    nidaq = mock_nidaq
    nidaq.power.put(0)
    nidaq.heartbeat._read_pv.mock_data = 0

    # First scenario: a timeout. MockPV does not support callbacks yet, so the
    # wait inside on_connected ends in a WaitTimeoutError.
    # NOTE(review): the other tests shorten `_timeout_wait_for_pv`; confirm that
    # `timeout_wait_for_signal` is the attribute on_connected actually reads.
    nidaq.timeout_wait_for_signal = 0.1

    # State PV is 0 while 1 is the expected value, so the wait cannot succeed.
    nidaq.state._read_pv.mock_data = 0
    with pytest.raises(WaitTimeoutError):
        nidaq.on_connected()

    # The power PV having been set to 1 shows that on_connected ran up to the wait.
    assert nidaq.power.get() == 1
    # TODO, once MockPV supports callbacks, we can test the rest of the logic, issue #22


# def test_on_stage(mock_nidaq):
#     dev = mock_nidaq
#     # TODO Add once MockPV supports callbacks #22


def test_on_kickoff(mock_nidaq):
    """Check that kickoff sets the kickoff_call PV to 1."""
    nidaq = mock_nidaq
    nidaq.kickoff_call.put(0)
    nidaq.kickoff()
    assert nidaq.kickoff_call.get() == 1


def test_on_unstage(mock_nidaq):
    """Check that on_unstage times out while the state PV is not at standby."""
    nidaq = mock_nidaq
    nidaq.state._read_pv.mock_data = 0  # state 0; 1 is standby
    nidaq._timeout_wait_for_pv = 0.1  # short timeout to keep the test fast
    # NOTE(review): the previous comment called this value "compression
    # enabled", while the disabled follow-up below expects 1 - confirm meaning.
    nidaq.enable_compression._read_pv.mock_data = 0

    with pytest.raises(WaitTimeoutError):
        nidaq.on_unstage()

    nidaq.state._read_pv.mock_data = 1
    # FIXME #22 add callback mechanism to MockPV to test the rest of the logic
    # dev.on_unstage()
    # assert dev.enable_compression.get() == 1


@pytest.mark.parametrize(
    ["scan_name", "raise_error", "nidaq_state"],
    [
        ("line_scan", False, 0),
        ("xas_simple_scan", False, 3),
        ("xas_simple_scan", True, 0),
        ("nidaq_continuous_scan", False, 0),
    ],
)
def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
    """Check pre_scan for several scan names and initial state values.

    ``raise_error`` marks the combinations for which pre_scan is expected to
    end in a WaitTimeoutError.
    """
    nidaq = mock_nidaq
    nidaq.state.put(nidaq_state)
    nidaq.scan_info.msg.scan_name = scan_name
    nidaq._timeout_wait_for_pv = 0.1  # short timeout to keep the test fast

    if raise_error:
        with pytest.raises(WaitTimeoutError):
            nidaq.pre_scan()
    else:
        nidaq.pre_scan()


def test_on_complete(mock_nidaq):
    """Check that completion statuses resolve once the state PV reaches 1."""
    nidaq = mock_nidaq

    # --- nidaq_continuous_scan ---
    nidaq.scan_info.msg.scan_name = "nidaq_continuous_scan"
    nidaq.state.put(0)  # not the standby value, completion must stay pending
    status = nidaq.complete()
    assert status.done is False
    nidaq.state.put(1)  # standby value, the status should resolve now
    status.wait(timeout=5)
    assert status.done is True

    # --- xas_simple_scan ---
    nidaq.scan_info.msg.scan_name = "xas_simple_scan"
    nidaq.state.put(0)  # not the standby value
    nidaq.stop_call.put(0)
    nidaq._timeout_wait_for_pv = 5
    status = nidaq.on_complete()
    assert status.done is False
    assert nidaq.stop_call.get() == 1  # on_complete should have requested a stop
    nidaq.state.put(1)  # standby value, the status should resolve now
    status.wait(timeout=5)
    assert status.done is True

    # --- device stopped ---
    # NOTE(review): `status` is the object that already finished above, so this
    # wait returns immediately; presumably a fresh status was intended - confirm.
    nidaq.state.put(0)
    nidaq.stop()
    status.wait(timeout=5)
    assert status.done is True