1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-05 00:12:49 +01:00
Files
bec_widgets/tests/unit_tests/conftest.py
2026-02-05 11:42:19 +01:00

453 lines
18 KiB
Python

import json
import time
from math import inf
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch
import fakeredis
import h5py
import numpy as np
import pytest
from bec_lib import messages, service_config
from bec_lib.bec_service import messages
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import _StoredDataInfo
from bec_lib.scan_history import ScanHistory
from bec_qthemes import apply_theme
from ophyd._pyepics_shim import _dispatcher
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QEvent, QEventLoop
from qtpy.QtWidgets import QApplication, QMessageBox
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.tests.utils import (
DEVICES,
DMMock,
FakePositioner,
Positioner,
create_history_file,
process_all_deferred_deletes,
)
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
from bec_widgets.utils import error_popups
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
# Optional patch (currently disabled): setting RAISE_ERROR_DEFAULT to True would
# make error popups raise exceptions by default during tests. Uncomment the
# line below to enable it.
# error_popups.RAISE_ERROR_DEFAULT = True
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Remember on the test item whether the generated report failed.

    The flag is stored under the ``"failed"`` key of ``item.stash`` so that
    the ``qapplication`` fixture can skip its cleanup checks for tests that
    already failed.
    """
    # Hook wrapper: yielding lets all other hook implementations run; the
    # outcome object handed back gives access to the report they produced.
    report = (yield).get_result()
    item.stash["failed"] = report.failed
@pytest.fixture(autouse=True)
def qapplication(qtbot, request, testable_qtimer_class):  # pylint: disable=unused-argument
    """Prepare the running QApplication for a test and verify cleanup afterwards.

    Setup: flushes deferred deletes, applies the ``"light"`` theme and
    processes pending events.

    Teardown: stops the pyepics dispatcher, then -- unless the test was
    recorded as failed by ``pytest_runtest_makereport`` -- stops the CLI
    server, checks that all testable timers are stopped and waits until no
    top-level widget is left.

    Raises:
        TimeoutError: if top-level widgets are still open when the wait
            performed by ``qtbot.waitUntil`` times out.
    """
    qapp = QApplication.instance()
    process_all_deferred_deletes(qapp)
    apply_theme("light")
    qapp.processEvents()

    yield

    # stop pyepics dispatcher for leaking tests
    _dispatcher.stop()

    # if the test failed, we don't want to check for open widgets as
    # it simply pollutes the output
    # Use the public Stash.get() API rather than the private ``_storage`` dict.
    if request.node.stash.get("failed", False):
        print("Test failed, skipping cleanup checks")
        return

    dispatcher = bec_dispatcher_module.BECDispatcher()
    dispatcher.stop_cli_server()

    testable_qtimer_class.check_all_stopped(qtbot)
    qapp.processEvents()

    os_listener = getattr(qapp, "os_listener", None)
    if os_listener:
        qapp.removeEventFilter(os_listener)

    try:
        qtbot.waitUntil(lambda: qapp.topLevelWidgets() == [])
    except QtBotTimeoutError as exc:
        raise TimeoutError(f"Failed to close all widgets: {qapp.topLevelWidgets()}") from exc
@pytest.fixture(autouse=True)
def rpc_register():
    """Yield an ``RPCRegister`` instance and reset its singleton after the test."""
    register = RPCRegister()
    yield register
    RPCRegister.reset_singleton()
# Module-wide connector handed to every mocked client; created on first use.
_REDIS_CONN: QtRedisConnector | None = None


def global_mock_qt_redis_connector(*_, **__):
    """Return the shared ``QtRedisConnector`` that uses ``fakeredis.FakeRedis``.

    All positional and keyword arguments are accepted and ignored. The
    connector is built on the first call; later calls return the same object.
    """
    global _REDIS_CONN
    if _REDIS_CONN is not None:
        return _REDIS_CONN
    _REDIS_CONN = QtRedisConnector(bootstrap="localhost:1", redis_cls=fakeredis.FakeRedis)
    return _REDIS_CONN
def mock_client(*_, **__):
    """Create and start a ``BECClient`` with several collaborators patched.

    While the client is constructed and started, ``DeviceManagerBase`` is
    replaced by ``DMMock`` and ``DAPPlugins``, ``Scans``, ``ScanManager`` and
    ``BECAccess`` are replaced by mocks. The client uses the shared fake
    redis connector. All arguments are accepted and ignored.
    """
    with (
        patch("bec_lib.client.DeviceManagerBase", DMMock),
        patch("bec_lib.client.DAPPlugins"),
        patch("bec_lib.client.Scans"),
        patch("bec_lib.client.ScanManager"),
        patch("bec_lib.bec_service.BECAccess"),
    ):
        redis_settings = {"redis": {"host": "localhost", "port": 1}}
        config = service_config.ServiceConfig(config=redis_settings)
        client = BECClient(config=config, connector_cls=global_mock_qt_redis_connector)
        client.start()
        return client
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check):  # pylint: disable=unused-argument
    """Yield a ``BECDispatcher`` whose ``BECClient`` is produced by ``mock_client``.

    After the test, all dispatcher connections are dropped, the client is
    shut down, the CLI server is stopped and the dispatcher singleton is
    reset for the next test.
    """
    with mock.patch.object(bec_dispatcher_module, "BECClient", mock_client):
        dispatcher = bec_dispatcher_module.BECDispatcher()
        yield dispatcher

    # Teardown, in order: connections, BEC client, CLI server, singleton.
    dispatcher.disconnect_all()
    dispatcher.client.shutdown()
    dispatcher.stop_cli_server()
    bec_dispatcher_module.BECDispatcher.reset_singleton()
@pytest.fixture(autouse=True)
def clean_singleton():
    """Reset ``error_popups._popup_utility_instance`` to ``None`` for every test."""
    error_popups._popup_utility_instance = None  # pylint: disable=protected-access
@pytest.fixture(autouse=True)
def suppress_message_box(monkeypatch):
    """Replace ``QMessageBox.exec_`` so every call returns ``QMessageBox.Ok`` at once."""

    def _answer_ok(*args, **kwargs):  # pylint: disable=unused-argument
        return QMessageBox.Ok

    monkeypatch.setattr(QMessageBox, "exec_", _answer_ok)
@pytest.fixture
def grid_scan_history_msg(tmpdir):
    """Create the history file of a 10 x 10 ``grid_scan`` over samx/samy in [-5, 5].

    The file is written to ``scan_1.h5`` inside ``tmpdir``; the fixture value
    is whatever ``create_history_file`` returns for it.
    """
    axis_x = np.linspace(-5, 5, 10)
    axis_y = np.linspace(-5, 5, 10)
    x_grid, y_grid = np.meshgrid(axis_x, axis_y)
    # Flatten the transposed grids: samx becomes the slow axis, samy the fast one.
    x_flat = x_grid.T.ravel()
    y_flat = y_grid.T.ravel()
    positions = np.vstack((x_flat, y_flat)).T
    num_points = len(positions)

    # The random arrays are drawn in the same order as the signals are listed.
    bpm4i_signal = {
        "value": np.random.rand(num_points),
        "timestamp": np.random.rand(num_points),
    }
    samx_signal = {"value": x_flat, "timestamp": np.random.rand(num_points)}
    samy_signal = {"value": y_flat, "timestamp": np.random.rand(num_points)}
    async_signal = {
        "value": np.random.rand(num_points * 10),
        "timestamp": np.random.rand(num_points * 10),
    }

    data = {
        "baseline": {"bpm1a": {"bpm1a": {"value": [1], "timestamp": [100]}}},
        "monitored": {
            "bpm4i": {"bpm4i": bpm4i_signal},
            "samx": {"samx": samx_signal},
            "samy": {"samy": samy_signal},
        },
        "async": {"async_device": {"async_device": async_signal}},
    }

    request_inputs = {
        "arg_bundle": ["samx", -5, 5, 10, "samy", -5, 5, 10],
        "kwargs": {"relative": True},
    }
    metadata = {
        "scan_id": "test_scan",
        "scan_name": "grid_scan",
        "scan_type": "step",
        "exit_status": "closed",
        "scan_number": 1,
        "dataset_number": 1,
        "request_inputs": request_inputs,
        "positions": positions.tolist(),
        "num_points": num_points,
    }

    target_file = str(tmpdir.join("scan_1.h5"))
    return create_history_file(target_file, data, metadata)
@pytest.fixture
def scan_history_factory(tmpdir):
    """
    Factory to create scan history messages with custom parameters.

    Usage:
        msg1 = scan_history_factory(scan_id="id1", scan_number=1, num_points=10)
        msg2 = scan_history_factory(scan_id="id2", scan_number=2, scan_name="grid_scan", num_points=16)
    """

    def _factory(
        scan_id: str = "test_scan",
        scan_number: int = 1,
        dataset_number: int = 1,
        scan_name: str = "line_scan",
        scan_type: str = "step",
        num_points: int = 10,
        x_range: tuple = (-5, 5),
        y_range: tuple = (-5, 5),
    ):
        """Write a history file for one scan and return the ``create_history_file`` result.

        Args:
            scan_id: Scan identifier; also used as the file name (``<scan_id>.h5``).
            scan_number: Scan number stored in the metadata.
            dataset_number: Dataset number stored in the metadata.
            scan_name: ``"grid_scan"`` produces a square grid of positions; any
                other name produces a line of ``num_points`` positions.
            scan_type: Scan type stored in the metadata.
            num_points: Requested number of points. For a grid scan the grid is
                ``int(sqrt(num_points))`` points per axis, so the actual number
                of points can be smaller than requested.
            x_range: (start, stop) of the samx positions.
            y_range: (start, stop) of the samy positions.
        """
        # Generate positions based on the scan name
        if scan_name == "grid_scan":
            grid_size = int(np.sqrt(num_points))
            x_grid, y_grid = np.meshgrid(
                np.linspace(x_range[0], x_range[1], grid_size),
                np.linspace(y_range[0], y_range[1], grid_size),
            )
            x_flat = x_grid.T.ravel()
            y_flat = y_grid.T.ravel()
        else:
            x_flat = np.linspace(x_range[0], x_range[1], num_points)
            y_flat = np.linspace(y_range[0], y_range[1], num_points)
        positions = np.vstack((x_flat, y_flat)).T
        # Actual number of points. Everything below is sized with this value so
        # that all signals and the stored metadata agree with the positions,
        # even when a grid scan was requested with a non-square ``num_points``.
        num_pts = len(positions)

        # Create dummy data
        data = {
            "baseline": {"bpm1a": {"bpm1a": {"value": [1], "timestamp": [100]}}},
            "monitored": {
                "bpm4i": {
                    "bpm4i": {
                        "value": np.random.rand(num_pts),
                        "timestamp": np.random.rand(num_pts),
                    }
                },
                "bpm3a": {
                    "bpm3a": {
                        "value": np.random.rand(num_pts),
                        "timestamp": np.random.rand(num_pts),
                    }
                },
                "samx": {"samx": {"value": x_flat, "timestamp": np.arange(num_pts)}},
                "samy": {"samy": {"value": y_flat, "timestamp": np.arange(num_pts)}},
            },
            "async": {
                "async_device": {
                    "async_device": {
                        "value": np.random.rand(num_pts * 10),
                        "timestamp": np.random.rand(num_pts * 10),
                    }
                }
            },
        }

        metadata = {
            "scan_id": scan_id,
            "scan_name": scan_name,
            "scan_type": scan_type,
            "exit_status": "closed",
            "scan_number": scan_number,
            "dataset_number": dataset_number,
            "request_inputs": {
                "arg_bundle": [
                    "samx",
                    x_range[0],
                    x_range[1],
                    num_pts,
                    "samy",
                    y_range[0],
                    y_range[1],
                    num_pts,
                ],
                "kwargs": {"relative": True},
            },
            "positions": positions.tolist(),
            "num_points": num_pts,
            # Shapes mirror the arrays written above (previously the bpm4i
            # shape was hard-coded to 10 and the others used the requested
            # rather than the actual number of points).
            "stored_data_info": {
                "samx": {"samx": _StoredDataInfo(shape=(num_pts,), dtype="float64")},
                "samy": {"samy": _StoredDataInfo(shape=(num_pts,), dtype="float64")},
                "bpm4i": {"bpm4i": _StoredDataInfo(shape=(num_pts,), dtype="float64")},
                "async_device": {
                    "async_device": _StoredDataInfo(shape=(num_pts * 10,), dtype="float64")
                },
            },
            "scan_report_devices": [b"samx"],
        }

        file_path = str(tmpdir.join(f"{scan_id}.h5"))
        return create_history_file(file_path, data, metadata)

    return _factory
@pytest.fixture(scope="function")
def mocked_client(bec_dispatcher):
    """Yield the dispatcher's client while ``isinstance`` accepts fake positioners.

    For the duration of the test, ``builtins.isinstance`` is replaced by a
    wrapper that reports ``True`` for ``isinstance(obj, Positioner)`` when
    ``obj`` is a ``FakePositioner``; every other check is delegated to the
    real builtin. After the test the client's connector is shut down.
    """
    # Keep a reference to the real builtin; the name is patched below.
    original_isinstance = isinstance

    def isinstance_mock(obj, class_info):
        # Use the saved builtin for the inner check as well: while the patch is
        # active the bare name ``isinstance`` resolves to this very function,
        # which would re-enter it needlessly.
        if class_info == Positioner and original_isinstance(obj, FakePositioner):
            return True
        return original_isinstance(obj, class_info)

    with patch("builtins.isinstance", new=isinstance_mock):
        yield bec_dispatcher.client
    bec_dispatcher.client.connector.shutdown()
@pytest.fixture(scope="function")
def mock_client_w_devices(mocked_client):
    """Yield the mocked client after adding ``DEVICES`` to its device manager."""
    device_manager = mocked_client.device_manager
    device_manager.add_devices(DEVICES)
    yield mocked_client
##################################################
# Client Fixture with DAP
##################################################
@pytest.fixture(scope="function")
def dap_plugin_message():
    """Yield an ``AvailableResourceMessage`` describing one ``GaussianModel`` plugin.

    The single resource entry carries the class and run documentation, the
    signature of the ``fit`` run method and the default fit parameters.
    """

    def _variadic(name, kind):
        # Signature entry for the ``args`` / ``kwargs`` catch-all parameters.
        return {"name": name, "kind": kind, "default": "_empty", "annotation": "_empty"}

    def _keyword_only(name, annotation):
        # Signature entry for a keyword-only parameter defaulting to None.
        return {
            "name": name,
            "kind": "KEYWORD_ONLY",
            "default": None,
            "annotation": annotation,
        }

    def _fit_param(name, value, vary, lower, expr=None):
        # Fit parameter entry; every parameter shares an upper bound of +inf.
        return {
            "name": name,
            "value": value,
            "vary": vary,
            "min": lower,
            "max": inf,
            "expr": expr,
            "brute_step": None,
            "user_data": None,
        }

    class_doc = "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n "
    run_doc = "A model based on a Gaussian or normal distribution lineshape.\n\n The model has three Parameters: `amplitude`, `center`, and `sigma`.\n In addition, parameters `fwhm` and `height` are included as\n constraints to report full width at half maximum and maximum peak\n height, respectively.\n\n .. math::\n\n f(x; A, \\mu, \\sigma) = \\frac{A}{\\sigma\\sqrt{2\\pi}} e^{[{-{(x-\\mu)^2}/{{2\\sigma}^2}}]}\n\n where the parameter `amplitude` corresponds to :math:`A`, `center` to\n :math:`\\mu`, and `sigma` to :math:`\\sigma`. The full width at half\n maximum is :math:`2\\sigma\\sqrt{2\\ln{2}}`, approximately\n :math:`2.3548\\sigma`.\n\n For more information, see: https://en.wikipedia.org/wiki/Normal_distribution\n\n \n Args:\n scan_item (ScanItem): Scan item or scan ID\n device_x (DeviceBase | str): Device name for x\n signal_x (DeviceBase | str): Signal name for x\n device_y (DeviceBase | str): Device name for y\n signal_y (DeviceBase | str): Signal name for y\n parameters (dict): Fit parameters\n "

    signature = [
        _variadic("args", "VAR_POSITIONAL"),
        _keyword_only("scan_item", "ScanItem | str"),
        _keyword_only("device_x", "DeviceBase | str"),
        _keyword_only("signal_x", "DeviceBase | str"),
        _keyword_only("device_y", "DeviceBase | str"),
        _keyword_only("signal_y", "DeviceBase | str"),
        _keyword_only("parameters", "dict"),
        _variadic("kwargs", "VAR_KEYWORD"),
    ]

    params = {
        "amplitude": _fit_param("amplitude", 1.0, True, -inf),
        "center": _fit_param("center", 0.0, True, -inf),
        "sigma": _fit_param("sigma", 1.0, True, 0),
        "fwhm": _fit_param("fwhm", 2.35482, False, -inf, "2.3548200*sigma"),
        "height": _fit_param(
            "height", 0.3989423, False, -inf, "0.3989423*amplitude/max(1e-15, sigma)"
        ),
    }

    gaussian_model = {
        "class": "LmfitService1D",
        "user_friendly_name": "GaussianModel",
        "class_doc": class_doc,
        "run_doc": run_doc,
        "run_name": "fit",
        "signature": signature,
        "auto_fit_supported": True,
        "params": params,
        "class_args": [],
        "class_kwargs": {"model": "GaussianModel"},
    }

    yield messages.AvailableResourceMessage(resource={"GaussianModel": gaussian_model})
@pytest.fixture(scope="function")
def mocked_client_with_dap(mocked_client, dap_plugin_message):
    """Yield the mocked client configured with devices and DAP information.

    ``DEVICES`` are added to the device manager, ``service_status`` reports a
    ``BECClient`` and a ``DAPServer/LmfitService1D`` service, the available
    DAP plugin message is stored on the connector and the client's DAP
    object lists three available models.

    ``service_status`` is patched on the client's class only for the duration
    of the test and restored on teardown, so the mocked property does not
    leak into tests that run afterwards.
    """
    mocked_client.device_manager.add_devices(DEVICES)

    dap_services = {
        "BECClient": messages.StatusMessage(name="BECClient", status=1, info={}),
        "DAPServer/LmfitService1D": messages.StatusMessage(
            name="LmfitService1D", status=1, info={}
        ),
    }

    # A property has to be mocked on the class, not on the instance.
    # ``create=True`` keeps this working even if the class itself does not
    # define the attribute, as the plain assignment used previously did.
    with patch.object(
        type(mocked_client),
        "service_status",
        new_callable=PropertyMock,
        return_value=dap_services,
        create=True,
    ):
        mocked_client.connector.set(
            topic=MessageEndpoints.dap_available_plugins("dap"), msg=dap_plugin_message
        )

        # Patch the client's DAP attribute so that the available models include "GaussianModel"
        patched_models = {"GaussianModel": {}, "LorentzModel": {}, "SineModel": {}}
        mocked_client.dap._available_dap_plugins = patched_models

        yield mocked_client