From b24f65a2a16f7048c9370ef576dede63da40e00e Mon Sep 17 00:00:00 2001 From: Mathias Guijarro Date: Wed, 6 Mar 2024 10:12:45 +0100 Subject: [PATCH] feat(tests): fixtures for end-2-end tests (available as a pytest plugin) --- bec_client/tests/end-2-end/test_scans.py | 159 +++++--------- bec_client/tests/end-2-end/test_scans_lib.py | 83 ++------ bec_lib/bec_lib/tests/end2end_fixtures.py | 206 +++++++++++++++++++ bec_lib/bec_lib/tests/fixtures.py | 56 +++++ bec_lib/bec_lib/tests/utils.py | 61 +----- bec_lib/setup.py | 9 +- 6 files changed, 353 insertions(+), 221 deletions(-) create mode 100644 bec_lib/bec_lib/tests/end2end_fixtures.py create mode 100644 bec_lib/bec_lib/tests/fixtures.py diff --git a/bec_client/tests/end-2-end/test_scans.py b/bec_client/tests/end-2-end/test_scans.py index 3fc8c2f2..006b4485 100644 --- a/bec_client/tests/end-2-end/test_scans.py +++ b/bec_client/tests/end-2-end/test_scans.py @@ -7,41 +7,18 @@ from unittest.mock import PropertyMock import numpy as np import pytest -from bec_client import BECIPythonClient from bec_client.callbacks.utils import ScanRequestError -from bec_lib import MessageEndpoints, RedisConnector, ServiceConfig, bec_logger, configs +from bec_lib import MessageEndpoints, bec_logger, configs from bec_lib.alarm_handler import AlarmBase from bec_lib.bec_errors import ScanAbortion, ScanInterruption -from bec_lib.tests.utils import wait_for_empty_queue logger = bec_logger.logger -CONFIG_PATH = "../ci/test_config.yaml" -# CONFIG_PATH = "../bec_config_dev.yaml" -# pylint: disable=no-member -# pylint: disable=missing-function-docstring -# pylint: disable=redefined-outer-name -# pylint: disable=protected-access -# pylint: disable=undefined-variable - - -@pytest.fixture(scope="function") -def client(): - config = ServiceConfig(CONFIG_PATH) - bec = BECIPythonClient(config, RedisConnector, forced=True) - bec.start() - bec.queue.request_queue_reset() - bec.queue.request_scan_continuation() - time.sleep(1) - yield bec - bec.shutdown() - @pytest.mark.timeout(100) -def test_grid_scan(capsys, client): - bec = client +def test_grid_scan(capsys, bec_client_fixture): + bec = bec_client_fixture scans = bec.scans - wait_for_empty_queue(bec) bec.metadata.update({"unit_test": "test_grid_scan"}) dev = bec.device_manager.devices scans.umv(dev.samx, 0, dev.samy, 0, relative=False) @@ -53,10 +30,9 @@ def test_grid_scan(capsys, client): @pytest.mark.timeout(100) -def test_fermat_scan(capsys, client): - bec = client +def test_fermat_scan(capsys, bec_client_fixture): + bec = bec_client_fixture scans = bec.scans - wait_for_empty_queue(bec) bec.metadata.update({"unit_test": "test_fermat_scan"}) dev = bec.device_manager.devices status = scans.fermat_scan( @@ -78,10 +54,9 @@ def test_fermat_scan(capsys, client): @pytest.mark.timeout(100) -def test_line_scan(capsys, client): - bec = client +def test_line_scan(capsys, bec_client_fixture): + bec = bec_client_fixture scans = bec.scans - wait_for_empty_queue(bec) bec.metadata.update({"unit_test": "test_line_scan"}) dev = bec.device_manager.devices status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.01, relative=True) @@ -93,10 +68,9 @@ def test_line_scan(capsys, client): @pytest.mark.flaky # marked as flaky as the simulation might return a new readback value within the tolerance @pytest.mark.timeout(100) -def test_mv_scan(capsys, client): - bec = client +def test_mv_scan(capsys, bec_client_fixture): + bec = bec_client_fixture scans = bec.scans - wait_for_empty_queue(bec) bec.metadata.update({"unit_test": "test_mv_scan"}) dev = bec.device_manager.devices scans.mv(dev.samx, 10, dev.samy, 20, relative=False).wait() @@ -119,10 +93,9 @@ def test_mv_scan(capsys, client): @pytest.mark.timeout(100) -def test_mv_scan_mv(client): - bec = client +def test_mv_scan_mv(bec_client_fixture): + bec = bec_client_fixture scans = bec.scans - wait_for_empty_queue(bec) bec.metadata.update({"unit_test": "test_mv_scan_mv"}) scan_number_start = bec.queue.next_scan_number dev = bec.device_manager.devices @@ -184,7 +157,7 @@ def test_mv_scan_mv(client): @pytest.mark.timeout(100) -def test_scan_abort(client): +def test_scan_abort(bec_client_fixture): def send_abort(bec): while True: current_scan_info = bec.queue.scan_storage.current_scan_info @@ -205,8 +178,7 @@ def test_scan_abort(client): time.sleep(0.5) _thread.interrupt_main() - bec = client - wait_for_empty_queue(bec) + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_scan_abort"}) scan_number_start = bec.queue.next_scan_number scans = bec.scans @@ -237,9 +209,8 @@ def test_scan_abort(client): @pytest.mark.timeout(100) -def test_limit_error(client): - bec = client - wait_for_empty_queue(bec) +def test_limit_error(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_limit_error"}) scan_number_start = bec.queue.next_scan_number scans = bec.scans @@ -268,9 +239,8 @@ def test_limit_error(client): @pytest.mark.timeout(100) -def test_queued_scan(client): - bec = client - wait_for_empty_queue(bec) +def test_queued_scan(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_queued_scan"}) scan_number_start = bec.queue.next_scan_number scans = bec.scans @@ -301,9 +271,8 @@ def test_queued_scan(client): @pytest.mark.timeout(100) -def test_fly_scan(client): - bec = client - wait_for_empty_queue(bec) +def test_fly_scan(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_fly_scan"}) scans = bec.scans dev = bec.device_manager.devices @@ -313,9 +282,8 @@ def test_fly_scan(client): @pytest.mark.timeout(100) -def test_scan_restart(client): - bec = client - wait_for_empty_queue(bec) +def test_scan_restart(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_scan_restart"}) scans = bec.scans dev = bec.device_manager.devices @@ -352,9 +320,8 @@ def test_scan_restart(client): @pytest.mark.timeout(100) -def test_scan_observer_repeat_queued(client): - bec = client - wait_for_empty_queue(bec) +def test_scan_observer_repeat_queued(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_scan_observer_repeat_queued"}) scans = bec.scans dev = bec.device_manager.devices @@ -393,9 +360,8 @@ def test_scan_observer_repeat_queued(client): @pytest.mark.timeout(100) -def test_scan_observer_repeat(client): - bec = client - wait_for_empty_queue(bec) +def test_scan_observer_repeat(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_scan_observer_repeat"}) scans = bec.scans dev = bec.device_manager.devices @@ -432,9 +398,8 @@ def test_scan_observer_repeat(client): @pytest.mark.timeout(100) -def test_file_writer(client): - bec = client - wait_for_empty_queue(bec) +def test_file_writer(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_file_writer"}) scans = bec.scans dev = bec.device_manager.devices @@ -479,9 +444,8 @@ def test_file_writer(client): @pytest.mark.timeout(100) -def test_scan_def_callback(capsys, client): - bec = client - wait_for_empty_queue(bec) +def test_scan_def_callback(capsys, bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_scan_def_callback"}) scans = bec.scans dev = bec.device_manager.devices @@ -502,9 +466,8 @@ def test_scan_def_callback(capsys, client): @pytest.mark.timeout(100) -def test_scan_def(client): - bec = client - wait_for_empty_queue(bec) +def test_scan_def(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_scan_def"}) scans = bec.scans dev = bec.device_manager.devices @@ -529,9 +492,8 @@ def test_scan_def(client): @pytest.mark.timeout(100) -def test_group_def(client): - bec = client - wait_for_empty_queue(bec) +def test_group_def(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_scan_def"}) scans = bec.scans dev = bec.device_manager.devices @@ -545,9 +507,8 @@ def test_group_def(client): @pytest.mark.timeout(100) -def test_list_scan(client): - bec = client - wait_for_empty_queue(bec) +def test_list_scan(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_list_scan"}) scans = bec.scans dev = bec.device_manager.devices @@ -574,9 +535,8 @@ def test_list_scan(client): @pytest.mark.timeout(100) -def test_time_scan(client): - bec = client - wait_for_empty_queue(bec) +def test_time_scan(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_time_scan"}) scans = bec.scans status = scans.time_scan(points=5, interval=0.5, exp_time=0.1, relative=False) @@ -584,9 +544,8 @@ def test_time_scan(client): @pytest.mark.timeout(100) -def test_monitor_scan(client): - bec = client - wait_for_empty_queue(bec) +def test_monitor_scan(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_monitor_scan"}) scans = bec.scans dev = bec.device_manager.devices @@ -597,9 +556,8 @@ def test_monitor_scan(client): @pytest.mark.timeout(100) -def test_rpc_calls(client): - bec = client - wait_for_empty_queue(bec) +def test_rpc_calls(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_rpc_calls"}) dev = bec.device_manager.devices assert dev.samx.dummy_controller._func_with_args(2, 3) == [2, 3] @@ -616,9 +574,8 @@ def test_rpc_calls(client): @pytest.mark.timeout(100) -def test_burst_scan(client): - bec = client - wait_for_empty_queue(bec) +def test_burst_scan(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_burst_scan"}) dev = bec.device_manager.devices s = scans.line_scan(dev.samx, 0, 1, burst_at_each_point=2, steps=10, relative=False) @@ -626,9 +583,8 @@ def test_burst_scan(client): @pytest.mark.timeout(100) -def test_callback_data_matches_scan_data(client): - bec = client - wait_for_empty_queue(bec) +def test_callback_data_matches_scan_data(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_callback_data_matches_scan_data"}) dev = bec.device_manager.devices reference_container = {"data": [], "metadata": {}} @@ -649,9 +605,8 @@ def test_callback_data_matches_scan_data(client): @pytest.mark.timeout(100) -def test_async_callback_data_matches_scan_data(client): - bec = client - wait_for_empty_queue(bec) +def test_async_callback_data_matches_scan_data(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_async_callback_data_matches_scan_data"}) dev = bec.device_manager.devices reference_container = {"data": [], "metadata": {}} @@ -674,9 +629,8 @@ def test_async_callback_data_matches_scan_data(client): @pytest.mark.timeout(100) -def test_disabled_device_raises_scan_request_error(client): - bec = client - wait_for_empty_queue(bec) +def test_disabled_device_raises_scan_request_error(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_disabled_device_raises_scan_rejection"}) dev = bec.device_manager.devices dev.samx.enabled = False @@ -689,10 +643,9 @@ def test_disabled_device_raises_scan_request_error(client): # @pytest.fixture(scope="function") @pytest.mark.timeout(100) @pytest.mark.parametrize("abort_on_ctrl_c", [True, False]) -def test_context_manager_export(tmp_path, client, abort_on_ctrl_c): - bec = client +def test_context_manager_export(tmp_path, bec_client_fixture, abort_on_ctrl_c): + bec = bec_client_fixture scans = bec.scans - wait_for_empty_queue(bec) bec.metadata.update({"unit_test": "test_line_scan"}) dev = bec.device_manager.devices bec._client._service_config = PropertyMock() @@ -705,17 +658,17 @@ def test_context_manager_export(tmp_path, client, abort_on_ctrl_c): dev.samx, -5, 5, 10, dev.samy, -5, 5, 10, exp_time=0.01, relative=True ) else: - with scans.scan_export(os.path.join(tmp_path, "test.csv")): + scan_file = os.path.join(tmp_path, "test.csv") + with scans.scan_export(scan_file): scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.01, relative=True) scans.grid_scan(dev.samx, -5, 5, 10, dev.samy, -5, 5, 10, exp_time=0.01, relative=True) - assert len(list(tmp_path.iterdir())) == 1 + assert os.path.exists(scan_file) @pytest.mark.timeout(100) -def test_update_config(client): - bec = client - wait_for_empty_queue(bec) +def test_update_config(bec_client_fixture): + bec = bec_client_fixture bec.metadata.update({"unit_test": "test_update_config"}) demo_config_path = os.path.join(os.path.dirname(configs.__file__), "demo_config.yaml") config = bec.config._load_config_from_file(demo_config_path) diff --git a/bec_client/tests/end-2-end/test_scans_lib.py b/bec_client/tests/end-2-end/test_scans_lib.py index 80a30565..02e8d157 100644 --- a/bec_client/tests/end-2-end/test_scans_lib.py +++ b/bec_client/tests/end-2-end/test_scans_lib.py @@ -1,56 +1,22 @@ import os -import threading import time -import bec_lib import numpy as np import pytest import yaml -from bec_lib import BECClient, DeviceConfigError, RedisConnector, ServiceConfig, bec_logger + +import bec_lib +from bec_lib import DeviceConfigError, bec_logger from bec_lib.alarm_handler import AlarmBase -from bec_lib.tests.utils import wait_for_empty_queue logger = bec_logger.logger -CONFIG_PATH = "../ci/test_config.yaml" -# CONFIG_PATH = "../bec_config_dev.yaml" -# pylint: disable=no-member -# pylint: disable=missing-function-docstring -# pylint: disable=redefined-outer-name -# pylint: disable=protected-access -# pylint: disable=undefined-variable - - -@pytest.fixture() -def threads_check(): - current_threads = set(th for th in threading.enumerate() if th is not threading.main_thread()) - yield - threads_after = set(th for th in threading.enumerate() if th is not threading.main_thread()) - additional_threads = threads_after - current_threads - assert ( - len(additional_threads) == 0 - ), f"Test creates {len(additional_threads)} threads that are not cleaned: {additional_threads}" - - -@pytest.fixture(scope="function") -def lib_client(threads_check): - config = ServiceConfig(CONFIG_PATH) - bec = BECClient(config, RedisConnector, forced=True) - bec.start() - bec.queue.request_queue_reset() - bec.queue.request_scan_continuation() - time.sleep(5) - yield bec - bec.shutdown() - bec._client._reset_singleton() - @pytest.mark.timeout(100) -def test_grid_scan_lib_client(lib_client): - bec = lib_client +def test_grid_scan_lib(bec_client_lib): + bec = bec_client_lib scans = bec.scans - wait_for_empty_queue(bec) - bec.metadata.update({"unit_test": "test_grid_scan_lib_client"}) + bec.metadata.update({"unit_test": "test_grid_scan_bec_client_lib"}) dev = bec.device_manager.devices scans.umv(dev.samx, 0, dev.samy, 0, relative=False) status = scans.grid_scan(dev.samx, -5, 5, 10, dev.samy, -5, 5, 10, exp_time=0.01, relative=True) @@ -60,11 +26,10 @@ def test_grid_scan_lib_client(lib_client): @pytest.mark.timeout(100) -def test_mv_scan_lib_client(lib_client): - bec = lib_client +def test_mv_scan_lib(bec_client_lib): + bec = bec_client_lib scans = bec.scans - wait_for_empty_queue(bec) - bec.metadata.update({"unit_test": "test_mv_scan_lib_client"}) + bec.metadata.update({"unit_test": "test_mv_scan_bec_client_lib"}) dev = bec.device_manager.devices scans.mv(dev.samx, 10, dev.samy, 20, relative=False).wait() current_pos_samx = dev.samx.read()["samx"]["value"] @@ -78,10 +43,9 @@ def test_mv_scan_lib_client(lib_client): @pytest.mark.timeout(100) -def test_mv_raises_limit_error(lib_client): - bec = lib_client +def test_mv_raises_limit_error(bec_client_lib): + bec = bec_client_lib scans = bec.scans - wait_for_empty_queue(bec) bec.metadata.update({"unit_test": "test_mv_raises_limit_error"}) dev = bec.device_manager.devices dev.samx.limits = [-50, 50] @@ -90,9 +54,8 @@ def test_mv_raises_limit_error(lib_client): @pytest.mark.timeout(100) -def test_async_callback_data_matches_scan_data_lib_client(lib_client): - bec = lib_client - wait_for_empty_queue(bec) +def test_async_callback_data_matches_scan_data_lib(bec_client_lib): + bec = bec_client_lib bec.metadata.update({"unit_test": "test_async_callback_data_matches_scan_data"}) dev = bec.device_manager.devices reference_container = {"data": [], "metadata": {}} @@ -114,9 +77,8 @@ def test_async_callback_data_matches_scan_data_lib_client(lib_client): @pytest.mark.timeout(100) -def test_config_updates(lib_client): - bec = lib_client - wait_for_empty_queue(bec) +def test_config_updates(bec_client_lib): + bec = bec_client_lib bec.metadata.update({"unit_test": "test_config_updates"}) dev = bec.device_manager.devices dev.samx.limits = [-80, 80] @@ -150,9 +112,8 @@ def test_config_updates(lib_client): @pytest.mark.timeout(100) -def test_dap_fit(lib_client): - bec = lib_client - wait_for_empty_queue(bec) +def test_dap_fit(bec_client_lib): + bec = bec_client_lib bec.metadata.update({"unit_test": "test_dap_fit"}) dev = bec.device_manager.devices scans = bec.scans @@ -312,9 +273,8 @@ def test_dap_fit(lib_client): "invalid_device_class", ], ) -def test_config_reload(lib_client, config, raises_error, deletes_config, disabled_device): - bec = lib_client - wait_for_empty_queue(bec) +def test_config_reload(bec_client_lib, config, raises_error, deletes_config, disabled_device): + bec = bec_client_lib bec.metadata.update({"unit_test": "test_config_reload"}) try: # write new config to disk @@ -343,9 +303,8 @@ def test_config_reload(lib_client, config, raises_error, deletes_config, disable # bec.config.load_demo_config() -def test_computed_signal(lib_client): - bec = lib_client - wait_for_empty_queue(bec) +def test_computed_signal(bec_client_lib): + bec = bec_client_lib bec.metadata.update({"unit_test": "test_computed_signal"}) dev = bec.device_manager.devices scans = bec.scans diff --git a/bec_lib/bec_lib/tests/end2end_fixtures.py b/bec_lib/bec_lib/tests/end2end_fixtures.py new file mode 100644 index 00000000..5db7911e --- /dev/null +++ b/bec_lib/bec_lib/tests/end2end_fixtures.py @@ -0,0 +1,206 @@ +import os +import pathlib +import platform +import tempfile +import warnings + +import pytest +from pytest_redis import factories as pytest_redis_factories + +try: + from bec_client import BECIPythonClient +except ImportError: + warnings.warn( + "No BEC IPython client installed, 'bec_client_fixture_with_demo_config' is not available" + ) + +from bec_lib import BECClient, ConfigHelper, RedisConnector, ServiceConfig +from bec_lib.tests.utils import wait_for_empty_queue + + +@pytest.hookimpl +def pytest_addoption(parser): + parser.addoption("--start-servers", action="store_true", default=False) + parser.addoption("--bec-redis-host", action="store", default="localhost") + parser.addoption("--bec-redis-cmd", action="store", default=None) + parser.addoption("--flush-redis", action="store_true", default=False) + parser.addoption("--files-path", action="store", default=None) + + +redis_server_fixture = None +bec_redis = None +_start_servers = False +bec_servers_scope = ( + lambda fixture_name, config: config.getoption("--flush-redis") and "function" or "session" +) + +config_template = f""" +redis: + host: %(redis_host)s + port: %(redis_port)s +mongodb: + host: "localhost" + port: 27017 +scibec: + host: http://localhost + port: 3030 + beamline: TestBeamline +service_config: + abort_on_ctrl_c: False + enforce_ACLs: False + file_writer: + plugin: default_NeXus_format + base_path: %(file_writer_base_path)s +""" + + +def _check_path(file_path): + if os.path.exists(file_path): + return pathlib.Path(file_path) + else: + raise RuntimeError( + f"end2end tests: --files-path directory {repr(file_path)} does not exist" + ) + + +def _get_tmp_dir(): + # on MacOS, gettempdir() returns path like /var/folders/nj/269977hs0_96bttwj2gs_jhhp48z54/T[...], + # and if building a Unix socket file (like pytest-redis does to connect to redis) it can + # exceed the 109 characters limit, so make a special case for MacOS + return pathlib.Path("/tmp" if platform.system() == "Darwin" else tempfile.gettempdir()) + + +@pytest.hookimpl +def pytest_configure(config): + global redis_server_fixture + global bec_redis + global _start_servers + global _bec_servers_scope + + if config.getoption("--start-servers"): + # configure 'datadir' == where redis Unix socket will go, and .rdb file (if any) + # try to use specified files path (hope it does not exceed 109 chars) or + # just use the normal tmp file directory except on MacOS where it must be enforced + # to /tmp + user_tmp_path = config.getoption("--files-path") + if user_tmp_path is not None: + datadir = _check_path(user_tmp_path) + else: + datadir = _get_tmp_dir() + # session-scoped fixture that starts redis using provided cmd + redis_server_fixture = pytest_redis_factories.proc.redis_proc( + executable=config.getoption("--bec-redis-cmd"), datadir=datadir + ) + + if config.getoption("--flush-redis"): + bec_redis = pytest_redis_factories.redisdb("redis_server_fixture") + _bec_servers_scope = "function" # have to restart servers at each test + else: + bec_redis = redis_server_fixture + else: + # do not automatically start redis - bec_redis will use existing + # process, will wait for 3 seconds max (must be running already); + # there is no point checking if we want to flush redis + # since it would remove available scans which are only populated + # when scan server starts + redis_server_fixture = pytest_redis_factories.redis_noproc( + host=config.getoption("--bec-redis-host"), startup_timeout=3 + ) + bec_redis = redis_server_fixture + + _start_servers = config.getoption("--start-servers") + + +@pytest.fixture(scope=bec_servers_scope) +def bec_services_config_path(request): + user_tmp_path = request.config.getoption("--files-path") + if user_tmp_path is not None: + yield _check_path(user_tmp_path) + else: + if request.config.getoption("--flush-redis"): + request.fixturenames.append("tmp_path") + yield request.getfixturevalue("tmp_path") + else: + request.fixturenames.append("tmp_path_factory") + yield request.getfixturevalue("tmp_path_factory").mktemp("bec_files") + + +@pytest.fixture(scope=bec_servers_scope) +def bec_servers(bec_services_config_path, redis_server_fixture): + config_path = bec_services_config_path / "test_config.yaml" + file_writer_path = bec_services_config_path # / "writer_output" + # file_writer_path.mkdir(exist_ok=True) + config_content = config_template % { + "redis_host": redis_server_fixture.host, + "redis_port": redis_server_fixture.port, + "file_writer_base_path": file_writer_path, + } + with open(config_path, "w") as config_file: + config_file.write(config_content) + + if _start_servers: + from bec_server.service_handler import ServiceHandler + + # Start all BEC servers, kill them at the end + # when no_tmux=True, 'bec_path' indicate the cwd + # for the process (working directory), i.e. where log files will go + service_handler = ServiceHandler( + bec_path=bec_services_config_path, + config_path=config_path, + no_tmux=True, + ) + processes = service_handler.start() + try: + yield + finally: + for process in processes: + process.terminate() + for process in processes: + os.waitpid(process.pid, 0) + else: + # Nothing to do here: servers are supposed to be started externally. + yield + + +@pytest.fixture +def bec_client_with_demo_config(bec_redis, bec_services_config_path, bec_servers): + config = ServiceConfig(bec_services_config_path / "test_config.yaml") + bec = BECIPythonClient(config, RedisConnector, forced=True) + bec.start() + ConfigHelper(bec._client.connector).load_demo_config() + try: + yield bec + finally: + bec.shutdown() + bec._client._reset_singleton() + + +@pytest.fixture +def bec_client_lib_with_demo_config(bec_redis, bec_services_config_path, bec_servers): + config = ServiceConfig(bec_services_config_path / "test_config.yaml") + bec = BECClient(config, RedisConnector, forced=True, wait_for_server=True) + bec.start() + ConfigHelper(bec._client.connector).load_demo_config() + try: + yield bec + finally: + bec.shutdown() + bec._client._reset_singleton() + + +@pytest.fixture +def bec_client_fixture(bec_client_with_demo_config): + bec = bec_client_with_demo_config + bec.queue.request_queue_reset() + bec.queue.request_scan_continuation() + wait_for_empty_queue(bec) + yield bec + + +@pytest.fixture +def bec_client_lib(bec_client_lib_with_demo_config): + bec = bec_client_lib_with_demo_config + bec.queue.request_queue_reset() + bec.queue.request_scan_continuation() + wait_for_empty_queue(bec) + yield bec diff --git a/bec_lib/bec_lib/tests/fixtures.py b/bec_lib/bec_lib/tests/fixtures.py new file mode 100644 index 00000000..b67824da --- /dev/null +++ b/bec_lib/bec_lib/tests/fixtures.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import copy +import threading +from unittest import mock + +import pytest + +from bec_lib.service_config import ServiceConfig +from bec_lib.tests.utils import ClientMock, ConnectorMock, DMClientMock, load_test_config + + +@pytest.fixture() +def threads_check(): + threads_at_start = set(th for th in threading.enumerate() if th is not threading.main_thread()) + yield + threads_after = set(th for th in threading.enumerate() if th is not threading.main_thread()) + additional_threads = threads_after - threads_at_start + assert ( + len(additional_threads) == 0 + ), f"Test creates {len(additional_threads)} threads that are not cleaned: {additional_threads}" + + +@pytest.fixture +def dm(): + service_mock = mock.MagicMock() + service_mock.connector = ConnectorMock("") + dev_manager = DMClientMock(service_mock) + yield dev_manager + + +@pytest.fixture +def dm_with_devices(dm): + dm._session = copy.deepcopy(load_test_config()) + dm._load_session() + yield dm + + +@pytest.fixture() +def bec_client_mock(dm_with_devices): + client = ClientMock( + ServiceConfig(redis={"host": "host", "port": 123}, scibec={"host": "host", "port": 123}), + ConnectorMock, + wait_for_server=False, + ) + client.start() + device_manager = dm_with_devices + for name, dev in device_manager.devices.items(): + dev._info["hints"] = {"fields": [name]} + client.device_manager = device_manager + try: + yield client + finally: + client.shutdown() + client._reset_singleton() + device_manager.devices.flush() diff --git a/bec_lib/bec_lib/tests/utils.py b/bec_lib/bec_lib/tests/utils.py index d7a2409d..ea14bbce 100644 --- a/bec_lib/bec_lib/tests/utils.py +++ b/bec_lib/bec_lib/tests/utils.py @@ -1,16 +1,12 @@ from __future__ import annotations import builtins -import copy import functools import os -import threading import time import uuid from typing import TYPE_CHECKING -from unittest import mock -import pytest import yaml import bec_lib @@ -20,7 +16,6 @@ from bec_lib.devicemanager import DeviceManagerBase from bec_lib.endpoints import EndpointInfo, MessageEndpoints from bec_lib.logger import bec_logger from bec_lib.scans import Scans -from bec_lib.service_config import ServiceConfig if TYPE_CHECKING: from bec_lib.alarm_handler import Alarms @@ -35,38 +30,6 @@ logger = bec_logger.logger # pylint: disable=protected-access -@pytest.fixture(autouse=True) -def threads_check(): - current_threads = set(th for th in threading.enumerate() if th is not threading.main_thread()) - yield - threads_after = set(th for th in threading.enumerate() if th is not threading.main_thread()) - additional_threads = threads_after - current_threads - assert ( - len(additional_threads) == 0 - ), f"Test creates {len(additional_threads)} threads that are not cleaned: {additional_threads}" - - -@pytest.fixture -def dm(): - service_mock = mock.MagicMock() - service_mock.connector = ConnectorMock("") - dev_manager = DMClientMock(service_mock) - yield dev_manager - - -@functools.lru_cache -def load_test_config(): - with open(f"{dir_path}/tests/test_config.yaml", "r", encoding="utf-8") as f: - return create_session_from_config(yaml.safe_load(f)) - - -@pytest.fixture -def dm_with_devices(dm): - dm._session = copy.deepcopy(load_test_config()) - dm._load_session() - yield dm - - def queue_is_empty(queue) -> bool: # pragma: no cover if not queue: return True @@ -491,24 +454,6 @@ class DMClientMock(DeviceManagerBase): return dev -@pytest.fixture() -def bec_client(dm_with_devices): - client = ClientMock( - ServiceConfig(redis={"host": "host", "port": 123}, scibec={"host": "host", "port": 123}), - ConnectorMock, - wait_for_server=False, - ) - client.start() - print(id(client)) - device_manager = dm_with_devices - for name, dev in device_manager.devices.items(): - dev._info["hints"] = {"fields": [name]} - client.device_manager = device_manager - yield client - client._reset_singleton() - device_manager.devices.flush() - - class PipelineMock: # pragma: no cover _pipe_buffer = [] _connector = None @@ -668,3 +613,9 @@ def create_session_from_config(config: dict) -> dict: device_configs.append(dev_conf) session = {"accessGroups": "customer", "devices": device_configs} return session + + +@functools.lru_cache +def load_test_config(): + with open(f"{dir_path}/tests/test_config.yaml", "r", encoding="utf-8") as f: + return create_session_from_config(yaml.safe_load(f)) diff --git a/bec_lib/setup.py b/bec_lib/setup.py index b64b8b75..f3a1f82c 100644 --- a/bec_lib/setup.py +++ b/bec_lib/setup.py @@ -27,6 +27,7 @@ if __name__ == "__main__": "dev": [ "pytest", "pytest-random-order", + "pytest-redis", "pytest-timeout", "coverage", "pandas", @@ -35,7 +36,13 @@ if __name__ == "__main__": "fakeredis", ] }, - entry_points={"console_scripts": ["bec-channel-monitor = bec_lib:channel_monitor_launch"]}, + entry_points={ + "console_scripts": ["bec-channel-monitor = bec_lib:channel_monitor_launch"], + "pytest11": [ + "bec_lib_end2end_fixtures = bec_lib.tests.end2end_fixtures", + "bec_lib_fixtures = bec_lib.tests.fixtures", + ], + }, package_data={"bec_lib.tests": ["*.yaml"], "bec_lib.configs": ["*.yaml", "*.json"]}, version=__version__, )