Files
debye_bec/tests/tests_devices/test_nidaq.py

167 lines
5.4 KiB
Python

# pylint: skip-file
import threading
from typing import Generator
from unittest import mock
import ophyd
import pytest
from bec_server.scan_server.scan_worker import ScanWorker
from ophyd.status import WaitTimeoutError
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError
from ophyd_devices.tests.utils import MockPV
# from bec_server.device_server.tests.utils import DMMock
from debye_bec.devices.nidaq.nidaq import Nidaq, NidaqError
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
"""Scan worker fixture, utility to generate scan_info for a given scan name."""
scan_server_mock.device_manager.connector = mock.MagicMock()
scan_worker = ScanWorker(parent=scan_server_mock)
yield scan_worker
@pytest.fixture(scope="function")
def mock_nidaq() -> Generator[Nidaq, None, None]:
"""Fixture for the Nidaq device."""
name = "nidaq"
prefix = "nidaq:prefix_test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Nidaq(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_init(mock_nidaq):
"""Test the initialization of the Nidaq device."""
dev = mock_nidaq
assert dev.name == "nidaq"
assert dev.prefix == "nidaq:prefix_test:"
assert dev.valid_scan_names == [
"xas_simple_scan",
"xas_simple_scan_with_xrd",
"xas_advanced_scan",
"xas_advanced_scan_with_xrd",
"nidaq_continuous_scan",
]
def test_check_if_scan_name_is_valid(mock_nidaq):
"""Test the check_if_scan_name_is_valid method."""
dev = mock_nidaq
dev.scan_info.msg.scan_name = "xas_simple_scan"
assert dev._check_if_scan_name_is_valid()
dev.scan_info.msg.scan_name = "invalid_scan_name"
assert not dev._check_if_scan_name_is_valid()
def test_set_config(mock_nidaq):
dev = mock_nidaq
# TODO #21 Add test logic for set_config, issue created #
def test_on_connected(mock_nidaq):
"""Test the on_connected method of the Nidaq device."""
dev = mock_nidaq
dev.power.put(0)
dev.heartbeat._read_pv.mock_data = 0
# First scenario, raise timeout error
# This will raise a WaitTimeoutError error as we currently do not support callbacks in the MockPV
dev.timeout_wait_for_signal = 0.1
# To check that it raised, we check that dev.power PV is set to 1
# Set state PV to 0, 1 is expected value
dev.state._read_pv.mock_data = 0
with pytest.raises(WaitTimeoutError):
dev.on_connected()
assert dev.power.get() == 1
# TODO, once the MOCKPv supports callbacks, we can test the rest of the logic issue #22
# def test_on_stage(mock_nidaq):
# dev = mock_nidaq
# #TODO Add once MockPV supports callbacks #22
def test_on_kickoff(mock_nidaq):
"""Test the on_kickoff method of the Nidaq device."""
dev = mock_nidaq
dev.kickoff_call.put(0)
dev.kickoff()
assert dev.kickoff_call.get() == 1
def test_on_unstage(mock_nidaq):
"""Test the on_unstage method of the Nidaq device."""
dev = mock_nidaq
dev.state._read_pv.mock_data = 0 # Set state to 0, 1 is Standby
dev._timeout_wait_for_pv = 0.1 # Set a short timeout for testing
dev.enable_compression._read_pv.mock_data = 0 # Compression enabled
with pytest.raises(WaitTimeoutError):
dev.on_unstage()
dev.state._read_pv.mock_data = 1
# FIXME #22 add callback mechanism to MockPV to test the rest of the logic
# dev.on_unstage()
# assert dev.enable_compression.get() == 1
@pytest.mark.parametrize(
["scan_name", "raise_error", "nidaq_state"],
[
("line_scan", False, 0),
("xas_simple_scan", False, 3),
("xas_simple_scan", True, 0),
("nidaq_continuous_scan", False, 0),
],
)
def test_on_pre_scan(mock_nidaq, scan_name, raise_error, nidaq_state):
"""Test the on_pre_scan method of the Nidaq device."""
dev = mock_nidaq
dev.state.put(nidaq_state)
dev.scan_info.msg.scan_name = scan_name
dev._timeout_wait_for_pv = 0.1 # Set a short timeout for testing
if not raise_error:
dev.pre_scan()
else:
with pytest.raises(WaitTimeoutError):
dev.pre_scan()
def test_on_complete(mock_nidaq):
"""Test the on_complete method of the Nidaq device."""
dev = mock_nidaq
# Check for nidaq_continuous_scan
dev.scan_info.msg.scan_name = "nidaq_continuous_scan"
dev.state.put(0) # Set state to DISABLED
status = dev.complete()
assert status.done is False
dev.state.put(1)
# Should resolve now
status.wait(timeout=5) # Wait for the status to complete
assert status.done is True
# Check for XAS simple scan
dev.scan_info.msg.scan_name = "xas_simple_scan"
dev.state.put(0) # Set state to ACQUIRE
dev.stop_call.put(0)
dev._timeout_wait_for_pv = 5
status = dev.on_complete()
assert status.done is False
assert dev.stop_call.get() == 1 # Should have called stop
dev.state.put(1) # Set state to STANDBY
# Should resolve now
status.wait(timeout=5) # Wait for the status to complete
assert status.done is True
# Test that it resolves if device is stopped
dev.state.put(0) # Set state to DISABLED
dev.stop()
status.wait(timeout=5)
assert status.done is True