0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

test: unit tests moved to separate folder; scope of autouse bec_dispatcher fixture reduced only for unit tests; ci adjusted

This commit is contained in:
2024-04-15 13:54:45 +02:00
parent 4d0df364d3
commit 2446c401d9
31 changed files with 33 additions and 54 deletions

View File

View File

@ -0,0 +1,138 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from unittest.mock import MagicMock, patch
import fakeredis
import pytest
from bec_lib import BECClient, RedisConnector
from bec_lib.device import Positioner
from bec_lib.devicemanager import DeviceContainer
class FakeDevice:
"""Fake minimal positioner class for testing."""
def __init__(self, name, enabled=True):
self.name = name
self.enabled = enabled
self.signals = {self.name: {"value": 1.0}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
def __contains__(self, item):
return item == self.name
@property
def _hints(self):
return [self.name]
def set_value(self, fake_value: float = 1.0) -> None:
"""
Setup fake value for device readout
Args:
fake_value(float): Desired fake value
"""
self.signals[self.name]["value"] = fake_value
def describe(self) -> dict:
"""
Get the description of the device
Returns:
dict: Description of the device
"""
return self.description
class FakePositioner(FakeDevice):
def __init__(self, name, enabled=True, limits=None, read_value=1.0):
super().__init__(name, enabled)
self.limits = limits if limits is not None else [0, 0]
self.read_value = read_value
def set_read_value(self, value):
self.read_value = value
def read(self):
return {self.name: {"value": self.read_value}}
def set_limits(self, limits):
self.limits = limits
def move(self, value, relative=False):
"""Simulates moving the device to a new position."""
if relative:
self.read_value += value
else:
self.read_value = value
# Respect the limits
self.read_value = max(min(self.read_value, self.limits[1]), self.limits[0])
@property
def readback(self):
return MagicMock(get=MagicMock(return_value=self.read_value))
class DMMock:
def __init__(self):
self.devices = DeviceContainer()
def add_devives(self, devices: list):
for device in devices:
self.devices[device.name] = device
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
FakePositioner("samy", limits=[-5, 5], read_value=3.0),
FakePositioner("aptrx", limits=None, read_value=4.0),
FakePositioner("aptry", limits=None, read_value=5.0),
FakeDevice("gauss_bpm"),
FakeDevice("gauss_adc1"),
FakeDevice("gauss_adc2"),
FakeDevice("gauss_adc3"),
FakeDevice("bpm4i"),
FakeDevice("bpm3a"),
FakeDevice("bpm3i"),
]
def fake_redis_server(host, port):
redis = fakeredis.FakeRedis()
return redis
@pytest.fixture(scope="function")
def mocked_client(bec_dispatcher):
connector = RedisConnector("localhost:1", redis_cls=fake_redis_server)
# Create a MagicMock object
client = MagicMock() # TODO change to real BECClient
# Shutdown the original client
bec_dispatcher.client.shutdown()
# Mock the connector attribute
bec_dispatcher.client = client
# Mock the device_manager.devices attribute
client.connector = connector
client.device_manager = DMMock()
client.device_manager.add_devives(DEVICES)
def mock_mv(*args, relative=False):
# Extracting motor and value pairs
for i in range(0, len(args), 2):
motor = args[i]
value = args[i + 1]
motor.move(value, relative=relative)
return MagicMock(wait=MagicMock())
client.scans = MagicMock(mv=mock_mv)
# Ensure isinstance check for Positioner passes
original_isinstance = isinstance
def isinstance_mock(obj, class_info):
if class_info == Positioner and isinstance(obj, FakePositioner):
return True
return original_isinstance(obj, class_info)
with patch("builtins.isinstance", new=isinstance_mock):
yield client
connector.shutdown() # TODO change to real BECClient

View File

@ -0,0 +1,14 @@
import pytest
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
@pytest.fixture(autouse=True)
def bec_dispatcher(threads_check):
bec_dispatcher = bec_dispatcher_module.BECDispatcher()
yield bec_dispatcher
bec_dispatcher.disconnect_all()
# clean BEC client
bec_dispatcher.client.shutdown()
# reinitialize singleton for next test
bec_dispatcher_module.BECDispatcher.reset_singleton()

View File

@ -0,0 +1,57 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import pytest
from bec_widgets.utils import BECConnector, ConnectionConfig
from .client_mocks import mocked_client
@pytest.fixture
def bec_connector(mocked_client):
connector = BECConnector(client=mocked_client)
return connector
def test_bec_connector_init(bec_connector):
assert bec_connector is not None
assert bec_connector.client is not None
assert isinstance(bec_connector, BECConnector)
assert bec_connector.config.widget_class == "BECConnector"
def test_bec_connector_init_with_gui_id(mocked_client):
bc = BECConnector(client=mocked_client, gui_id="test_gui_id")
assert bc.config.gui_id == "test_gui_id"
assert bc.gui_id == "test_gui_id"
def test_bec_connector_set_gui_id(bec_connector):
bec_connector.set_gui_id("test_gui_id")
assert bec_connector.config.gui_id == "test_gui_id"
def test_bec_connector_change_config(bec_connector):
bec_connector.on_config_update({"gui_id": "test_gui_id"})
assert bec_connector.config.gui_id == "test_gui_id"
def test_bec_connector_get_obj_by_id(bec_connector):
bec_connector.set_gui_id("test_gui_id")
assert bec_connector.get_obj_by_id("test_gui_id") == bec_connector
assert bec_connector.get_obj_by_id("test_gui_id_2") is None
def test_bec_connector_update_client(bec_connector, mocked_client):
client_new = mocked_client
bec_connector.update_client(client_new)
assert bec_connector.client == client_new
assert bec_connector.dev is not None
assert bec_connector.scans is not None
assert bec_connector.queue is not None
assert bec_connector.scan_storage is not None
assert bec_connector.dap is not None
def test_bec_connector_get_config(bec_connector):
assert bec_connector.get_config(dict_output=False) == bec_connector.config
assert bec_connector.get_config() == bec_connector.config.model_dump()

View File

@ -0,0 +1,59 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import time
from unittest import mock
import pytest
import redis
from bec_lib.connector import MessageObject
from bec_lib.messages import ScanMessage
from bec_lib.redis_connector import RedisConnector
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
@pytest.fixture
def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list):
def pubsub_msg_generator():
for topic, msg in topics_msg_list:
yield {"channel": topic.encode(), "pattern": None, "data": msg}
while True:
time.sleep(0.2)
yield StopIteration
with mock.patch("redis.Redis"):
pubsub = redis.Redis().pubsub()
messages = pubsub_msg_generator()
pubsub.get_message.side_effect = lambda timeout: next(messages)
connector = QtRedisConnector("localhost:1")
bec_dispatcher.client.connector = connector
yield bec_dispatcher
dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id="0", data={}))
@pytest.mark.parametrize(
"topics_msg_list",
[
(
("topic1", dummy_msg),
("topic2", dummy_msg),
("topic3", dummy_msg),
)
],
)
def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot):
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
cb2 = mock.Mock(spec=[])
bec_dispatcher.connect_slot(cb1, "topic1")
bec_dispatcher.connect_slot(cb1, "topic2")
bec_dispatcher.connect_slot(cb2, "topic2")
bec_dispatcher.connect_slot(cb2, "topic3")
assert len(bec_dispatcher.client.connector._topics_cb) == 3
bec_dispatcher.disconnect_all()
assert len(bec_dispatcher.client.connector._topics_cb) == 0

View File

@ -0,0 +1,244 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
import os
from unittest.mock import MagicMock
import numpy as np
import pytest
from bec_widgets.widgets import BECFigure, BECMotorMap, BECWaveform
from bec_widgets.widgets.plots import BECImageShow
from .client_mocks import mocked_client
@pytest.fixture
def bec_figure(qtbot, mocked_client):
widget = BECFigure(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
widget.close()
def test_bec_figure_init(bec_figure):
assert bec_figure is not None
assert bec_figure.client is not None
assert isinstance(bec_figure, BECFigure)
assert bec_figure.config.widget_class == "BECFigure"
def test_bec_figure_init_with_config(mocked_client):
config = {
"widget_class": "BECFigure",
"gui_id": "test_gui_id",
"theme": "dark",
}
widget = BECFigure(client=mocked_client, config=config)
assert widget.config.gui_id == "test_gui_id"
assert widget.config.theme == "dark"
def test_bec_figure_add_remove_plot(bec_figure):
initial_count = len(bec_figure._widgets)
# Adding 3 widgets - 2 WaveformBase and 1 PlotBase
w0 = bec_figure.add_plot()
w1 = bec_figure.add_plot()
w2 = bec_figure.add_widget(widget_type="PlotBase")
# Check if the widgets were added
assert len(bec_figure._widgets) == initial_count + 3
assert "widget_1" in bec_figure._widgets
assert "widget_2" in bec_figure._widgets
assert "widget_3" in bec_figure._widgets
assert bec_figure._widgets["widget_1"].config.widget_class == "BECWaveform"
assert bec_figure._widgets["widget_2"].config.widget_class == "BECWaveform"
assert bec_figure._widgets["widget_3"].config.widget_class == "BECPlotBase"
# Check accessing positions by the grid in figure
assert bec_figure[0, 0] == w0
assert bec_figure[1, 0] == w1
assert bec_figure[2, 0] == w2
# Removing 1 widget
bec_figure.remove(widget_id="widget_1")
assert len(bec_figure._widgets) == initial_count + 2
assert "widget_1" not in bec_figure._widgets
assert "widget_3" in bec_figure._widgets
assert bec_figure._widgets["widget_2"].config.widget_class == "BECWaveform"
def test_add_different_types_of_widgets(bec_figure):
plt = bec_figure.plot("samx", "bpm4i")
im = bec_figure.image("eiger")
motor_map = bec_figure.motor_map("samx", "samy")
assert plt.__class__ == BECWaveform
assert im.__class__ == BECImageShow
assert motor_map.__class__ == BECMotorMap
def test_access_widgets_access_errors(bec_figure):
bec_figure.add_plot(row=0, col=0)
# access widget by non-existent coordinates
with pytest.raises(ValueError) as excinfo:
bec_figure[0, 2]
assert "No widget at coordinates (0, 2)" in str(excinfo.value)
# access widget by non-existent widget_id
with pytest.raises(KeyError) as excinfo:
bec_figure["non_existent_widget"]
assert "Widget with id 'non_existent_widget' not found" in str(excinfo.value)
# access widget by wrong type
with pytest.raises(TypeError) as excinfo:
bec_figure[1.2]
assert (
"Key must be a string (widget id) or a tuple of two integers (grid coordinates)"
in str(excinfo.value)
)
def test_add_plot_to_occupied_position(bec_figure):
bec_figure.add_plot(row=0, col=0)
with pytest.raises(ValueError) as excinfo:
bec_figure.add_plot(row=0, col=0)
assert "Position at row 0 and column 0 is already occupied." in str(excinfo.value)
def test_remove_plots(bec_figure):
w1 = bec_figure.add_plot(row=0, col=0)
w2 = bec_figure.add_plot(row=0, col=1)
w3 = bec_figure.add_plot(row=1, col=0)
w4 = bec_figure.add_plot(row=1, col=1)
assert bec_figure[0, 0] == w1
assert bec_figure[0, 1] == w2
assert bec_figure[1, 0] == w3
assert bec_figure[1, 1] == w4
# remove by coordinates
bec_figure[0, 0].remove()
assert "widget_1" not in bec_figure._widgets
# remove by widget_id
bec_figure.remove(widget_id="widget_2")
assert "widget_2" not in bec_figure._widgets
# remove by widget object
w3.remove()
assert "widget_3" not in bec_figure._widgets
# check the remaining widget 4
assert bec_figure[0, 0] == w4
assert bec_figure["widget_4"] == w4
assert "widget_4" in bec_figure._widgets
assert len(bec_figure._widgets) == 1
def test_remove_plots_by_coordinates_ints(bec_figure):
w1 = bec_figure.add_plot(row=0, col=0)
w2 = bec_figure.add_plot(row=0, col=1)
bec_figure.remove(0, 0)
assert "widget_1" not in bec_figure._widgets
assert "widget_2" in bec_figure._widgets
assert bec_figure[0, 0] == w2
assert len(bec_figure._widgets) == 1
def test_remove_plots_by_coordinates_tuple(bec_figure):
w1 = bec_figure.add_plot(row=0, col=0)
w2 = bec_figure.add_plot(row=0, col=1)
bec_figure.remove(coordinates=(0, 0))
assert "widget_1" not in bec_figure._widgets
assert "widget_2" in bec_figure._widgets
assert bec_figure[0, 0] == w2
assert len(bec_figure._widgets) == 1
def test_remove_plot_by_id_error(bec_figure):
bec_figure.add_plot(row=0, col=0)
with pytest.raises(ValueError) as excinfo:
bec_figure.remove(widget_id="non_existent_widget")
assert "Widget with ID 'non_existent_widget' does not exist." in str(excinfo.value)
def test_remove_plot_by_coordinates_error(bec_figure):
bec_figure.add_plot(row=0, col=0)
with pytest.raises(ValueError) as excinfo:
bec_figure.remove(0, 1)
assert "No widget at coordinates (0, 1)" in str(excinfo.value)
def test_remove_plot_by_providing_nothing(bec_figure):
bec_figure.add_plot(row=0, col=0)
with pytest.raises(ValueError) as excinfo:
bec_figure.remove()
assert "Must provide either widget_id or coordinates for removal." in str(excinfo.value)
# def test_change_theme(bec_figure): #TODO do no work at python 3.12
# bec_figure.change_theme("dark")
# assert bec_figure.config.theme == "dark"
# assert bec_figure.backgroundBrush().color().name() == "#000000"
# bec_figure.change_theme("light")
# assert bec_figure.config.theme == "light"
# assert bec_figure.backgroundBrush().color().name() == "#ffffff"
# bec_figure.change_theme("dark")
# assert bec_figure.config.theme == "dark"
# assert bec_figure.backgroundBrush().color().name() == "#000000"
def test_change_layout(bec_figure):
w1 = bec_figure.add_plot(row=0, col=0)
w2 = bec_figure.add_plot(row=0, col=1)
w3 = bec_figure.add_plot(row=1, col=0)
w4 = bec_figure.add_plot(row=1, col=1)
bec_figure.change_layout(max_columns=1)
assert np.shape(bec_figure.grid) == (4, 1)
assert bec_figure[0, 0] == w1
assert bec_figure[1, 0] == w2
assert bec_figure[2, 0] == w3
assert bec_figure[3, 0] == w4
bec_figure.change_layout(max_rows=1)
assert np.shape(bec_figure.grid) == (1, 4)
assert bec_figure[0, 0] == w1
assert bec_figure[0, 1] == w2
assert bec_figure[0, 2] == w3
assert bec_figure[0, 3] == w4
def test_clear_all(bec_figure):
bec_figure.add_plot(row=0, col=0)
bec_figure.add_plot(row=0, col=1)
bec_figure.add_plot(row=1, col=0)
bec_figure.add_plot(row=1, col=1)
bec_figure.clear_all()
assert len(bec_figure._widgets) == 0
assert np.shape(bec_figure.grid) == (0,)
def test_shortcuts(bec_figure):
plt = bec_figure.plot("samx", "bpm4i")
im = bec_figure.image("eiger")
motor_map = bec_figure.motor_map("samx", "samy")
assert plt.config.widget_class == "BECWaveform"
assert plt.__class__ == BECWaveform
assert im.config.widget_class == "BECImageShow"
assert im.__class__ == BECImageShow
assert motor_map.config.widget_class == "BECMotorMap"
assert motor_map.__class__ == BECMotorMap

View File

@ -0,0 +1,220 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import os
from unittest.mock import MagicMock
import pytest
import yaml
from bec_widgets.widgets import BECMonitor
from .client_mocks import mocked_client
def load_test_config(config_name):
"""Helper function to load config from yaml file."""
config_path = os.path.join(os.path.dirname(__file__), "test_configs", f"{config_name}.yaml")
with open(config_path, "r") as f:
config = yaml.safe_load(f)
return config
@pytest.fixture(scope="function")
def monitor(bec_dispatcher, qtbot, mocked_client):
# client = MagicMock()
widget = BECMonitor(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@pytest.mark.parametrize(
"config_name, scan_type, number_of_plots",
[
("config_device", False, 2),
("config_device_no_entry", False, 2),
# ("config_scan", True, 4),
],
)
def test_initialization_with_device_config(monitor, config_name, scan_type, number_of_plots):
config = load_test_config(config_name)
monitor.on_config_update(config)
assert isinstance(monitor, BECMonitor)
assert monitor.client is not None
assert len(monitor.plot_data) == number_of_plots
assert monitor.scan_types == scan_type
@pytest.mark.parametrize(
"config_initial,config_update",
[("config_device", "config_scan"), ("config_scan", "config_device")],
)
def test_on_config_update(monitor, config_initial, config_update):
config_initial = load_test_config(config_initial)
config_update = load_test_config(config_update)
# validated config has to be compared
config_initial_validated = monitor.validator.validate_monitor_config(
config_initial
).model_dump()
config_update_validated = monitor.validator.validate_monitor_config(config_update).model_dump()
monitor.on_config_update(config_initial)
assert monitor.config == config_initial_validated
monitor.on_config_update(config_update)
assert monitor.config == config_update_validated
@pytest.mark.parametrize(
"config_name, expected_num_columns, expected_plot_names, expected_coordinates",
[
("config_device", 1, ["BPM4i plots vs samx", "Gauss plots vs samx"], [(0, 0), (1, 0)]),
(
"config_scan",
3,
["Grid plot 1", "Grid plot 2", "Grid plot 3", "Grid plot 4"],
[(0, 0), (0, 1), (0, 2), (1, 0)],
),
],
)
def test_render_initial_plots(
monitor, config_name, expected_num_columns, expected_plot_names, expected_coordinates
):
config = load_test_config(config_name)
monitor.on_config_update(config)
# Validate number of columns
assert monitor.plot_settings["num_columns"] == expected_num_columns
# Validate the plots are created correctly
for expected_name in expected_plot_names:
assert expected_name in monitor.plots.keys()
# Validate the grid_coordinates
assert monitor.grid_coordinates == expected_coordinates
def mock_getitem(dev_name):
"""Helper function to mock the __getitem__ method of the 'dev'."""
mock_instance = MagicMock()
if dev_name == "samx":
mock_instance._hints = "samx"
elif dev_name == "bpm4i":
mock_instance._hints = "bpm4i"
elif dev_name == "gauss_bpm":
mock_instance._hints = "gauss_bpm"
return mock_instance
def mock_get_scan_storage(scan_id, data):
"""Helper function to mock the __getitem__ method of the 'dev'."""
mock_instance = MagicMock()
mock_instance.get_scan_storage.return_value = data
return mock_instance
# mocked messages and metadata
msg_1 = {
"data": {
"samx": {"samx": {"value": 10}},
"bpm4i": {"bpm4i": {"value": 5}},
"gauss_bpm": {"gauss_bpm": {"value": 6}},
"gauss_adc1": {"gauss_adc1": {"value": 8}},
"gauss_adc2": {"gauss_adc2": {"value": 9}},
},
"scan_id": 1,
}
metadata_grid = {"scan_name": "grid_scan"}
metadata_line = {"scan_name": "line_scan"}
@pytest.mark.parametrize(
"config_name, msg, metadata, expected_data",
[
# case: msg does not have 'scan_id'
(
"config_device",
{"data": {}},
{},
{
"scan_segment": {
"bpm4i": {"bpm4i": []},
"gauss_adc1": {"gauss_adc1": []},
"gauss_adc2": {"gauss_adc2": []},
"samx": {"samx": []},
}
},
),
# case: scan_types is false, msg contains all valid fields, and entry is present in config
(
"config_device",
msg_1,
{},
{
"scan_segment": {
"bpm4i": {"bpm4i": [5]},
"gauss_adc1": {"gauss_adc1": [8]},
"gauss_adc2": {"gauss_adc2": [9]},
"samx": {"samx": [10]},
}
},
),
# case: scan_types is false, msg contains all valid fields and entry is missing in config, should use hints
(
"config_device_no_entry",
msg_1,
{},
{
"scan_segment": {
"bpm4i": {"bpm4i": [5]},
"gauss_bpm": {"gauss_bpm": [6]},
"samx": {"samx": [10]},
}
},
),
# case: scan_types is true, msg contains all valid fields, metadata contains scan "line_scan:"
(
"config_scan",
msg_1,
metadata_line,
{
"scan_segment": {
"bpm4i": {"bpm4i": [5]},
"gauss_adc1": {"gauss_adc1": [8]},
"gauss_adc2": {"gauss_adc2": [9]},
"gauss_bpm": {"gauss_bpm": [6]},
"samx": {"samx": [10]},
}
},
),
(
"config_scan",
msg_1,
metadata_grid,
{
"scan_segment": {
"bpm4i": {"bpm4i": [5]},
"gauss_adc1": {"gauss_adc1": [8]},
"gauss_adc2": {"gauss_adc2": [9]},
"gauss_bpm": {"gauss_bpm": [6]},
"samx": {"samx": [10]},
}
},
),
],
)
def test_on_scan_segment(monitor, config_name, msg, metadata, expected_data):
config = load_test_config(config_name)
monitor.on_config_update(config)
# Mock scan_storage.find_scan_by_ID
mock_scan_data = MagicMock()
mock_scan_data.data = {
device_name: {
entry: MagicMock(val=[msg["data"][device_name][entry]["value"]])
for entry in msg["data"][device_name]
}
for device_name in msg["data"]
}
monitor.queue.scan_storage.find_scan_by_ID.return_value = mock_scan_data
monitor.on_scan_segment(msg, metadata)
assert monitor.database == expected_data

View File

@ -0,0 +1,125 @@
import pytest
from bec_widgets.widgets import BECMotorMap
from bec_widgets.widgets.plots.motor_map import MotorMapConfig
from bec_widgets.widgets.plots.waveform import Signal, SignalData
from .client_mocks import mocked_client
@pytest.fixture(scope="function")
def bec_motor_map(qtbot, mocked_client):
widget = BECMotorMap(client=mocked_client, gui_id="BECMotorMap_test")
# qtbot.addWidget(widget)
# qtbot.waitExposed(widget)
yield widget
def test_motor_map_init(bec_motor_map):
default_config = MotorMapConfig(widget_class="BECMotorMap", gui_id="BECMotorMap_test")
assert bec_motor_map.config == default_config
def test_motor_map_change_motors(bec_motor_map):
bec_motor_map.change_motors("samx", "samy")
assert bec_motor_map.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
assert bec_motor_map.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5])
def test_motor_map_get_limits(bec_motor_map):
expected_limits = {
"samx": [-10, 10],
"samy": [-5, 5],
}
for motor_name, expected_limit in expected_limits.items():
actual_limit = bec_motor_map._get_motor_limit(motor_name)
assert actual_limit == expected_limit
def test_motor_map_get_init_position(bec_motor_map):
bec_motor_map.set_precision(2)
motor_map_dev = bec_motor_map.client.device_manager.devices
expected_positions = {
("samx", "samx"): motor_map_dev["samx"].read()["samx"]["value"],
("samy", "samy"): motor_map_dev["samy"].read()["samy"]["value"],
("aptrx", "aptrx"): motor_map_dev["aptrx"].read()["aptrx"]["value"],
("aptry", "aptry"): motor_map_dev["aptry"].read()["aptry"]["value"],
}
for (motor_name, entry), expected_position in expected_positions.items():
actual_position = bec_motor_map._get_motor_init_position(motor_name, entry, 2)
assert actual_position == expected_position
def test_motor_movement_updates_position_and_database(bec_motor_map):
motor_map_dev = bec_motor_map.client.device_manager.devices
init_positions = {
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
}
bec_motor_map.change_motors("samx", "samy")
assert bec_motor_map.database_buffer["x"] == init_positions["samx"]
assert bec_motor_map.database_buffer["y"] == init_positions["samy"]
# Simulate motor movement for 'samx' only
new_position_samx = 4.0
bec_motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
init_positions["samx"].append(new_position_samx)
init_positions["samy"].append(init_positions["samy"][-1])
# Verify database update for 'samx'
assert bec_motor_map.database_buffer["x"] == init_positions["samx"]
# Verify 'samy' retains its last known position
assert bec_motor_map.database_buffer["y"] == init_positions["samy"]
def test_scatter_plot_rendering(bec_motor_map):
motor_map_dev = bec_motor_map.client.device_manager.devices
init_positions = {
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
}
bec_motor_map.change_motors("samx", "samy")
# Simulate motor movement for 'samx' only
new_position_samx = 4.0
bec_motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
bec_motor_map._update_plot()
# Get the scatter plot item
scatter_plot_item = bec_motor_map.plot_components["scatter"]
# Check the scatter plot item properties
assert len(scatter_plot_item.data) > 0, "Scatter plot data is empty"
x_data = scatter_plot_item.data["x"]
y_data = scatter_plot_item.data["y"]
assert x_data[-1] == new_position_samx, "Scatter plot X data not updated correctly"
assert (
y_data[-1] == init_positions["samy"][-1]
), "Scatter plot Y data should retain last known position"
def test_plot_visualization_consistency(bec_motor_map):
bec_motor_map.change_motors("samx", "samy")
# Simulate updating the plot with new data
bec_motor_map.on_device_readback({"signals": {"samx": {"value": 5}}})
bec_motor_map.on_device_readback({"signals": {"samy": {"value": 9}}})
bec_motor_map._update_plot()
scatter_plot_item = bec_motor_map.plot_components["scatter"]
# Check if the scatter plot reflects the new data correctly
assert (
scatter_plot_item.data["x"][-1] == 5 and scatter_plot_item.data["y"][-1] == 9
), "Plot not updated correctly with new data"

View File

@ -0,0 +1,182 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import os
from unittest.mock import MagicMock
import pytest
import yaml
from qtpy.QtWidgets import QTableWidgetItem, QTabWidget
from bec_widgets.widgets.monitor.config_dialog import ConfigDialog
from .client_mocks import mocked_client
def load_test_config(config_name):
"""Helper function to load config from yaml file."""
config_path = os.path.join(os.path.dirname(__file__), "test_configs", f"{config_name}.yaml")
with open(config_path, "r") as f:
config = yaml.safe_load(f)
return config
@pytest.fixture(scope="function")
def config_dialog(qtbot, mocked_client):
client = mocked_client
widget = ConfigDialog(client=client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
@pytest.mark.parametrize("config_name", ["config_device", "config_scan"])
def test_load_config(config_dialog, config_name):
config = load_test_config(config_name)
config_dialog.load_config(config)
assert (
config_dialog.comboBox_appearance.currentText()
== config["plot_settings"]["background_color"]
)
assert config_dialog.spinBox_n_column.value() == config["plot_settings"]["num_columns"]
assert config_dialog.comboBox_colormap.currentText() == config["plot_settings"]["colormap"]
@pytest.mark.parametrize(
"config_name, scan_mode",
[
("config_device", False),
("config_scan", True),
("config_device_no_entry", False),
],
)
def test_initialization(config_dialog, config_name, scan_mode):
config = load_test_config(config_name)
config_dialog.load_config(config)
assert isinstance(config_dialog, ConfigDialog)
assert (
config_dialog.comboBox_appearance.currentText()
== config["plot_settings"]["background_color"]
)
assert config_dialog.spinBox_n_column.value() == config["plot_settings"]["num_columns"]
assert (config_dialog.comboBox_scanTypes.currentText() == "Enabled") == scan_mode
assert (
config_dialog.tabWidget_scan_types.count() > 0
) # Ensures there's at least one tab created
# If there's a need to check the contents of the first tab (there has to be always at least one tab)
first_tab = config_dialog.tabWidget_scan_types.widget(0)
if scan_mode:
assert (
first_tab.findChild(QTabWidget, "tabWidget_plots") is not None
) # Ensures plot tab widget exists in scan mode
else:
assert (
first_tab.findChild(QTabWidget) is not None
) # Ensures plot tab widget exists in default mode
def test_edit_and_apply_config(config_dialog):
config_device = load_test_config("config_device")
config_dialog.load_config(config_device)
config_dialog.comboBox_appearance.setCurrentText("white")
config_dialog.spinBox_n_column.setValue(2)
config_dialog.comboBox_colormap.setCurrentText("viridis")
applied_config = config_dialog.apply_config()
assert applied_config["plot_settings"]["background_color"] == "white"
assert applied_config["plot_settings"]["num_columns"] == 2
assert applied_config["plot_settings"]["colormap"] == "viridis"
def test_edit_and_apply_config_scan_mode(config_dialog):
config_scan = load_test_config("config_scan")
config_dialog.load_config(config_scan)
config_dialog.comboBox_appearance.setCurrentText("white")
config_dialog.spinBox_n_column.setValue(2)
config_dialog.comboBox_colormap.setCurrentText("viridis")
config_dialog.comboBox_scanTypes.setCurrentText("Enabled")
applied_config = config_dialog.apply_config()
assert applied_config["plot_settings"]["background_color"] == "white"
assert applied_config["plot_settings"]["num_columns"] == 2
assert applied_config["plot_settings"]["colormap"] == "viridis"
assert applied_config["plot_settings"]["scan_types"] is True
def test_add_new_scan(config_dialog):
# Ensure the tab count is initially 1 (from the default config)
assert config_dialog.tabWidget_scan_types.count() == 1
# Add a new scan tab
config_dialog.add_new_scan_tab(config_dialog.tabWidget_scan_types, "Test Scan Tab")
# Ensure the tab count is now 2
assert config_dialog.tabWidget_scan_types.count() == 2
# Ensure the new tab has the correct name
assert config_dialog.tabWidget_scan_types.tabText(1) == "Test Scan Tab"
def test_add_new_plot_and_modify(config_dialog):
# Ensure the tab count is initially 1 and it is called "Default"
assert config_dialog.tabWidget_scan_types.count() == 1
assert config_dialog.tabWidget_scan_types.tabText(0) == "Default"
# Get the first tab (which should be a scan tab)
scan_tab = config_dialog.tabWidget_scan_types.widget(0)
# Ensure the plot tab count is initially 1 and it is called "Plot 1"
tabWidget_plots = scan_tab.findChild(QTabWidget)
assert tabWidget_plots.count() == 1
assert tabWidget_plots.tabText(0) == "Plot 1"
# Add a new plot tab
config_dialog.add_new_plot_tab(scan_tab)
# Ensure the plot tab count is now 2
assert tabWidget_plots.count() == 2
# Ensure the new tab has the correct name
assert tabWidget_plots.tabText(1) == "Plot 2"
# Access the new plot tab
new_plot_tab = tabWidget_plots.widget(1)
# Modify the line edits within the new plot tab
new_plot_tab.ui.lineEdit_plot_title.setText("Modified Plot Title")
new_plot_tab.ui.lineEdit_x_label.setText("Modified X Label")
new_plot_tab.ui.lineEdit_y_label.setText("Modified Y Label")
new_plot_tab.ui.lineEdit_x_name.setText("Modified X Name")
new_plot_tab.ui.lineEdit_x_entry.setText("Modified X Entry")
# Modify the table for signals
config_dialog.add_new_signal(new_plot_tab.ui.tableWidget_y_signals)
table = new_plot_tab.ui.tableWidget_y_signals
assert table.rowCount() == 1 # Ensure the new row is added
row_position = table.rowCount() - 1
# Modify the first row
table.setItem(row_position, 0, QTableWidgetItem("New Signal Name"))
table.setItem(row_position, 1, QTableWidgetItem("New Signal Entry"))
# Apply the configuration
config = config_dialog.apply_config()
# Check if the modifications are reflected in the configuration
modified_plot_config = config["plot_data"][1] # Access the second plot in the plot_data list
sources = modified_plot_config["sources"][0] # Access the first source in the sources list
assert modified_plot_config["plot_name"] == "Modified Plot Title"
assert modified_plot_config["x_label"] == "Modified X Label"
assert modified_plot_config["y_label"] == "Modified Y Label"
assert sources["signals"]["x"][0]["name"] == "Modified X Name"
assert sources["signals"]["x"][0]["entry"] == "Modified X Entry"
assert sources["signals"]["y"][0]["name"] == "New Signal Name"
assert sources["signals"]["y"][0]["entry"] == "New Signal Entry"

View File

@ -0,0 +1,33 @@
plot_settings:
background_color: "black"
num_columns: 1
colormap: "plasma"
scan_types: false
plot_data:
- plot_name: "BPM4i plots vs samx"
x_label: "Motor X"
y_label: "bpm4i"
sources:
- type: "scan_segment"
signals:
x:
- name : "samx"
entry: "samx"
y:
- name : "bpm4i"
entry: "bpm4i"
- plot_name: "Gauss plots vs samx"
x_label: "Motor X"
y_label: "Gauss"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
entry: "samx"
y:
- name: "gauss_adc1"
entry: "gauss_adc1"
- name: "gauss_adc2"
entry: "gauss_adc2"

View File

@ -0,0 +1,27 @@
plot_settings:
background_color: "black"
num_columns: 1
colormap: "plasma"
scan_types: false
plot_data:
- plot_name: "BPM4i plots vs samx"
x_label: "Motor X"
y_label: "bpm4i"
sources:
- type: "scan_segment"
signals:
x:
- name : "samx"
y:
- name : "bpm4i"
- plot_name: "Gauss plots vs samx"
x_label: "Motor X"
y_label: "Gauss"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
y:
- name: "gauss_bpm"

View File

@ -0,0 +1,82 @@
plot_settings:
background_color: "white"
num_columns: 3
colormap: "plasma"
scan_types: true
plot_data:
grid_scan:
- plot_name: "Grid plot 1"
x_label: "Motor X"
y_label: "BPM"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
entry: "samx"
y:
- name: "gauss_bpm"
entry: "gauss_bpm"
- plot_name: "Grid plot 2"
x_label: "Motor X"
y_label: "BPM"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
entry: "samx"
y:
- name: "gauss_adc1"
entry: "gauss_adc1"
- plot_name: "Grid plot 3"
x_label: "Motor X"
y_label: "BPM"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
entry: "samx"
y:
- name: "gauss_adc2"
entry: "gauss_adc2"
- plot_name: "Grid plot 4"
x_label: "Motor X"
y_label: "BPM"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
entry: "samx"
y:
- name: "bpm4i"
entry: "bpm4i"
line_scan:
- plot_name: "Multiple Gauss Plot"
x_label: "Motor X"
y_label: "BPM"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
y:
- name: "gauss_bpm"
entry: "gauss_bpm"
- name: "gauss_adc1"
entry: "gauss_adc1"
- name: "gauss_adc2"
entry: "gauss_adc2"
- plot_name: "BPM Plot"
x_label: "Motor X"
y_label: "BPM"
sources:
- type: "scan_segment"
signals:
x:
- name: "samx"
y:
- name: "bpm4i"
entry: "bpm4i"

View File

@ -0,0 +1,150 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import numpy as np
import pyqtgraph as pg
from qtpy.QtCore import QPointF
from bec_widgets.utils import Crosshair
def test_mouse_moved_lines(qtbot):
# Create a PlotWidget and add a PlotItem
plot_widget = pg.PlotWidget(title="1D PlotWidget with multiple curves")
plot_item = plot_widget.getPlotItem()
plot_item.plot([1, 2, 3], [4, 5, 6])
# Create a Crosshair instance
crosshair = Crosshair(plot_item=plot_item, precision=2)
# Connect the signals to slots that will store the emitted values
emitted_values_1D = []
crosshair.coordinatesChanged1D.connect(emitted_values_1D.append)
# Simulate a mouse moved event at a specific position
pos_in_view = QPointF(2, 5)
pos_in_scene = plot_item.vb.mapViewToScene(pos_in_view)
event_mock = [pos_in_scene]
# Call the mouse_moved method
crosshair.mouse_moved(event_mock)
# Assert the expected behavior
assert crosshair.v_line.pos().x() == 2
assert crosshair.h_line.pos().y() == 5
def test_mouse_moved_signals(qtbot):
# Create a PlotWidget and add a PlotItem
plot_widget = pg.PlotWidget(title="1D PlotWidget with multiple curves")
plot_item = plot_widget.getPlotItem()
plot_item.plot([1, 2, 3], [4, 5, 6])
# Create a Crosshair instance
crosshair = Crosshair(plot_item=plot_item, precision=2)
# Create a slot that will store the emitted values as tuples
emitted_values_1D = []
def slot(x, y_values):
emitted_values_1D.append((x, y_values))
# Connect the signal to the custom slot
crosshair.coordinatesChanged1D.connect(slot)
# Simulate a mouse moved event at a specific position
pos_in_view = QPointF(2, 5)
pos_in_scene = plot_item.vb.mapViewToScene(pos_in_view)
event_mock = [pos_in_scene]
# Call the mouse_moved method
crosshair.mouse_moved(event_mock)
# Assert the expected behavior
assert emitted_values_1D == [(2.0, [5.0])]
def test_mouse_moved_signals_outside(qtbot):
# Create a PlotWidget and add a PlotItem
plot_widget = pg.PlotWidget(title="1D PlotWidget with multiple curves")
plot_item = plot_widget.getPlotItem()
plot_item.plot([1, 2, 3], [4, 5, 6])
# Create a Crosshair instance
crosshair = Crosshair(plot_item=plot_item, precision=2)
# Create a slot that will store the emitted values as tuples
emitted_values_1D = []
def slot(x, y_values):
emitted_values_1D.append((x, y_values))
# Connect the signal to the custom slot
crosshair.coordinatesChanged1D.connect(slot)
# Simulate a mouse moved event at a specific position
pos_in_view = QPointF(22, 55)
pos_in_scene = plot_item.vb.mapViewToScene(pos_in_view)
event_mock = [pos_in_scene]
# Call the mouse_moved method
crosshair.mouse_moved(event_mock)
# Assert the expected behavior
assert emitted_values_1D == []
def test_mouse_moved_signals_2D(qtbot):
# write similar test for 2D plot
# Create a PlotWidget and add a PlotItem
plot_widget = pg.PlotWidget(title="2D plot with crosshair and ROI square")
data_2D = np.random.random((100, 200))
plot_item = plot_widget.getPlotItem()
image_item = pg.ImageItem(data_2D)
plot_item.addItem(image_item)
# Create a Crosshair instance
crosshair = Crosshair(plot_item=plot_item)
# Create a slot that will store the emitted values as tuples
emitted_values_2D = []
def slot(x, y):
emitted_values_2D.append((x, y))
# Connect the signal to the custom slot
crosshair.coordinatesChanged2D.connect(slot)
# Simulate a mouse moved event at a specific position
pos_in_view = QPointF(22.0, 55.0)
pos_in_scene = plot_item.vb.mapViewToScene(pos_in_view)
event_mock = [pos_in_scene]
# Call the mouse_moved method
crosshair.mouse_moved(event_mock)
# Assert the expected behavior
assert emitted_values_2D == [(22.0, 55.0)]
def test_mouse_moved_signals_2D_outside(qtbot):
# write similar test for 2D plot
# Create a PlotWidget and add a PlotItem
plot_widget = pg.PlotWidget(title="2D plot with crosshair and ROI square")
data_2D = np.random.random((100, 200))
plot_item = plot_widget.getPlotItem()
image_item = pg.ImageItem(data_2D)
plot_item.addItem(image_item)
# Create a Crosshair instance
crosshair = Crosshair(plot_item=plot_item, precision=2)
# Create a slot that will store the emitted values as tuples
emitted_values_2D = []
def slot(x, y):
emitted_values_2D.append((x, y))
# Connect the signal to the custom slot
crosshair.coordinatesChanged2D.connect(slot)
# Simulate a mouse moved event at a specific position
pos_in_view = QPointF(220.0, 555.0)
pos_in_scene = plot_item.vb.mapViewToScene(pos_in_view)
event_mock = [pos_in_scene]
# Call the mouse_moved method
crosshair.mouse_moved(event_mock)
# Assert the expected behavior
assert emitted_values_2D == []

View File

@ -0,0 +1,170 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import os
import tempfile
from unittest.mock import MagicMock, mock_open, patch
import pytest
from qtpy.Qsci import QsciScintilla
from qtpy.QtWidgets import QTextEdit
from bec_widgets.widgets.editor.editor import AutoCompleter, BECEditor
@pytest.fixture(scope="function")
def editor(qtbot, docstring_tooltip=False):
"""Helper function to set up the BECEditor widget."""
widget = BECEditor(toolbar_enabled=True, docstring_tooltip=docstring_tooltip)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def find_action_by_text(toolbar, text):
"""Helper function to find an action in the toolbar by its text."""
for action in toolbar.actions():
if action.text() == text:
return action
return None
def test_bec_editor_initialization(editor):
"""Test if the BECEditor widget is initialized correctly."""
assert isinstance(editor.editor, QsciScintilla)
assert isinstance(editor.terminal, QTextEdit)
assert isinstance(editor.auto_completer, AutoCompleter)
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
def test_autocompleter_suggestions(mock_script, editor, qtbot):
"""Test if the autocompleter provides correct suggestions based on input."""
# Set up mock return values for the Script.complete method
mock_completion = MagicMock()
mock_completion.name = "mocked_method"
mock_script.return_value.complete.return_value = [mock_completion]
# Simulate user input in the editor
test_code = "print("
editor.editor.setText(test_code)
line, index = editor.editor.getCursorPosition()
# Trigger autocomplete
editor.auto_completer.get_completions(line, index, test_code)
# Use qtbot to wait for the completion thread
qtbot.waitUntil(lambda: editor.auto_completer.completions is not None, timeout=1000)
# Check if the expected completion is in the autocompleter's suggestions
suggested_methods = [completion.name for completion in editor.auto_completer.completions]
assert "mocked_method" in suggested_methods
@patch("bec_widgets.widgets.editor.editor.Script") # Mock the Script class from jedi
@pytest.mark.parametrize(
"docstring_enabled, expected_signature",
[(True, "Mocked signature with docstring"), (False, "Mocked signature")],
)
def test_autocompleter_signature(mock_script, editor, docstring_enabled, expected_signature):
"""Test if the autocompleter provides correct function signature based on docstring setting."""
# Set docstring mode based on parameter
editor.docstring_tooltip = docstring_enabled
editor.auto_completer.enable_docstring = docstring_enabled
# Set up mock return values for the Script.get_signatures method
mock_signature = MagicMock()
if docstring_enabled:
mock_signature.docstring.return_value = expected_signature
else:
mock_signature.to_string.return_value = expected_signature
mock_script.return_value.get_signatures.return_value = [mock_signature]
# Simulate user input that would trigger a signature request
test_code = "print("
editor.editor.setText(test_code)
line, index = editor.editor.getCursorPosition()
# Trigger signature request
signature = editor.auto_completer.get_function_signature(line, index, test_code)
# Check if the expected signature is returned
assert signature == expected_signature
def test_open_file(editor):
"""Test open_file method of BECEditor."""
# Create a temporary file with some content
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
temp_file.write(b"test file content")
# Mock user selecting the file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
editor.open_file()
# Verify if the editor's text is set to the file content
assert editor.editor.text() == "test file content"
# Clean up by removing the temporary file
os.remove(temp_file.name)
def test_save_file(editor):
"""Test save_file method of BECEditor."""
# Set some text in the editor
editor.editor.setText("test save content")
# Mock user selecting the file in the dialog
with patch(
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
):
with patch("builtins.open", new_callable=mock_open) as mock_file:
editor.save_file()
# Verify if the file was opened correctly for writing
mock_file.assert_called_with("/path/to/save/file.py", "w")
# Verify if the editor's text was written to the file
mock_file().write.assert_called_with("test save content")
def test_open_file_through_toolbar(editor):
"""Test the open_file method through the ModularToolBar."""
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
temp_file.write(b"test file content")
# Find the open file action in the toolbar
open_action = find_action_by_text(editor.toolbar, "Open File")
assert open_action is not None, "Open File action should be found"
# Mock the file dialog and built-in open function
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
with patch("builtins.open", new_callable=mock_open, read_data="test file content"):
open_action.trigger()
# Verify if the editor's text is set to the file content
assert editor.editor.text() == "test file content"
# Clean up
os.remove(temp_file.name)
def test_save_file_through_toolbar(editor):
"""Test the save_file method through the ModularToolBar."""
# Set some text in the editor
editor.editor.setText("test save content")
# Find the save file action in the toolbar
save_action = find_action_by_text(editor.toolbar, "Save File")
assert save_action is not None, "Save File action should be found"
# Mock the file dialog and built-in open function
with patch(
"qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=("/path/to/save/file.py", "")
):
with patch("builtins.open", new_callable=mock_open) as mock_file:
save_action.trigger()
# Verify if the file was opened correctly for writing
mock_file.assert_called_with("/path/to/save/file.py", "w")
# Verify if the editor's text was written to the file
mock_file().write.assert_called_with("test save content")

View File

@ -0,0 +1,115 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import json
from unittest.mock import MagicMock, patch
import numpy as np
import pyqtgraph as pg
import pytest
import zmq
from bec_widgets.examples.eiger_plot.eiger_plot import EigerPlot
# Common fixture for all tests
@pytest.fixture
def eiger_plot_instance(qtbot):
widget = EigerPlot()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
widget.close()
@pytest.mark.parametrize(
"fft_checked, rotation_index, transpose_checked, log_checked, expected_image",
[
(False, 0, False, False, np.array([[2, 1], [1, 5]], dtype=float)), # just mask
(False, 1, False, False, np.array([[1, 5], [2, 1]], dtype=float)), # 90 deg rotation
(False, 2, False, False, np.array([[5, 1], [1, 2]], dtype=float)), # 180 deg rotation
(False, 0, True, False, np.array([[2, 1], [1, 5]], dtype=float)), # transposed
(False, 0, False, True, np.array([[0.30103, 0.0], [0.0, 0.69897]], dtype=float)), # log
(True, 0, False, False, np.array([[5.0, 3.0], [3.0, 9.0]], dtype=float)), # FFT
],
)
def test_on_image_update(
qtbot,
eiger_plot_instance,
fft_checked,
rotation_index,
transpose_checked,
log_checked,
expected_image,
):
# Initialize image and mask
eiger_plot_instance.image = np.array([[1, 2], [3, 4]], dtype=float)
eiger_plot_instance.mask = np.array([[0, 1], [1, 0]], dtype=float)
# Mock UI elements
eiger_plot_instance.checkBox_FFT = MagicMock()
eiger_plot_instance.checkBox_FFT.isChecked.return_value = fft_checked
eiger_plot_instance.comboBox_rotation = MagicMock()
eiger_plot_instance.comboBox_rotation.currentIndex.return_value = rotation_index
eiger_plot_instance.checkBox_transpose = MagicMock()
eiger_plot_instance.checkBox_transpose.isChecked.return_value = transpose_checked
eiger_plot_instance.checkBox_log = MagicMock()
eiger_plot_instance.checkBox_log.isChecked.return_value = log_checked
eiger_plot_instance.imageItem = MagicMock()
# Call the method
eiger_plot_instance.on_image_update()
# Validate the transformations
np.testing.assert_array_almost_equal(eiger_plot_instance.image, expected_image, decimal=5)
# Validate that setImage was called
eiger_plot_instance.imageItem.setImage.assert_called_with(
eiger_plot_instance.image, autoLevels=False
)
def test_init_ui(eiger_plot_instance):
assert isinstance(eiger_plot_instance.plot_item, pg.PlotItem)
assert isinstance(eiger_plot_instance.imageItem, pg.ImageItem)
assert isinstance(eiger_plot_instance.hist, pg.HistogramLUTItem)
def test_start_zmq_consumer(eiger_plot_instance):
with patch("threading.Thread") as MockThread:
eiger_plot_instance.start_zmq_consumer()
MockThread.assert_called_once()
MockThread.return_value.start.assert_called_once()
def test_zmq_consumer(eiger_plot_instance, qtbot):
fake_meta = json.dumps({"type": "int32", "shape": (2, 2)}).encode("utf-8")
fake_data = np.array([[1, 2], [3, 4]], dtype="int32").tobytes()
with patch("zmq.Context", autospec=True) as MockContext:
mock_socket = MagicMock()
mock_socket.recv_multipart.side_effect = ((fake_meta, fake_data),)
MockContext.return_value.socket.return_value = mock_socket
# Mocking the update_signal to check if it gets emitted
eiger_plot_instance.update_signal = MagicMock()
with patch("zmq.Poller"):
# will do only 1 iteration of the loop in the thread
eiger_plot_instance._zmq_consumer_exit_event.set()
# Run the method under test
consumer_thread = eiger_plot_instance.start_zmq_consumer()
consumer_thread.join()
# Check if zmq methods are called
# MockContext.assert_called_once()
assert MockContext.call_count == 1
mock_socket.connect.assert_called_with("tcp://129.129.95.38:20000")
mock_socket.setsockopt_string.assert_called_with(zmq.SUBSCRIBE, "")
mock_socket.recv_multipart.assert_called()
# Check if update_signal was emitted
eiger_plot_instance.update_signal.emit.assert_called_once()
# Validate the image data
np.testing.assert_array_equal(
eiger_plot_instance.image, np.array([[1, 2], [3, 4]], dtype="int32")
)

View File

@ -0,0 +1,81 @@
from textwrap import dedent
import black
import pytest
from bec_widgets.cli.generate_cli import ClientGenerator
# Mock classes to test the generator
class MockBECWaveform1D:
USER_ACCESS = ["set_frequency", "set_amplitude"]
def set_frequency(self, frequency: float) -> list:
"""Set the frequency of the waveform."""
return [frequency]
def set_amplitude(self, amplitude: float) -> tuple[float, float]:
"""Set the amplitude of the waveform."""
return amplitude, amplitude
class MockBECFigure:
USER_ACCESS = ["add_plot", "remove_plot"]
def add_plot(self, plot_id: str):
"""Add a plot to the figure."""
pass
def remove_plot(self, plot_id: str):
"""Remove a plot from the figure."""
pass
def test_client_generator_with_black_formatting():
generator = ClientGenerator()
generator.generate_client([MockBECWaveform1D, MockBECFigure])
# Format the expected output with black to ensure it matches the generator output
expected_output = dedent(
'''\
# This file was automatically generated by generate_cli.py
from bec_widgets.cli.client_utils import rpc_call, RPCBase, BECFigureClientMixin
from typing import Literal, Optional, overload
class MockBECWaveform1D(RPCBase):
@rpc_call
def set_frequency(self, frequency: float) -> list:
"""
Set the frequency of the waveform.
"""
@rpc_call
def set_amplitude(self, amplitude: float) -> tuple[float, float]:
"""
Set the amplitude of the waveform.
"""
class MockBECFigure(RPCBase):
@rpc_call
def add_plot(self, plot_id: str):
"""
Add a plot to the figure.
"""
@rpc_call
def remove_plot(self, plot_id: str):
"""
Remove a plot from the figure.
"""
'''
)
expected_output_formatted = black.format_str(
expected_output, mode=black.FileMode(line_length=100)
).lstrip()
generated_output_formatted = black.format_str(
generator.header + "\n" + generator.content, mode=black.FileMode(line_length=100)
)
assert expected_output_formatted == generated_output_formatted

View File

@ -0,0 +1,582 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from unittest.mock import MagicMock, patch
import pytest
from bec_lib.devicemanager import DeviceContainer
from bec_widgets.examples import (
MotorControlApp,
MotorControlMap,
MotorControlPanel,
MotorControlPanelAbsolute,
MotorControlPanelRelative,
)
from bec_widgets.widgets import (
MotorControlAbsolute,
MotorControlRelative,
MotorControlSelection,
MotorCoordinateTable,
MotorThread,
)
from bec_widgets.widgets.motor_control.motor_control import MotorActions
from .client_mocks import mocked_client
CONFIG_DEFAULT = {
"motor_control": {
"motor_x": "samx",
"motor_y": "samy",
"step_size_x": 3,
"step_size_y": 3,
"precision": 4,
"step_x_y_same": False,
"move_with_arrows": False,
},
"plot_settings": {
"colormap": "Greys",
"scatter_size": 5,
"max_points": 1000,
"num_dim_points": 100,
"precision": 2,
"num_columns": 1,
"background_value": 25,
},
"motors": [
{
"plot_name": "Motor Map",
"x_label": "Motor X",
"y_label": "Motor Y",
"signals": {
"x": [{"name": "samx", "entry": "samx"}],
"y": [{"name": "samy", "entry": "samy"}],
},
},
],
}
#######################################################
# Motor Thread
#######################################################
@pytest.fixture
def motor_thread(mocked_client):
"""Fixture for MotorThread with a mocked client."""
return MotorThread(client=mocked_client)
def test_motor_thread_initialization(mocked_client):
motor_thread = MotorThread(client=mocked_client)
assert motor_thread.client == mocked_client
assert isinstance(motor_thread.dev, DeviceContainer)
def test_get_all_motors_names(mocked_client):
motor_thread = MotorThread(client=mocked_client)
motor_names = motor_thread.get_all_motors_names()
expected_names = ["samx", "samy", "aptrx", "aptry"]
assert sorted(motor_names) == sorted(expected_names)
assert all(name in motor_names for name in expected_names)
assert len(motor_names) == len(expected_names) # Ensure only these motors are returned
def test_get_coordinates(mocked_client):
motor_thread = MotorThread(client=mocked_client)
motor_x, motor_y = "samx", "samy"
x, y = motor_thread.get_coordinates(motor_x, motor_y)
assert x == mocked_client.device_manager.devices[motor_x].readback.get()
assert y == mocked_client.device_manager.devices[motor_y].readback.get()
def test_move_motor_absolute_by_run(mocked_client):
motor_thread = MotorThread(client=mocked_client)
motor_thread.motor_x = "samx"
motor_thread.motor_y = "samy"
motor_thread.target_coordinates = (5.0, -3.0)
motor_thread.action = MotorActions.MOVE_ABSOLUTE
motor_thread.run()
assert mocked_client.device_manager.devices["samx"].read_value == 5.0
assert mocked_client.device_manager.devices["samy"].read_value == -3.0
def test_move_motor_relative_by_run(mocked_client):
motor_thread = MotorThread(client=mocked_client)
initial_value = motor_thread.dev["samx"].read()["samx"]["value"]
move_value = 2.0
expected_value = initial_value + move_value
motor_thread.motor = "samx"
motor_thread.value = move_value
motor_thread.action = MotorActions.MOVE_RELATIVE
motor_thread.run()
assert mocked_client.device_manager.devices["samx"].read_value == expected_value
def test_motor_thread_move_absolute(motor_thread):
motor_x = "samx"
motor_y = "samy"
target_x = 5.0
target_y = -3.0
motor_thread.move_absolute(motor_x, motor_y, (target_x, target_y))
motor_thread.wait()
assert motor_thread.dev[motor_x].read()["samx"]["value"] == target_x
assert motor_thread.dev[motor_y].read()["samy"]["value"] == target_y
def test_motor_thread_move_relative(motor_thread):
motor_name = "samx"
move_value = 2.0
initial_value = motor_thread.dev[motor_name].read()["samx"]["value"]
motor_thread.move_relative(motor_name, move_value)
motor_thread.wait()
expected_value = initial_value + move_value
assert motor_thread.dev[motor_name].read()["samx"]["value"] == expected_value
#######################################################
# Motor Control Widgets - MotorControlSelection
#######################################################
@pytest.fixture(scope="function")
def motor_selection_widget(qtbot, mocked_client, motor_thread):
"""Fixture for creating a MotorControlSelection widget with a mocked client."""
widget = MotorControlSelection(
client=mocked_client, config=CONFIG_DEFAULT, motor_thread=motor_thread
)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
def test_initialization_and_population(motor_selection_widget):
assert motor_selection_widget.comboBox_motor_x.count() == 4
assert motor_selection_widget.comboBox_motor_x.itemText(0) == "samx"
assert motor_selection_widget.comboBox_motor_y.itemText(1) == "samy"
assert motor_selection_widget.comboBox_motor_x.itemText(2) == "aptrx"
assert motor_selection_widget.comboBox_motor_y.itemText(3) == "aptry"
def test_selection_and_signal_emission(motor_selection_widget):
# Connect signal to a custom slot to capture the emitted values
emitted_values = []
def capture_emitted_values(motor_x, motor_y):
emitted_values.append((motor_x, motor_y))
motor_selection_widget.selected_motors_signal.connect(capture_emitted_values)
# Select motors
motor_selection_widget.comboBox_motor_x.setCurrentIndex(0) # Select 'samx'
motor_selection_widget.comboBox_motor_y.setCurrentIndex(1) # Select 'samy'
motor_selection_widget.pushButton_connecMotors.click() # Emit the signal
# Verify the emitted signal
assert emitted_values == [
("samx", "samy")
], "The emitted signal did not match the expected values"
def test_configuration_update(motor_selection_widget):
new_config = {"motor_control": {"motor_x": "samy", "motor_y": "samx"}}
motor_selection_widget.on_config_update(new_config)
assert motor_selection_widget.comboBox_motor_x.currentText() == "samy"
assert motor_selection_widget.comboBox_motor_y.currentText() == "samx"
def test_enable_motor_controls(motor_selection_widget):
motor_selection_widget.enable_motor_controls(False)
assert not motor_selection_widget.comboBox_motor_x.isEnabled()
assert not motor_selection_widget.comboBox_motor_y.isEnabled()
motor_selection_widget.enable_motor_controls(True)
assert motor_selection_widget.comboBox_motor_x.isEnabled()
assert motor_selection_widget.comboBox_motor_y.isEnabled()
#######################################################
# Motor Control Widgets - MotorControlAbsolute
#######################################################
@pytest.fixture(scope="function")
def motor_absolute_widget(qtbot, mocked_client, motor_thread):
widget = MotorControlAbsolute(
client=mocked_client, config=CONFIG_DEFAULT, motor_thread=motor_thread
)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
def test_absolute_initialization(motor_absolute_widget):
motor_absolute_widget.change_motors("samx", "samy")
motor_absolute_widget.on_config_update(CONFIG_DEFAULT)
assert motor_absolute_widget.motor_x == "samx", "Motor X not initialized correctly"
assert motor_absolute_widget.motor_y == "samy", "Motor Y not initialized correctly"
assert motor_absolute_widget.precision == CONFIG_DEFAULT["motor_control"]["precision"]
def test_absolute_save_current_coordinates(motor_absolute_widget):
motor_x_value = motor_absolute_widget.client.device_manager.devices["samx"].read()["samx"][
"value"
]
motor_y_value = motor_absolute_widget.client.device_manager.devices["samy"].read()["samy"][
"value"
]
motor_absolute_widget.change_motors("samx", "samy")
emitted_coordinates = []
def capture_emit(x_y):
emitted_coordinates.append(x_y)
motor_absolute_widget.coordinates_signal.connect(capture_emit)
# Trigger saving current coordinates
motor_absolute_widget.pushButton_save.click()
assert emitted_coordinates == [(motor_x_value, motor_y_value)]
def test_absolute_set_absolute_coordinates(motor_absolute_widget):
motor_absolute_widget.spinBox_absolute_x.setValue(5)
motor_absolute_widget.spinBox_absolute_y.setValue(10)
# Connect to the coordinates_signal to capture emitted values
emitted_values = []
def capture_coordinates(x_y):
emitted_values.append(x_y)
motor_absolute_widget.coordinates_signal.connect(capture_coordinates)
# Simulate button click for absolute movement
motor_absolute_widget.pushButton_set.click()
assert emitted_values == [(5, 10)]
def test_absolute_go_absolute_coordinates(motor_absolute_widget):
motor_absolute_widget.change_motors("samx", "samy")
motor_absolute_widget.spinBox_absolute_x.setValue(5)
motor_absolute_widget.spinBox_absolute_y.setValue(10)
with patch(
"bec_widgets.widgets.motor_control.motor_control.MotorThread.move_absolute",
new_callable=MagicMock,
) as mock_move_absolute:
motor_absolute_widget.pushButton_go_absolute.click()
mock_move_absolute.assert_called_once_with("samx", "samy", (5, 10))
def test_change_motor_absolute(motor_absolute_widget):
motor_absolute_widget.change_motors("aptrx", "aptry")
assert motor_absolute_widget.motor_x == "aptrx"
assert motor_absolute_widget.motor_y == "aptry"
motor_absolute_widget.change_motors("samx", "samy")
assert motor_absolute_widget.motor_x == "samx"
assert motor_absolute_widget.motor_y == "samy"
def test_set_precision(motor_absolute_widget):
motor_absolute_widget.on_config_update(CONFIG_DEFAULT)
motor_absolute_widget.set_precision(2)
assert motor_absolute_widget.spinBox_absolute_x.decimals() == 2
assert motor_absolute_widget.spinBox_absolute_y.decimals() == 2
#######################################################
# Motor Control Widgets - MotorControlRelative
#######################################################
@pytest.fixture(scope="function")
def motor_relative_widget(qtbot, mocked_client, motor_thread):
widget = MotorControlRelative(
client=mocked_client, config=CONFIG_DEFAULT, motor_thread=motor_thread
)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
def test_initialization_and_config_update(motor_relative_widget):
motor_relative_widget.on_config_update(CONFIG_DEFAULT)
assert motor_relative_widget.motor_x == CONFIG_DEFAULT["motor_control"]["motor_x"]
assert motor_relative_widget.motor_y == CONFIG_DEFAULT["motor_control"]["motor_y"]
assert motor_relative_widget.precision == CONFIG_DEFAULT["motor_control"]["precision"]
# Simulate a configuration update
new_config = {
"motor_control": {
"motor_x": "new_motor_x",
"motor_y": "new_motor_y",
"precision": 2,
"step_size_x": 5,
"step_size_y": 5,
"step_x_y_same": True,
"move_with_arrows": True,
}
}
motor_relative_widget.on_config_update(new_config)
assert motor_relative_widget.motor_x == "new_motor_x"
assert motor_relative_widget.motor_y == "new_motor_y"
assert motor_relative_widget.precision == 2
def test_move_motor_relative(motor_relative_widget):
motor_relative_widget.on_config_update(CONFIG_DEFAULT)
# Set step sizes
motor_relative_widget.spinBox_step_x.setValue(1)
motor_relative_widget.spinBox_step_y.setValue(1)
# Mock the move_relative method
motor_relative_widget.motor_thread.move_relative = MagicMock()
# Simulate button clicks
motor_relative_widget.toolButton_right.click()
motor_relative_widget.motor_thread.move_relative.assert_called_with(
motor_relative_widget.motor_x, 1
)
motor_relative_widget.toolButton_left.click()
motor_relative_widget.motor_thread.move_relative.assert_called_with(
motor_relative_widget.motor_x, -1
)
motor_relative_widget.toolButton_up.click()
motor_relative_widget.motor_thread.move_relative.assert_called_with(
motor_relative_widget.motor_y, 1
)
motor_relative_widget.toolButton_down.click()
motor_relative_widget.motor_thread.move_relative.assert_called_with(
motor_relative_widget.motor_y, -1
)
def test_precision_update(motor_relative_widget):
# Capture emitted precision values
emitted_values = []
def capture_precision(precision):
emitted_values.append(precision)
motor_relative_widget.precision_signal.connect(capture_precision)
# Update precision
motor_relative_widget.spinBox_precision.setValue(1)
assert emitted_values == [1]
assert motor_relative_widget.spinBox_step_x.decimals() == 1
assert motor_relative_widget.spinBox_step_y.decimals() == 1
def test_sync_step_sizes(motor_relative_widget):
motor_relative_widget.on_config_update(CONFIG_DEFAULT)
motor_relative_widget.checkBox_same_xy.setChecked(True)
# Change step size for X
motor_relative_widget.spinBox_step_x.setValue(2)
assert motor_relative_widget.spinBox_step_y.value() == 2
def test_change_motor_relative(motor_relative_widget):
motor_relative_widget.on_config_update(CONFIG_DEFAULT)
motor_relative_widget.change_motors("aptrx", "aptry")
assert motor_relative_widget.motor_x == "aptrx"
assert motor_relative_widget.motor_y == "aptry"
#######################################################
# Motor Control Widgets - MotorCoordinateTable
#######################################################
@pytest.fixture(scope="function")
def motor_coordinate_table(qtbot, mocked_client, motor_thread):
widget = MotorCoordinateTable(
client=mocked_client, config=CONFIG_DEFAULT, motor_thread=motor_thread
)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
return widget
def test_delete_selected_row(motor_coordinate_table):
# Add a coordinate
motor_coordinate_table.add_coordinate((1.0, 2.0))
motor_coordinate_table.add_coordinate((3.0, 4.0))
# Select the row
motor_coordinate_table.table.selectRow(0)
# Delete the selected row
motor_coordinate_table.delete_selected_row()
assert motor_coordinate_table.table.rowCount() == 1
def test_add_coordinate_and_table_update(motor_coordinate_table):
# Disable Warning message popups for test
motor_coordinate_table.warning_message = False
# Add coordinate in Individual mode
motor_coordinate_table.add_coordinate((1.0, 2.0))
assert motor_coordinate_table.table.rowCount() == 1
# Check if the coordinates match
x_item_individual = motor_coordinate_table.table.cellWidget(0, 3) # Assuming X is in column 3
y_item_individual = motor_coordinate_table.table.cellWidget(0, 4) # Assuming Y is in column 4
assert float(x_item_individual.text()) == 1.0
assert float(y_item_individual.text()) == 2.0
# Switch to Start/Stop and add coordinates
motor_coordinate_table.comboBox_mode.setCurrentIndex(1) # Switch mode
motor_coordinate_table.add_coordinate((3.0, 4.0))
motor_coordinate_table.add_coordinate((5.0, 6.0))
assert motor_coordinate_table.table.rowCount() == 1
def test_plot_coordinates_signal(motor_coordinate_table):
# Connect to the signal
def signal_emitted(coordinates, reference_tag, color):
nonlocal received
received = True
assert len(coordinates) == 1 # Assuming one coordinate was added
assert reference_tag in ["Individual", "Start", "Stop"]
assert color in ["green", "blue", "red"]
received = False
motor_coordinate_table.plot_coordinates_signal.connect(signal_emitted)
# Add a coordinate and check signal
motor_coordinate_table.add_coordinate((1.0, 2.0))
assert received
def test_move_motor_action(motor_coordinate_table):
# Add a coordinate
motor_coordinate_table.add_coordinate((1.0, 2.0))
# Mock the motor thread move_absolute function
motor_coordinate_table.motor_thread.move_absolute = MagicMock()
# Trigger the move action
move_button = motor_coordinate_table.table.cellWidget(0, 1)
move_button.click()
motor_coordinate_table.motor_thread.move_absolute.assert_called_with(
motor_coordinate_table.motor_x, motor_coordinate_table.motor_y, (1.0, 2.0)
)
def test_plot_coordinates_signal_individual(motor_coordinate_table, qtbot):
motor_coordinate_table.warning_message = False
motor_coordinate_table.set_precision(3)
motor_coordinate_table.comboBox_mode.setCurrentIndex(0)
# This list will store the signals emitted during the test
emitted_signals = []
def signal_emitted(coordinates, reference_tag, color):
emitted_signals.append((coordinates, reference_tag, color))
motor_coordinate_table.plot_coordinates_signal.connect(signal_emitted)
# Add new coordinates
motor_coordinate_table.add_coordinate((1.0, 2.0))
qtbot.wait(100)
# Verify the signals
assert len(emitted_signals) > 0, "No signals were emitted."
for coordinates, reference_tag, color in emitted_signals:
assert len(coordinates) > 0, "Coordinates list is empty."
assert reference_tag == "Individual"
assert color == "green"
assert motor_coordinate_table.table.cellWidget(0, 3).text() == "1.000"
assert motor_coordinate_table.table.cellWidget(0, 4).text() == "2.000"
#######################################################
# MotorControl examples compilations
#######################################################
@pytest.fixture(scope="function")
def motor_app(qtbot, mocked_client):
widget = MotorControlApp(config=CONFIG_DEFAULT, client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_motor_app_initialization(motor_app):
assert isinstance(motor_app, MotorControlApp)
assert motor_app.client is not None
assert motor_app.config == CONFIG_DEFAULT
@pytest.fixture(scope="function")
def motor_control_map(qtbot, mocked_client):
widget = MotorControlMap(config=CONFIG_DEFAULT, client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_motor_control_map_initialization(motor_control_map):
assert isinstance(motor_control_map, MotorControlMap)
assert motor_control_map.client is not None
assert motor_control_map.config == CONFIG_DEFAULT
@pytest.fixture(scope="function")
def motor_control_panel(qtbot, mocked_client):
widget = MotorControlPanel(config=CONFIG_DEFAULT, client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_motor_control_panel_initialization(motor_control_panel):
assert isinstance(motor_control_panel, MotorControlPanel)
assert motor_control_panel.client is not None
assert motor_control_panel.config == CONFIG_DEFAULT
@pytest.fixture(scope="function")
def motor_control_panel_absolute(qtbot, mocked_client):
widget = MotorControlPanelAbsolute(config=CONFIG_DEFAULT, client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_motor_control_panel_absolute_initialization(motor_control_panel_absolute):
assert isinstance(motor_control_panel_absolute, MotorControlPanelAbsolute)
assert motor_control_panel_absolute.client is not None
assert motor_control_panel_absolute.config == CONFIG_DEFAULT
@pytest.fixture(scope="function")
def motor_control_panel_relative(qtbot, mocked_client):
widget = MotorControlPanelRelative(config=CONFIG_DEFAULT, client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_motor_control_panel_relative_initialization(motor_control_panel_relative):
assert isinstance(motor_control_panel_relative, MotorControlPanelRelative)
assert motor_control_panel_relative.client is not None
assert motor_control_panel_relative.config == CONFIG_DEFAULT

View File

@ -0,0 +1,186 @@
# pylint: disable = no-name-in-module,missing-module-docstring, missing-function-docstring
from unittest.mock import MagicMock
import pytest
from bec_widgets.widgets import MotorMap
from .client_mocks import mocked_client
CONFIG_DEFAULT = {
"plot_settings": {
"colormap": "Greys",
"scatter_size": 5,
"max_points": 1000,
"num_dim_points": 100,
"precision": 2,
"num_columns": 1,
"background_value": 25,
},
"motors": [
{
"plot_name": "Motor Map",
"x_label": "Motor X",
"y_label": "Motor Y",
"signals": {
"x": [{"name": "samx", "entry": "samx"}],
"y": [{"name": "samy", "entry": "samy"}],
},
},
{
"plot_name": "Motor Map 2 ",
"x_label": "Motor X",
"y_label": "Motor Y",
"signals": {
"x": [{"name": "aptrx", "entry": "aptrx"}],
"y": [{"name": "aptry", "entry": "aptry"}],
},
},
],
}
CONFIG_ONE_DEVICE = {
"plot_settings": {
"colormap": "Greys",
"scatter_size": 5,
"max_points": 1000,
"num_dim_points": 100,
"precision": 2,
"num_columns": 1,
"background_value": 25,
},
"motors": [
{
"plot_name": "Motor Map",
"x_label": "Motor X",
"y_label": "Motor Y",
"signals": {
"x": [{"name": "samx", "entry": "samx"}],
"y": [{"name": "samy", "entry": "samy"}],
},
},
],
}
@pytest.fixture(scope="function")
def motor_map(qtbot, mocked_client):
widget = MotorMap(client=mocked_client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_motor_limits_initialization(motor_map):
# Example test to check if motor limits are correctly initialized
expected_limits = {
"samx": [-10, 10],
"samy": [-5, 5],
}
for motor_name, expected_limit in expected_limits.items():
actual_limit = motor_map._get_motor_limit(motor_name)
assert actual_limit == expected_limit
def test_motor_initial_position(motor_map):
motor_map.precision = 2
motor_map_dev = motor_map.client.device_manager.devices
# Example test to check if motor initial positions are correctly initialized
expected_positions = {
("samx", "samx"): motor_map_dev["samx"].read()["samx"]["value"],
("samy", "samy"): motor_map_dev["samy"].read()["samy"]["value"],
("aptrx", "aptrx"): motor_map_dev["aptrx"].read()["aptrx"]["value"],
("aptry", "aptry"): motor_map_dev["aptry"].read()["aptry"]["value"],
}
for (motor_name, entry), expected_position in expected_positions.items():
actual_position = motor_map._get_motor_init_position(motor_name, entry)
assert actual_position == expected_position
@pytest.mark.parametrize(
"config, number_of_plots",
[
(CONFIG_DEFAULT, 2),
(CONFIG_ONE_DEVICE, 1),
],
)
def test_initialization(motor_map, config, number_of_plots):
config_load = config
motor_map.on_config_update(config_load)
assert isinstance(motor_map, MotorMap)
assert motor_map.client is not None
assert motor_map.config == config_load
assert len(motor_map.plot_data) == number_of_plots
def test_motor_movement_updates_position_and_database(motor_map):
motor_map.on_config_update(CONFIG_DEFAULT)
# Initial positions
initial_position_samx = 2.0
initial_position_samy = 3.0
# Set initial positions in the mocked database
motor_map.database["samx"]["samx"] = [initial_position_samx]
motor_map.database["samy"]["samy"] = [initial_position_samy]
# Simulate motor movement for 'samx' only
new_position_samx = 4.0
motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
# Verify database update for 'samx'
assert motor_map.database["samx"]["samx"] == [
initial_position_samx,
new_position_samx,
]
# Verify 'samy' retains its last known position
assert motor_map.database["samy"]["samy"] == [
initial_position_samy,
initial_position_samy,
]
def test_scatter_plot_rendering(motor_map):
motor_map.on_config_update(CONFIG_DEFAULT)
# Set initial positions
initial_position_samx = 2.0
initial_position_samy = 3.0
motor_map.database["samx"]["samx"] = [initial_position_samx]
motor_map.database["samy"]["samy"] = [initial_position_samy]
# Simulate motor movement for 'samx' only
new_position_samx = 4.0
motor_map.on_device_readback({"signals": {"samx": {"value": new_position_samx}}})
motor_map._update_plots()
# Get the scatter plot item
plot_name = "Motor Map" # Update as per your actual plot name
scatter_plot_item = motor_map.curves_data[plot_name]["pos"]
# Check the scatter plot item properties
assert len(scatter_plot_item.data) > 0, "Scatter plot data is empty"
x_data = scatter_plot_item.data["x"]
y_data = scatter_plot_item.data["y"]
assert x_data[-1] == new_position_samx, "Scatter plot X data not updated correctly"
assert (
y_data[-1] == initial_position_samy
), "Scatter plot Y data should retain last known position"
def test_plot_visualization_consistency(motor_map):
motor_map.on_config_update(CONFIG_DEFAULT)
# Simulate updating the plot with new data
motor_map.on_device_readback({"signals": {"samx": {"value": 5}}})
motor_map.on_device_readback({"signals": {"samy": {"value": 9}}})
motor_map._update_plots()
plot_name = "Motor Map"
scatter_plot_item = motor_map.curves_data[plot_name]["pos"]
# Check if the scatter plot reflects the new data correctly
assert (
scatter_plot_item.data["x"][-1] == 5 and scatter_plot_item.data["y"][-1] == 9
), "Plot not updated correctly with new data"

View File

View File

@ -0,0 +1,989 @@
from bec_lib.messages import AvailableResourceMessage
available_scans_message = AvailableResourceMessage(
resource={
"acquire": {
"class": "Acquire",
"base_class": "ScanBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": "\n A simple acquisition at the current position.\n\n Args:\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.acquire(exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"interactive_scan_trigger": {
"class": "AddInteractiveScanPoint",
"base_class": "ScanComponent",
"arg_input": {"device": "device"},
"required_kwargs": ["required"],
"arg_bundle_size": {"bundle": 1, "min": 1, "max": None},
"scan_report_hint": "",
"doc": "\n An interactive scan for one or more motors.\n\n Args:\n *args: devices\n exp_time: exposure time in s\n steps: number of steps (please note: 5 steps == 6 positions)\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.interactive_scan_trigger()\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"close_interactive_scan": {
"class": "CloseInteractiveScan",
"base_class": "ScanComponent",
"arg_input": {},
"required_kwargs": ["required"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "",
"doc": "\n An interactive scan for one or more motors.\n\n Args:\n *args: devices\n exp_time: exposure time in s\n steps: number of steps (please note: 5 steps == 6 positions)\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.close_interactive_scan(dev.motor1, dev.motor2, exp_time=0.1)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"close_scan_def": {
"class": "CloseScanDef",
"base_class": "RequestBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": 0, "max": 0},
"scan_report_hint": "table",
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"close_scan_group": {
"class": "CloseScanGroup",
"base_class": "RequestBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": 0, "max": 0},
"scan_report_hint": None,
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"cont_line_scan": {
"class": "ContLineScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["steps", "relative"],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A line scan for one or more motors.\n\n Args:\n *args (Device, float, float): pairs of device / start position / end position\n exp_time (float): exposure time in seconds. Default is 0.\n steps (int): number of steps. Default is 10.\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n offset (float): offset in motor units. Default is 100.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.cont_line_scan(dev.motor1, -5, 5, steps=10, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{"name": "steps", "kind": "KEYWORD_ONLY", "default": 10, "annotation": "int"},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{"name": "offset", "kind": "KEYWORD_ONLY", "default": 100, "annotation": "float"},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"device_rpc": {
"class": "DeviceRPC",
"base_class": "RequestBase",
"arg_input": ["device", "str", "list", "dict"],
"required_kwargs": [],
"arg_bundle_size": {"bundle": 4, "min": 1, "max": 1},
"scan_report_hint": None,
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"fermat_scan": {
"class": "FermatSpiralScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["step", "relative"],
"arg_bundle_size": {"bundle": 3, "min": 2, "max": 2},
"scan_report_hint": "table",
"doc": '\n A scan following Fermat\'s spiral.\n\n Args:\n *args: pairs of device / start position / end position arguments\n step (float): step size in motor units. Default is 0.1.\n exp_time (float): exposure time in seconds. Default is 0.\n settling_time (float): settling time in seconds. Default is 0.\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n spiral_type (float): type of spiral to use. Default is 0.\n optim_trajectory (str): trajectory optimization method. Default is None. Options are "corridor" and "none".\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.fermat_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, step=0.5, exp_time=0.1, relative=True, optim_trajectory="corridor")\n\n ',
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "step", "kind": "KEYWORD_ONLY", "default": 0.1, "annotation": "float"},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "settling_time",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": "float",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "spiral_type",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": "float",
},
{
"name": "optim_trajectory",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": {"Literal": ["corridor", None]},
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"line_scan": {
"class": "LineScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["steps", "relative"],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": None},
"scan_report_hint": "table",
"doc": "\n A line scan for one or more motors.\n\n Args:\n *args (Device, float, float): pairs of device / start position / end position\n exp_time (float): exposure time in s. Default: 0\n steps (int): number of steps. Default: 10\n relative (bool): if True, the start and end positions are relative to the current position. Default: False\n burst_at_each_point (int): number of acquisition per point. Default: 1\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{"name": "steps", "kind": "KEYWORD_ONLY", "default": None, "annotation": "int"},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"list_scan": {
"class": "ListScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "positions": "list"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 2, "min": 1, "max": None},
"scan_report_hint": "table",
"doc": "\n A scan following the positions specified in a list.\n Please note that all lists must be of equal length.\n\n Args:\n *args: pairs of motors and position lists\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.list_scan(dev.motor1, [0,1,2,3,4], dev.motor2, [4,3,2,1,0], exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"monitor_scan": {
"class": "MonitorScan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n Readout all primary devices at each update of the monitored device.\n\n Args:\n device (Device): monitored device\n start (float): start position of the monitored device\n stop (float): stop position of the monitored device\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.monitor_scan(dev.motor1, -5, 5, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "device",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "start",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "stop",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"mv": {
"class": "Move",
"base_class": "RequestBase",
"arg_input": {"device": "device", "target": "float"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 2, "min": 1, "max": None},
"scan_report_hint": None,
"doc": "\n Move device(s) to an absolute position\n Args:\n *args (Device, float): pairs of device / position arguments\n relative (bool): if True, move relative to current position\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.mv(dev.samx, 1, dev.samy,2)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"open_interactive_scan": {
"class": "OpenInteractiveScan",
"base_class": "ScanComponent",
"arg_input": {"device": "device"},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 1, "min": 1, "max": None},
"scan_report_hint": "",
"doc": "\n An interactive scan for one or more motors.\n\n Args:\n *args: devices\n exp_time: exposure time in s\n steps: number of steps (please note: 5 steps == 6 positions)\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.open_interactive_scan(dev.motor1, dev.motor2, exp_time=0.1)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"open_scan_def": {
"class": "OpenScanDef",
"base_class": "RequestBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": 0, "max": 0},
"scan_report_hint": None,
"doc": None,
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "device_manager",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "DeviceManagerBase",
},
{
"name": "monitored",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "list",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{"name": "metadata", "kind": "KEYWORD_ONLY", "default": None, "annotation": "dict"},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"round_roi_scan": {
"class": "RoundROIScan",
"base_class": "ScanBase",
"arg_input": {
"motor_1": "device",
"motor_2": "device",
"width_1": "float",
"width_2": "float",
},
"required_kwargs": ["dr", "nth", "relative"],
"arg_bundle_size": {"bundle": 4, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A scan following a round-roi-like pattern.\n\n Args:\n *args: motor1, width for motor1, motor2, width for motor2,\n dr (float): shell width. Default is 1.\n nth (int): number of points in the first shell. Default is 5.\n exp_time (float): exposure time in seconds. Default is 0.\n relative (bool): Start from an absolute or relative position. Default is False.\n burst_at_each_point (int): number of acquisition per point. Default is 1.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.round_roi_scan(dev.motor1, 20, dev.motor2, 20, dr=2, nth=3, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "dr", "kind": "KEYWORD_ONLY", "default": 1, "annotation": "float"},
{"name": "nth", "kind": "KEYWORD_ONLY", "default": 5, "annotation": "int"},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"round_scan": {
"class": "RoundScan",
"base_class": "ScanBase",
"arg_input": {
"motor_1": "device",
"motor_2": "device",
"inner_ring": "float",
"outer_ring": "float",
"number_of_rings": "int",
"number_of_positions_in_first_ring": "int",
},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 6, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A scan following a round shell-like pattern.\n\n Args:\n *args: motor1, motor2, inner ring, outer ring, number of rings, number of positions in the first ring\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.round_scan(dev.motor1, dev.motor2, 0, 25, 5, 3, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"round_scan_fly": {
"class": "RoundScanFlySim",
"base_class": "ScanBase",
"arg_input": {
"flyer": "device",
"inner_ring": "float",
"outer_ring": "float",
"number_of_rings": "int",
"number_of_positions_in_first_ring": "int",
},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 5, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A fly scan following a round shell-like pattern.\n\n Args:\n *args: motor1, motor2, inner ring, outer ring, number of rings, number of positions in the first ring\n relative: Start from an absolute or relative position\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.round_scan_fly(dev.flyer_sim, 0, 50, 5, 3, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"grid_scan": {
"class": "Scan",
"base_class": "ScanBase",
"arg_input": {"device": "device", "start": "float", "stop": "float", "steps": "int"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 4, "min": 2, "max": None},
"scan_report_hint": "table",
"doc": "\n Scan two motors in a grid.\n\n Args:\n *args (Device, float, float, int): pairs of device / start / stop / steps arguments\n exp_time (float): exposure time in seconds. Default is 0.\n settling_time (float): settling time in seconds. Default is 0.\n relative (bool): if True, the motors will be moved relative to their current position. Default is False.\n burst_at_each_point (int): number of exposures at each point. Default is 1.\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.grid_scan(dev.motor1, -5, 5, 10, dev.motor2, -5, 5, 10, exp_time=0.1, relative=True)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0, "annotation": "float"},
{
"name": "settling_time",
"kind": "KEYWORD_ONLY",
"default": 0,
"annotation": "float",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "bool",
},
{
"name": "burst_at_each_point",
"kind": "KEYWORD_ONLY",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"time_scan": {
"class": "TimeScan",
"base_class": "ScanBase",
"arg_input": {},
"required_kwargs": ["points", "interval"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": '\n Trigger and readout devices at a fixed interval.\n Note that the interval time cannot be less than the exposure time.\n The effective "sleep" time between points is\n sleep_time = interval - exp_time\n\n Args:\n points: number of points\n interval: time interval between points\n exp_time: exposure time in s\n burst: number of acquisition per point\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.time_scan(points=10, interval=1.5, exp_time=0.1, relative=True)\n\n ',
"signature": [
{
"name": "points",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "interval",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "exp_time",
"kind": "POSITIONAL_OR_KEYWORD",
"default": 0,
"annotation": "float",
},
{
"name": "burst_at_each_point",
"kind": "POSITIONAL_OR_KEYWORD",
"default": 1,
"annotation": "int",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"umv": {
"class": "UpdatedMove",
"base_class": "RequestBase",
"arg_input": {"device": "device", "target": "float"},
"required_kwargs": ["relative"],
"arg_bundle_size": {"bundle": 2, "min": 1, "max": None},
"scan_report_hint": "readback",
"doc": "\n Move device(s) to an absolute position and show live updates. This is a blocking call. For non-blocking use Move.\n Args:\n *args (Device, float): pairs of device / position arguments\n relative (bool): if True, move relative to current position\n\n Returns:\n ScanReport\n\n Examples:\n >>> scans.umv(dev.samx, 1, dev.samy,2)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "relative",
"kind": "KEYWORD_ONLY",
"default": False,
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"lamni_fermat_scan": {
"class": "LamNIFermatScan",
"base_class": "ScanBase",
"arg_input": {},
"required_kwargs": ["fov_size", "exp_time", "step", "angle"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": "\n A LamNI scan following Fermat's spiral.\n\n Kwargs:\n fov_size [um]: Fov in the piezo plane (i.e. piezo range). Max 80 um\n step [um]: stepsize\n shift_x/y [mm]: extra shift in x/y. The shift is directly applied to the scan. It will not be auto rotated. (default 0).\n center_x/center_y [mm]: center position in x/y at 0 deg. This shift is rotated\n using the geometry of LamNI\n It is determined by the first 'click' in the x-ray eye alignemnt procedure\n angle [deg]: rotation angle (will rotate first)\n scan_type: fly (i.e. HW triggered step in case of LamNI) or step\n stitch_x/y: shift scan to adjacent stitch region\n fov_circular [um]: generate a circular field of view in the sample plane. This is an additional cropping to fov_size.\n stitch_overlap [um]: overlap of the stitched regions\n Returns:\n\n Examples:\n >>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)\n >>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"lamni_move_to_scan_center": {
"class": "LamNIMoveToScanCenter",
"base_class": "RequestBase",
"arg_input": {"shift_x": "float", "shift_y": "float", "angle": "float"},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": None,
"doc": "\n Move LamNI to a new scan center.\n\n Args:\n *args: shift x, shift y, tomo angle in deg\n\n Examples:\n >>> scans.lamni_move_to_scan_center(1.2, 2.8, 12.5)\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "_empty",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"owis_grid": {
"class": "OwisGrid",
"base_class": "FlyScanBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "scan_progress",
"doc": "Owis-based grid scan.",
"signature": [
{
"name": "start_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "start_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0.1, "annotation": "float"},
{
"name": "readout_time",
"kind": "KEYWORD_ONLY",
"default": 0.003,
"annotation": "float",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"sgalil_grid": {
"class": "SgalilGrid",
"base_class": "FlyScanBase",
"arg_input": {},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "scan_progress",
"doc": "\n SGalil-based grid scan.\n\n Args:\n start_y (float): start position of y axis (fast axis)\n end_y (float): end position of y axis (fast axis)\n interval_y (int): number of points in y axis\n start_x (float): start position of x axis (slow axis)\n end_x (float): end position of x axis (slow axis)\n interval_x (int): number of points in x axis\n exp_time (float): exposure time in seconds. Default is 0.1s\n readout_time (float): readout time in seconds, minimum of 3e-3s (3ms)\n\n Exp:\n scans.sgalil_grid(start_y = val1, end_y= val1, interval_y = val1, start_x = val1, end_x = val1, interval_x = val1, exp_time = 0.02, readout_time = 3e-3)\n\n\n ",
"signature": [
{
"name": "start_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_y",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "start_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "end_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "float",
},
{
"name": "interval_x",
"kind": "POSITIONAL_OR_KEYWORD",
"default": "_empty",
"annotation": "int",
},
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{"name": "exp_time", "kind": "KEYWORD_ONLY", "default": 0.1, "annotation": "float"},
{
"name": "readout_time",
"kind": "KEYWORD_ONLY",
"default": 0.1,
"annotation": "float",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"hyst_scan": {
"class": "HystScan",
"base_class": "ScanBase",
"arg_input": {
"field_motor": "device",
"start_field": "float",
"end_field": "float",
"mono": "device",
"energy1": "float",
"energy2": "float",
},
"required_kwargs": [],
"arg_bundle_size": {"bundle": 3, "min": 1, "max": 1},
"scan_report_hint": "table",
"doc": "\n A hysteresis scan.\n\n scans.hyst_scan(field_motor, start_field, end_field, mono, energy1, energy2)\n\n Examples:\n >>> scans.hyst_scan(dev.field_x, 0, 0.5, dev.mono, 600, 640, ramp_rate=2)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
"otf_scan": {
"class": "OTFScan",
"base_class": "FlyScanBase",
"arg_input": {},
"required_kwargs": ["e1", "e2", "time"],
"arg_bundle_size": {"bundle": 0, "min": None, "max": None},
"scan_report_hint": "table",
"doc": "Scans the energy from e1 to e2 in <time> minutes.\n\n Examples:\n >>> scans.otf_scan(e1=700, e2=740, time=4)\n\n ",
"signature": [
{
"name": "args",
"kind": "VAR_POSITIONAL",
"default": "_empty",
"annotation": "_empty",
},
{
"name": "parameter",
"kind": "KEYWORD_ONLY",
"default": None,
"annotation": "dict",
},
{
"name": "kwargs",
"kind": "VAR_KEYWORD",
"default": "_empty",
"annotation": "_empty",
},
],
},
}
)

View File

@ -0,0 +1,63 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
import pytest
from .client_mocks import mocked_client
from .test_bec_figure import bec_figure
def test_init_plot_base(bec_figure):
plot_base = bec_figure.add_widget(widget_type="PlotBase", widget_id="test_plot")
assert plot_base is not None
assert plot_base.config.widget_class == "BECPlotBase"
assert plot_base.config.gui_id == "test_plot"
def test_plot_base_axes_by_separate_methods(bec_figure):
plot_base = bec_figure.add_widget(widget_type="PlotBase", widget_id="test_plot")
plot_base.set_title("Test Title")
plot_base.set_x_label("Test x Label")
plot_base.set_y_label("Test y Label")
plot_base.set_x_lim(1, 100)
plot_base.set_y_lim(5, 500)
plot_base.set_grid(True, True)
plot_base.set_x_scale("log")
plot_base.set_y_scale("log")
assert plot_base.plot_item.titleLabel.text == "Test Title"
assert plot_base.config.axis.title == "Test Title"
assert plot_base.plot_item.getAxis("bottom").labelText == "Test x Label"
assert plot_base.config.axis.x_label == "Test x Label"
assert plot_base.plot_item.getAxis("left").labelText == "Test y Label"
assert plot_base.config.axis.y_label == "Test y Label"
assert plot_base.config.axis.x_lim == (1, 100)
assert plot_base.config.axis.y_lim == (5, 500)
assert plot_base.plot_item.ctrl.xGridCheck.isChecked() == True
assert plot_base.plot_item.ctrl.yGridCheck.isChecked() == True
assert plot_base.plot_item.ctrl.logXCheck.isChecked() == True
assert plot_base.plot_item.ctrl.logYCheck.isChecked() == True
def test_plot_base_axes_added_by_kwargs(bec_figure):
plot_base = bec_figure.add_widget(widget_type="PlotBase", widget_id="test_plot")
plot_base.set(
title="Test Title",
x_label="Test x Label",
y_label="Test y Label",
x_lim=(1, 100),
y_lim=(5, 500),
x_scale="log",
y_scale="log",
)
assert plot_base.plot_item.titleLabel.text == "Test Title"
assert plot_base.config.axis.title == "Test Title"
assert plot_base.plot_item.getAxis("bottom").labelText == "Test x Label"
assert plot_base.config.axis.x_label == "Test x Label"
assert plot_base.plot_item.getAxis("left").labelText == "Test y Label"
assert plot_base.config.axis.y_label == "Test y Label"
assert plot_base.config.axis.x_lim == (1, 100)
assert plot_base.config.axis.y_lim == (5, 500)
assert plot_base.plot_item.ctrl.logXCheck.isChecked() == True
assert plot_base.plot_item.ctrl.logYCheck.isChecked() == True

View File

@ -0,0 +1,185 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
from unittest.mock import MagicMock
import pytest
from qtpy.QtWidgets import QLineEdit
from bec_widgets.utils.widget_io import WidgetIO
from bec_widgets.widgets import ScanControl
from tests.unit_tests.test_msgs.available_scans_message import available_scans_message
class FakePositioner:
"""Fake minimal positioner class for testing."""
def __init__(self, name, enabled=True):
self.name = name
self.enabled = enabled
def __contains__(self, item):
return item == self.name
def get_mocked_device(device_name):
"""Helper function to mock the devices"""
if device_name == "samx":
return FakePositioner(name="samx", enabled=True)
@pytest.fixture(scope="function")
def mocked_client():
# Create a MagicMock object
client = MagicMock()
# Mock the producer.get method to return the packed message
client.producer.get.return_value = available_scans_message
# # Mock the device_manager.devices attribute to return a mock object for samx
client.device_manager.devices = MagicMock()
client.device_manager.devices.__contains__.side_effect = lambda x: x == "samx"
client.device_manager.devices.samx = get_mocked_device("samx")
return client
@pytest.fixture(scope="function")
def scan_control(qtbot, mocked_client): # , mock_dev):
widget = ScanControl(client=mocked_client)
# widget.dev.samx = MagicMock()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_populate_scans(scan_control, mocked_client):
# The comboBox should be populated with all scan from the message right after initialization
expected_scans = available_scans_message.resource.keys()
assert scan_control.comboBox_scan_selection.count() == len(expected_scans)
for scan in expected_scans: # Each scan should be in the comboBox
assert scan_control.comboBox_scan_selection.findText(scan) != -1
@pytest.mark.parametrize(
"scan_name", ["line_scan", "grid_scan"]
) # TODO now only for line_scan and grid_scan, later for all loaded scans
def test_on_scan_selected(scan_control, scan_name):
# Expected scan info from the message signature
expected_scan_info = available_scans_message.resource[scan_name]
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Check labels and widgets in args table
for index, (arg_key, arg_value) in enumerate(expected_scan_info["arg_input"].items()):
label = scan_control.args_table.horizontalHeaderItem(index)
assert label.text().lower() == arg_key # labes
for row in range(expected_scan_info["arg_bundle_size"]["min"]):
widget = scan_control.args_table.cellWidget(row, index)
assert widget is not None # Confirm that a widget exists
expected_widget_type = scan_control.WIDGET_HANDLER.get(arg_value, None)
assert isinstance(widget, expected_widget_type) # Confirm the widget type matches
# kwargs
kwargs_from_signature = [
param for param in expected_scan_info["signature"] if param["kind"] == "KEYWORD_ONLY"
]
# Check labels and widgets in kwargs grid layout
for index, kwarg_info in enumerate(kwargs_from_signature):
label_widget = scan_control.kwargs_layout.itemAtPosition(1, index).widget()
assert label_widget.text() == kwarg_info["name"].capitalize()
widget = scan_control.kwargs_layout.itemAtPosition(2, index).widget()
expected_widget_type = scan_control.WIDGET_HANDLER.get(kwarg_info["annotation"], QLineEdit)
assert isinstance(widget, expected_widget_type)
@pytest.mark.parametrize("scan_name", ["line_scan", "grid_scan"])
def test_add_remove_bundle(scan_control, scan_name):
# Expected scan info from the message signature
expected_scan_info = available_scans_message.resource[scan_name]
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Initial number of args row
initial_num_of_rows = scan_control.args_table.rowCount()
# Check initial row count of args table
assert scan_control.args_table.rowCount() == expected_scan_info["arg_bundle_size"]["min"]
# Try to remove default number of args row
scan_control.pushButton_remove_bundle.click()
assert scan_control.args_table.rowCount() == expected_scan_info["arg_bundle_size"]["min"]
# Try to add two bundles
scan_control.pushButton_add_bundle.click()
scan_control.pushButton_add_bundle.click()
# check the case where no max number of args are defined
# TODO do check also for the case where max number of args are defined
if expected_scan_info["arg_bundle_size"]["max"] is None:
assert scan_control.args_table.rowCount() == initial_num_of_rows + 2
# Remove one bundle
scan_control.pushButton_remove_bundle.click()
# check the case where no max number of args are defined
if expected_scan_info["arg_bundle_size"]["max"] is None:
assert scan_control.args_table.rowCount() == initial_num_of_rows + 1
def test_run_line_scan_with_parameters(scan_control, mocked_client):
scan_name = "line_scan"
kwargs = {"exp_time": 0.1, "steps": 10, "relative": True, "burst_at_each_point": 1}
args = {"device": "samx", "start": -5, "stop": 5}
# Select a scan from the comboBox
scan_control.comboBox_scan_selection.setCurrentText(scan_name)
# Set kwargs in the UI
for label_index in range(
scan_control.kwargs_layout.rowCount() + 1
): # from some reason rowCount() returns 1 less than the actual number of rows
label_item = scan_control.kwargs_layout.itemAtPosition(1, label_index)
if label_item:
label_widget = label_item.widget()
kwarg_key = WidgetIO.get_value(label_widget).lower()
if kwarg_key in kwargs:
widget_item = scan_control.kwargs_layout.itemAtPosition(2, label_index)
if widget_item:
widget = widget_item.widget()
WidgetIO.set_value(widget, kwargs[kwarg_key])
# Set args in the UI
for col_index in range(scan_control.args_table.columnCount()):
header_item = scan_control.args_table.horizontalHeaderItem(col_index)
if header_item:
arg_key = header_item.text().lower()
if arg_key in args:
for row_index in range(scan_control.args_table.rowCount()):
widget = scan_control.args_table.cellWidget(row_index, col_index)
WidgetIO.set_value(widget, args[arg_key])
# Mock the scan function
mocked_scan_function = MagicMock()
setattr(mocked_client.scans, scan_name, mocked_scan_function)
# Run the scan
scan_control.button_run_scan.click()
# Retrieve the actual arguments passed to the mock
called_args, called_kwargs = mocked_scan_function.call_args
# Check if the scan function was called correctly
expected_device = (
mocked_client.device_manager.devices.samx
) # This is the FakePositioner instance
expected_args_list = [expected_device, args["start"], args["stop"]]
assert called_args == tuple(
expected_args_list
), "The positional arguments passed to the scan function do not match expected values."
assert (
called_kwargs == kwargs
), "The keyword arguments passed to the scan function do not match expected values."

View File

@ -0,0 +1,167 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import threading
from unittest import mock
import numpy as np
import pytest
from bec_lib import RedisConnector, messages
from pytestqt import qtbot
from bec_widgets.examples.stream_plot.stream_plot import StreamPlot
@pytest.fixture(scope="function")
def stream_app(qtbot):
"""Helper function to set up the StreamPlot widget."""
client = mock.MagicMock()
widget = StreamPlot(client=client)
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
widget.close()
def test_roi_signals_emitted(qtbot, stream_app):
region = (0.1, 0.9)
with qtbot.waitSignal(stream_app.roi_signal, timeout=1000) as blocker:
stream_app.roi_signal.emit(region)
assert blocker.signal_triggered
assert blocker.args == [region]
def test_update_signals_emitted(qtbot, stream_app):
# Mimic data coming from the data stream
stream_app.plotter_data_x = [list(range(10))] # Replace with the actual x data
stream_app.plotter_data_y = [list(range(10))] # Replace with the actual y data
# Initialize curves
stream_app.init_curves()
with qtbot.waitSignal(stream_app.update_signal, timeout=1000) as blocker:
stream_app.update_signal.emit()
assert blocker.signal_triggered
def test_ui_initialization(qtbot, stream_app):
"""Checking the UI creation."""
# Check if UI elements are initialized correctly
assert stream_app.label_plot is not None
assert stream_app.label_plot_moved is not None
assert stream_app.label_plot_clicked is not None
assert stream_app.label_image_moved is not None
assert stream_app.label_image_clicked is not None
# Check if plots are initialized correctly
assert stream_app.plot is not None
assert stream_app.plot_image is not None
# Check if ROI selector is initialized correctly
assert stream_app.roi_selector is not None
def test_1d_plotting_data(qtbot, stream_app):
# Set up some mock data
x_data = [list(range(10))]
y_data = [list(range(10))]
# Manually set the data attributes
stream_app.plotter_data_x = x_data
stream_app.plotter_data_y = y_data
stream_app.y_value_list = ["Curve 1"]
# Initialize curves and update the plot
stream_app.init_curves()
stream_app.update() # This should update the plot with the new data
# Check the data on the plot
for idx, curve in enumerate(stream_app.curves):
np.testing.assert_array_equal(curve.xData, x_data[0]) # Access the first list of x_data
np.testing.assert_array_equal(
curve.yData, y_data[idx]
) # Access the list of y_data for each curve without additional indexing
def test_flip_even_rows(qtbot, stream_app):
# Create a numpy array with some known data
original_array = np.array(
[
[1, 2, 3, 4, 5],
[6, 7, 8, 9, 10],
[11, 12, 13, 14, 15],
[16, 17, 18, 19, 20],
]
)
# Call flip_even_rows on the original array
flipped_array = stream_app.flip_even_rows(original_array)
# Expected array flipped along the rows with even indices
expected_array = np.array(
[
[1, 2, 3, 4, 5],
[10, 9, 8, 7, 6],
[11, 12, 13, 14, 15],
[20, 19, 18, 17, 16],
]
)
# Check that flip_even_rows returned the expected result
np.testing.assert_array_equal(flipped_array, expected_array)
def test_on_dap_update(qtbot, stream_app):
"""2D image rendering by dap update"""
# Create some mock data to be "received" by the slot
data_dict = {"data": {"z": np.random.rand(10, 10)}}
metadata_dict = {}
# Trigger the slot
stream_app.on_dap_update(data_dict, metadata_dict)
# Apply the same transformation to the test data
expected_data = stream_app.flip_even_rows(data_dict["data"]["z"])
# Now check the state of the StreamPlot object
# For example, check the data of the image plot:
np.testing.assert_array_equal(stream_app.img.image, expected_data)
####################
# Until Here
####################
# def test_new_proj(qtbot, stream_app): #TODO this test is not working, does it make sense testing even?
# # Create some mock content to be "received" by the slot
# content_dict = {"signals": {"proj_nr": 1}}
# metadata_dict = {}
#
# # Manually create some mock data that new_proj would work with
# # This step may need to be adjusted to fit the actual behavior of new_proj
# mock_data = {
# "q": np.array([1, 2, 3, 4, 5]),
# "norm_sum": np.array([6, 7, 8, 9, 10]),
# "metadata": "some_metadata",
# }
#
# # Assume the RedisConnector client would return this data when new_proj is called
# mock_message = mock.MagicMock(spec=messages.DeviceMessage)
# mock_message.__getitem__.side_effect = lambda key: mock_data[key]
# stream_app.client.producer.get = mock.MagicMock(return_value=mock_message.dumps())
#
# # Trigger the slot
# stream_app.new_proj(content_dict, metadata_dict)
#
# # Now check the state of the StreamPlot object
# # For example, check that the plotter_data_x attribute was updated correctly:
# np.testing.assert_array_equal(stream_app.plotter_data_x, [mock_data["q"]])
# assert stream_app._current_proj == 1
# assert stream_app._current_q == mock_data["q"]
# assert stream_app._current_norm == mock_data["norm_sum"]
# assert stream_app._current_metadata == mock_data["metadata"]
# def test_connection_creation(qtbot, stream_app): #TODO maybe test connections in a different way?
# assert isinstance(stream_app.producer, RedisConnector)
# assert isinstance(stream_app.data_retriever, threading.Thread)
# assert stream_app.data_retriever.is_alive()

View File

@ -0,0 +1,110 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import pytest
from pydantic import ValidationError
from bec_widgets.validation.monitor_config_validator import (
AxisSignal,
MonitorConfigValidator,
PlotConfig,
Signal,
)
from .test_bec_monitor import mocked_client
@pytest.fixture(scope="function")
def setup_devices(mocked_client):
MonitorConfigValidator.devices = mocked_client.device_manager.devices
def test_signal_validation_name_missing(setup_devices):
with pytest.raises(ValidationError) as excinfo:
Signal(name=None)
errors = excinfo.value.errors()
assert len(errors) == 1
assert errors[0]["type"] == "no_device_name"
assert "Device name must be provided" in str(excinfo.value)
def test_signal_validation_name_not_in_bec(setup_devices):
with pytest.raises(ValidationError) as excinfo:
Signal(name="non_existent_device")
errors = excinfo.value.errors()
assert len(errors) == 1
assert errors[0]["type"] == "no_device_bec"
assert 'Device "non_existent_device" not found in current BEC session' in str(excinfo.value)
def test_signal_validation_entry_not_in_device(setup_devices):
with pytest.raises(ValidationError) as excinfo:
Signal(name="samx", entry="non_existent_entry")
errors = excinfo.value.errors()
assert len(errors) == 1
assert errors[0]["type"] == "no_entry_for_device"
assert 'Entry "non_existent_entry" not found in device "samx" signals' in errors[0]["msg"]
def test_signal_validation_success(setup_devices):
signal = Signal(name="samx")
assert signal.name == "samx"
def test_plot_config_x_axis_signal_validation(setup_devices):
# Setup a valid signal
valid_signal = Signal(name="samx")
with pytest.raises(ValidationError) as excinfo:
AxisSignal(x=[valid_signal, valid_signal], y=[valid_signal, valid_signal])
errors = excinfo.value.errors()
assert len(errors) == 1
assert errors[0]["type"] == "x_axis_multiple_signals"
assert "There must be exactly one signal for x axis" in errors[0]["msg"]
def test_plot_config_unsupported_source_type(setup_devices):
with pytest.raises(ValidationError) as excinfo:
PlotConfig(sources=[{"type": "unsupported_type", "signals": {}}])
errors = excinfo.value.errors()
print(errors)
assert len(errors) == 1
assert errors[0]["type"] == "literal_error"
def test_plot_config_no_source_type_provided(setup_devices):
with pytest.raises(ValidationError) as excinfo:
PlotConfig(sources=[{"signals": {}}])
errors = excinfo.value.errors()
assert len(errors) == 1
assert errors[0]["type"] == "missing"
def test_plot_config_history_source_type(setup_devices):
history_source = {
"type": "history",
"scan_id": "valid_scan_id",
"signals": {"x": [{"name": "samx"}], "y": [{"name": "samx"}]},
}
plot_config = PlotConfig(sources=[history_source])
assert len(plot_config.sources) == 1
assert plot_config.sources[0].type == "history"
assert plot_config.sources[0].scan_id == "valid_scan_id"
def test_plot_config_redis_source_type(setup_devices):
history_source = {
"type": "redis",
"endpoint": "valid_endpoint",
"update": "append",
"signals": {"x": [{"name": "samx"}], "y": [{"name": "samx"}]},
}
plot_config = PlotConfig(sources=[history_source])
assert len(plot_config.sources) == 1
assert plot_config.sources[0].type == "redis"

View File

@ -0,0 +1,477 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
from unittest.mock import MagicMock
import numpy as np
import pytest
from bec_widgets.widgets.plots.waveform import CurveConfig, Signal, SignalData
from .client_mocks import mocked_client
from .test_bec_figure import bec_figure
def test_adding_curve_to_waveform(bec_figure):
w1 = bec_figure.add_plot()
# adding curve which is in bec - only names
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i")
assert c1.config.label == "bpm4i-bpm4i"
# adding curve which is in bec - names and entry
c2 = w1.add_curve_scan(x_name="samx", x_entry="samx", y_name="bpm3a", y_entry="bpm3a")
assert c2.config.label == "bpm3a-bpm3a"
# adding curve which is not in bec
with pytest.raises(ValueError) as excinfo:
w1.add_curve_scan(x_name="non_existent_device", y_name="non_existent_device")
assert "Device 'non_existent_device' not found in current BEC session" in str(excinfo.value)
# adding wrong entry for samx
with pytest.raises(ValueError) as excinfo:
w1.add_curve_scan(
x_name="samx", x_entry="non_existent_entry", y_name="bpm3a", y_entry="bpm3a"
)
assert "Entry 'non_existent_entry' not found in device 'samx' signals" in str(excinfo.value)
# adding wrong device with validation switched off
c3 = w1.add_curve_scan(x_name="samx", y_name="non_existent_device", validate_bec=False)
assert c3.config.label == "non_existent_device-non_existent_device"
def test_adding_curve_with_same_id(bec_figure):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i", gui_id="test_curve")
with pytest.raises(ValueError) as excinfo:
w1.add_curve_scan(x_name="samx", y_name="bpm4i", gui_id="test_curve")
assert "Curve with ID 'test_curve' already exists." in str(excinfo.value)
def test_create_waveform1D_by_config(bec_figure):
w1_config_input = {
"widget_class": "BECWaveform",
"gui_id": "widget_1",
"parent_id": "BECFigure_1708689320.788527",
"row": 0,
"col": 0,
"axis": {
"title": "Widget 1",
"x_label": None,
"y_label": None,
"x_scale": "linear",
"y_scale": "linear",
"x_lim": (1, 10),
"y_lim": None,
"x_grid": False,
"y_grid": False,
},
"color_palette": "plasma",
"curves": {
"bpm4i-bpm4i": {
"widget_class": "BECCurve",
"gui_id": "BECCurve_1708689321.226847",
"parent_id": "widget_1",
"label": "bpm4i-bpm4i",
"color": "#cc4778",
"colormap": "plasma",
"symbol": "o",
"symbol_color": None,
"symbol_size": 5,
"pen_width": 2,
"pen_style": "dash",
"source": "scan_segment",
"signals": {
"source": "scan_segment",
"x": {
"name": "samx",
"entry": "samx",
"unit": None,
"modifier": None,
"limits": None,
},
"y": {
"name": "bpm4i",
"entry": "bpm4i",
"unit": None,
"modifier": None,
"limits": None,
},
"z": None,
},
},
"curve-custom": {
"widget_class": "BECCurve",
"gui_id": "BECCurve_1708689321.22867",
"parent_id": "widget_1",
"label": "curve-custom",
"color": "blue",
"colormap": "plasma",
"symbol": "o",
"symbol_color": None,
"symbol_size": 5,
"pen_width": 2,
"pen_style": "dashdot",
"source": "custom",
"signals": None,
},
},
}
w1 = bec_figure.add_plot(config=w1_config_input)
w1_config_output = w1.get_config()
assert w1_config_input == w1_config_output
assert w1.plot_item.titleLabel.text == "Widget 1"
assert w1.config.axis.title == "Widget 1"
def test_change_gui_id(bec_figure):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i")
w1.change_gui_id("new_id")
assert w1.config.gui_id == "new_id"
assert c1.config.parent_id == "new_id"
def test_getting_curve(bec_figure):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i", gui_id="test_curve")
c1_expected_config = CurveConfig(
widget_class="BECCurve",
gui_id="test_curve",
parent_id="widget_1",
label="bpm4i-bpm4i",
color="#cc4778",
symbol="o",
symbol_color=None,
symbol_size=5,
pen_width=2,
pen_style="solid",
source="scan_segment",
signals=Signal(
source="scan_segment",
x=SignalData(name="samx", entry="samx", unit=None, modifier=None),
y=SignalData(name="bpm4i", entry="bpm4i", unit=None, modifier=None),
),
)
assert w1.curves[0].config == c1_expected_config
assert w1._curves_data["scan_segment"]["bpm4i-bpm4i"].config == c1_expected_config
assert w1.get_curve(0).config == c1_expected_config
assert w1.get_curve("bpm4i-bpm4i").config == c1_expected_config
assert c1.get_config(False) == c1_expected_config
assert c1.get_config() == c1_expected_config.model_dump()
def test_getting_curve_errors(bec_figure):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i", gui_id="test_curve")
with pytest.raises(ValueError) as excinfo:
w1.get_curve("non_existent_curve")
assert "Curve with ID 'non_existent_curve' not found." in str(excinfo.value)
with pytest.raises(IndexError) as excinfo:
w1.get_curve(1)
assert "list index out of range" in str(excinfo.value)
with pytest.raises(ValueError) as excinfo:
w1.get_curve(1.2)
assert "Identifier must be either an integer (index) or a string (curve_id)." in str(
excinfo.value
)
def test_add_curve(bec_figure):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i")
assert len(w1.curves) == 1
assert w1._curves_data["scan_segment"] == {"bpm4i-bpm4i": c1}
assert c1.config.label == "bpm4i-bpm4i"
assert c1.config.source == "scan_segment"
def test_remove_curve(bec_figure):
w1 = bec_figure.add_plot()
w1.add_curve_scan(x_name="samx", y_name="bpm4i")
w1.add_curve_scan(x_name="samx", y_name="bpm3a")
w1.remove_curve(0)
w1.remove_curve("bpm3a-bpm3a")
assert len(w1.plot_item.curves) == 0
assert w1._curves_data["scan_segment"] == {}
with pytest.raises(ValueError) as excinfo:
w1.remove_curve(1.2)
assert "Each identifier must be either an integer (index) or a string (curve_id)." in str(
excinfo.value
)
def test_change_curve_appearance_methods(bec_figure, qtbot):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i")
c1.set_color("#0000ff")
c1.set_symbol("x")
c1.set_symbol_color("#ff0000")
c1.set_symbol_size(10)
c1.set_pen_width(3)
c1.set_pen_style("dashdot")
qtbot.wait(500)
assert c1.config.color == "#0000ff"
assert c1.config.symbol == "x"
assert c1.config.symbol_color == "#ff0000"
assert c1.config.symbol_size == 10
assert c1.config.pen_width == 3
assert c1.config.pen_style == "dashdot"
assert c1.config.source == "scan_segment"
assert c1.config.signals.model_dump() == {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
"z": None,
}
def test_change_curve_appearance_args(bec_figure):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i")
c1.set(
color="#0000ff",
symbol="x",
symbol_color="#ff0000",
symbol_size=10,
pen_width=3,
pen_style="dashdot",
)
assert c1.config.color == "#0000ff"
assert c1.config.symbol == "x"
assert c1.config.symbol_color == "#ff0000"
assert c1.config.symbol_size == 10
assert c1.config.pen_width == 3
assert c1.config.pen_style == "dashdot"
assert c1.config.source == "scan_segment"
assert c1.config.signals.model_dump() == {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
"z": None,
}
def test_set_custom_curve_data(bec_figure, qtbot):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_custom(
x=[1, 2, 3],
y=[4, 5, 6],
label="custom_curve",
color="#0000ff",
symbol="x",
symbol_color="#ff0000",
symbol_size=10,
pen_width=3,
pen_style="dashdot",
)
x_init, y_init = c1.get_data()
assert np.array_equal(x_init, [1, 2, 3])
assert np.array_equal(y_init, [4, 5, 6])
assert c1.config.label == "custom_curve"
assert c1.config.color == "#0000ff"
assert c1.config.symbol == "x"
assert c1.config.symbol_color == "#ff0000"
assert c1.config.symbol_size == 10
assert c1.config.pen_width == 3
assert c1.config.pen_style == "dashdot"
assert c1.config.source == "custom"
assert c1.config.signals == None
c1.set_data(x=[4, 5, 6], y=[7, 8, 9])
x_new, y_new = c1.get_data()
assert np.array_equal(x_new, [4, 5, 6])
assert np.array_equal(y_new, [7, 8, 9])
def test_get_all_data(bec_figure):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_custom(
x=[1, 2, 3],
y=[4, 5, 6],
label="custom_curve-1",
color="#0000ff",
symbol="x",
symbol_color="#ff0000",
symbol_size=10,
pen_width=3,
pen_style="dashdot",
)
c2 = w1.add_curve_custom(
x=[4, 5, 6],
y=[7, 8, 9],
label="custom_curve-2",
color="#00ff00",
symbol="o",
symbol_color="#00ff00",
symbol_size=20,
pen_width=4,
pen_style="dash",
)
all_data = w1.get_all_data()
assert all_data == {
"custom_curve-1": {"x": [1, 2, 3], "y": [4, 5, 6]},
"custom_curve-2": {"x": [4, 5, 6], "y": [7, 8, 9]},
}
def test_curve_add_by_config(bec_figure):
w1 = bec_figure.add_plot()
c1_config_input = {
"widget_class": "BECCurve",
"gui_id": "BECCurve_1708689321.226847",
"parent_id": "widget_1",
"label": "bpm4i-bpm4i",
"color": "#cc4778",
"colormap": "plasma",
"symbol": "o",
"symbol_color": None,
"symbol_size": 5,
"pen_width": 2,
"pen_style": "dash",
"source": "scan_segment",
"signals": {
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {
"name": "bpm4i",
"entry": "bpm4i",
"unit": None,
"modifier": None,
"limits": None,
},
"z": None,
},
}
c1 = w1.add_curve_by_config(c1_config_input)
c1_config_dict = c1.get_config()
assert c1_config_dict == c1_config_input
assert c1.config == CurveConfig(**c1_config_input)
assert c1.get_config(False) == CurveConfig(**c1_config_input)
def test_scan_update(bec_figure, qtbot):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i")
msg_waveform = {
"data": {
"samx": {"samx": {"value": 10}},
"bpm4i": {"bpm4i": {"value": 5}},
"gauss_bpm": {"gauss_bpm": {"value": 6}},
"gauss_adc1": {"gauss_adc1": {"value": 8}},
"gauss_adc2": {"gauss_adc2": {"value": 9}},
},
"scan_id": 1,
}
# Mock scan_storage.find_scan_by_ID
mock_scan_data_waveform = MagicMock()
mock_scan_data_waveform.data = {
device_name: {
entry: MagicMock(val=[msg_waveform["data"][device_name][entry]["value"]])
for entry in msg_waveform["data"][device_name]
}
for device_name in msg_waveform["data"]
}
metadata_waveform = {"scan_name": "line_scan"}
w1.queue.scan_storage.find_scan_by_ID.return_value = mock_scan_data_waveform
w1.on_scan_segment(msg_waveform, metadata_waveform)
qtbot.wait(500)
assert c1.get_data() == ([10], [5])
def test_scan_history_with_val_access(bec_figure, qtbot):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="bpm4i")
mock_scan_data = {
"samx": {"samx": MagicMock(val=np.array([1, 2, 3]))}, # Use MagicMock for .val
"bpm4i": {"bpm4i": MagicMock(val=np.array([4, 5, 6]))}, # Use MagicMock for .val
}
mock_scan_storage = MagicMock()
mock_scan_storage.find_scan_by_ID.return_value = MagicMock(data=mock_scan_data)
w1.queue.scan_storage = mock_scan_storage
fake_scan_id = "fake_scan_id"
w1.scan_history(scan_id=fake_scan_id)
qtbot.wait(500)
x_data, y_data = c1.get_data()
assert np.array_equal(x_data, [1, 2, 3])
assert np.array_equal(y_data, [4, 5, 6])
def test_scatter_2d_update(bec_figure, qtbot):
w1 = bec_figure.add_plot()
c1 = w1.add_curve_scan(x_name="samx", y_name="samx", z_name="bpm4i")
msg = {
"data": {
"samx": {"samx": {"value": [1, 2, 3]}},
"samy": {"samy": {"value": [4, 5, 6]}},
"bpm4i": {"bpm4i": {"value": [1, 3, 2]}},
},
"scan_id": 1,
}
msg_metadata = {"scan_name": "line_scan"}
mock_scan_data = MagicMock()
mock_scan_data.data = {
device_name: {
entry: MagicMock(val=msg["data"][device_name][entry]["value"])
for entry in msg["data"][device_name]
}
for device_name in msg["data"]
}
w1.queue.scan_storage.find_scan_by_ID.return_value = mock_scan_data
w1.on_scan_segment(msg, msg_metadata)
qtbot.wait(500)
data = c1.get_data()
expected_x_y_data = ([1, 2, 3], [1, 2, 3])
expected_z_colors = w1._make_z_gradient([1, 3, 2], "plasma")
scatter_points = c1.scatter.points()
colors = [point.brush().color() for point in scatter_points]
assert np.array_equal(data, expected_x_y_data)
assert colors == expected_z_colors

View File

@ -0,0 +1,90 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import pytest
from qtpy.QtWidgets import QComboBox, QLineEdit, QSpinBox, QTableWidget, QVBoxLayout, QWidget
from bec_widgets.utils.widget_io import WidgetHierarchy
@pytest.fixture(scope="function")
def example_widget(qtbot):
# Create a widget with a few child widgets
main_widget = QWidget()
layout = QVBoxLayout(main_widget)
line_edit = QLineEdit(main_widget)
combo_box = QComboBox(main_widget)
table_widget = QTableWidget(2, 2, main_widget)
spin_box = QSpinBox(main_widget)
layout.addWidget(line_edit)
layout.addWidget(combo_box)
layout.addWidget(table_widget)
layout.addWidget(spin_box)
# Add text items to the combo box
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
qtbot.addWidget(main_widget)
qtbot.waitExposed(main_widget)
yield main_widget
def test_export_import_config(example_widget):
initial_config = {
"QWidget ()": {
"QLineEdit ()": {"value": "New Text"},
"QComboBox ()": {"value": 1},
"QTableWidget ()": {"value": [["a", "b"], ["c", "d"]]},
"QSpinBox ()": {"value": 10},
}
}
WidgetHierarchy.import_config_from_dict(example_widget, initial_config, set_values=True)
exported_config_full = WidgetHierarchy.export_config_to_dict(example_widget, grab_values=True)
exported_config_reduced = WidgetHierarchy.export_config_to_dict(
example_widget, grab_values=True, save_all=False
)
expected_full = {
"QWidget ()": {
"QVBoxLayout ()": {},
"QLineEdit ()": {"value": "New Text", "QObject ()": {}},
"QComboBox ()": {"value": 1, "QStandardItemModel ()": {}},
"QTableWidget ()": {
"value": [["a", "b"], ["c", "d"]],
"QWidget (qt_scrollarea_viewport)": {},
"QStyledItemDelegate ()": {},
"QHeaderView ()": {
"QWidget (qt_scrollarea_viewport)": {},
"QWidget (qt_scrollarea_hcontainer)": {
"QScrollBar ()": {},
"QBoxLayout ()": {},
},
"QWidget (qt_scrollarea_vcontainer)": {
"QScrollBar ()": {},
"QBoxLayout ()": {},
},
"QItemSelectionModel ()": {},
},
"QAbstractButton ()": {},
"QAbstractTableModel ()": {},
"QItemSelectionModel ()": {},
"QWidget (qt_scrollarea_hcontainer)": {"QScrollBar ()": {}, "QBoxLayout ()": {}},
"QWidget (qt_scrollarea_vcontainer)": {"QScrollBar ()": {}, "QBoxLayout ()": {}},
},
"QSpinBox ()": {
"value": 10,
"QLineEdit (qt_spinbox_lineedit)": {"value": "10", "QObject ()": {}},
"QValidator (qt_spinboxvalidator)": {},
},
}
}
expected_reduced = {
"QWidget ()": {
"QLineEdit ()": {"value": "New Text"},
"QComboBox ()": {"value": 1},
"QTableWidget ()": {"value": [["a", "b"], ["c", "d"]]},
"QSpinBox ()": {"value": 10, "QLineEdit (qt_spinbox_lineedit)": {"value": "10"}},
}
}
assert exported_config_full == expected_full
assert exported_config_reduced == expected_reduced

View File

@ -0,0 +1,163 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import os
import tempfile
from unittest.mock import patch
import pytest
import yaml
from qtpy.QtWidgets import QPushButton, QVBoxLayout, QWidget
from bec_widgets.utils.yaml_dialog import load_yaml, save_yaml
@pytest.fixture(scope="function")
def example_widget(qtbot):
main_widget = QWidget()
layout = QVBoxLayout(main_widget)
main_widget.import_button = QPushButton("Import", main_widget)
main_widget.export_button = QPushButton("Export", main_widget)
layout.addWidget(main_widget.import_button)
layout.addWidget(main_widget.export_button)
main_widget.config = {} # Dictionary to store the loaded configuration
main_widget.saved_config = None # To store the saved configuration
qtbot.addWidget(main_widget)
qtbot.waitExposed(main_widget)
yield main_widget
def test_load_yaml(qtbot, example_widget):
# Create a temporary file with YAML content
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as temp_file:
temp_file.write(b"name: test\nvalue: 42")
def load_yaml_wrapper():
config = load_yaml(example_widget)
if config:
example_widget.config.update(config)
example_widget.import_button.clicked.connect(load_yaml_wrapper)
# Mock user selecting the file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
example_widget.import_button.click()
assert example_widget.config == {"name": "test", "value": 42}
os.remove(temp_file.name) # Clean up
def test_load_yaml_file_not_found(qtbot, example_widget, capsys):
def load_yaml_wrapper():
config = load_yaml(example_widget)
if config:
example_widget.config.update(config)
example_widget.import_button.clicked.connect(load_yaml_wrapper)
# Mock user selecting a non-existent file in the dialog
with patch(
"qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=("non_existent_file.yaml", "")
):
example_widget.import_button.click()
# Catch the print output
captured = capsys.readouterr()
assert "The file non_existent_file.yaml was not found." in captured.out
assert example_widget.config == {} # No update should happen
def test_load_yaml_general_exception(qtbot, example_widget, capsys, monkeypatch):
# Mock the open function to raise a general exception
def mock_open(*args, **kwargs):
raise Exception("General error")
monkeypatch.setattr("builtins.open", mock_open)
def load_yaml_wrapper():
config = load_yaml(example_widget)
if config:
example_widget.config.update(config)
example_widget.import_button.clicked.connect(load_yaml_wrapper)
# Mock user selecting a file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=("somefile.yaml", "")):
example_widget.import_button.click()
assert example_widget.config == {}
def test_load_yaml_permission_error(qtbot, example_widget, monkeypatch, capsys):
# Create a temporary file and remove read permissions
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as temp_file:
temp_file_path = temp_file.name
os.chmod(temp_file_path, 0o000) # Remove permissions
def load_yaml_wrapper():
config = load_yaml(example_widget)
if config:
example_widget.config.update(config)
example_widget.import_button.clicked.connect(load_yaml_wrapper)
# Mock user selecting the file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file_path, "")):
example_widget.import_button.click()
# # Catch the print output
# captured = capsys.readouterr()
# assert "Permission denied for file" in captured.out
assert example_widget.config == {} # No update should happen
os.remove(temp_file_path) # Clean up
def test_load_yaml_invalid_yaml(qtbot, example_widget, capsys):
# Create a temporary file with invalid YAML content
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as temp_file:
temp_file.write(b"\tinvalid_yaml: [unbalanced_brackets: ]")
def load_yaml_wrapper():
config = load_yaml(example_widget)
if config:
example_widget.config.update(config)
example_widget.import_button.clicked.connect(load_yaml_wrapper)
# Mock user selecting the file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getOpenFileName", return_value=(temp_file.name, "")):
example_widget.import_button.click()
# Catch the print output
captured = capsys.readouterr()
assert "Error parsing YAML file" in captured.out
assert example_widget.config == {} # No update should happen
os.remove(temp_file.name) # Clean up
def test_save_yaml(qtbot, example_widget):
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as temp_file:
temp_file_path = temp_file.name
# Prepare data to be saved
example_widget.saved_config = {"name": "test", "value": 42}
def save_yaml_wrapper():
save_yaml(example_widget, example_widget.saved_config)
example_widget.export_button.clicked.connect(save_yaml_wrapper)
# Mock user selecting the file in the dialog
with patch("qtpy.QtWidgets.QFileDialog.getSaveFileName", return_value=(temp_file_path, "")):
example_widget.export_button.click()
# Check if the data was saved correctly
with open(temp_file_path, "r") as file:
saved_config = yaml.safe_load(file)
assert saved_config == {"name": "test", "value": 42}
os.remove(temp_file_path) # Clean up