0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

refactor(bec_figure): BECFigure removed

This commit is contained in:
2025-03-20 20:46:57 +01:00
parent 6a70c0ff50
commit 838f9aa011
31 changed files with 146 additions and 8036 deletions

View File

@ -5,6 +5,8 @@ import random
import pytest
from bec_widgets.cli.client_utils import BECGuiClient
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.utils import BECDispatcher
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
@ -23,6 +25,26 @@ def threads_check_fixture(threads_check):
@pytest.fixture
def gui_id():
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
@contextmanager
def plot_server(gui_id, klass, client_lib):
dispatcher = BECDispatcher(client=client_lib) # Has to init singleton with fixture client
process, _ = _start_plot_process(
gui_id, klass, gui_class_id="bec", config=client_lib._client._service_config.config_path
)
try:
while client_lib._client.connector.get(MessageEndpoints.gui_heartbeat(gui_id)) is None:
time.sleep(0.3)
yield gui_id
finally:
process.terminate()
process.wait()
dispatcher.disconnect_all()
dispatcher.reset_singleton()
"""New gui id each time, to ensure no 'gui is alive' zombie key can perturbate"""
return f"figure_{random.randint(0,100)}"

View File

@ -1,242 +0,0 @@
# import time
# import numpy as np
# import pytest
# from bec_lib.endpoints import MessageEndpoints
# from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
# from bec_widgets.cli.rpc.rpc_base import RPCReference
# from bec_widgets.tests.utils import check_remote_data_size
# # pylint: disable=protected-access
# @pytest.fixture
# def connected_figure(connected_client_gui_obj):
# gui = connected_client_gui_obj
# dock = gui.window_list[0].new("dock")
# fig = dock.new(name="fig", widget="BECFigure")
# return fig
# def test_rpc_waveform1d_custom_curve(connected_figure):
# fig = connected_figure
# ax = fig.plot()
# curve = ax.plot(x=[1, 2, 3], y=[1, 2, 3])
# curve.set_color("red")
# curve = ax.curves[0]
# curve.set_color("blue")
# assert len(fig.widgets) == 1
# assert len(fig.widgets[ax._rpc_id].curves) == 1
# def test_rpc_plotting_shortcuts_init_configs(connected_figure, qtbot):
# fig = connected_figure
# plt = fig.plot(x_name="samx", y_name="bpm4i")
# im = fig.image("eiger")
# motor_map = fig.motor_map("samx", "samy")
# plt_z = fig.plot(x_name="samx", y_name="samy", z_name="bpm4i", new=True)
# # Checking if classes are correctly initialised
# assert len(fig.widgets) == 4
# assert plt.__class__.__name__ == "RPCReference"
# assert plt.__class__ == RPCReference
# assert plt._root._ipython_registry[plt._gui_id].__class__ == BECWaveform
# assert im.__class__.__name__ == "RPCReference"
# assert im.__class__ == RPCReference
# assert im._root._ipython_registry[im._gui_id].__class__ == BECImageShow
# assert motor_map.__class__.__name__ == "RPCReference"
# assert motor_map.__class__ == RPCReference
# assert motor_map._root._ipython_registry[motor_map._gui_id].__class__ == BECMotorMap
# # check if the correct devices are set
# # plot
# assert plt._config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
# "dap": None,
# "source": "scan_segment",
# "x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
# "y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
# "z": None,
# }
# # image
# assert im._config_dict["images"]["eiger"]["monitor"] == "eiger"
# # motor map
# assert motor_map._config_dict["signals"] == {
# "dap": None,
# "source": "device_readback",
# "x": {
# "name": "samx",
# "entry": "samx",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "y": {
# "name": "samy",
# "entry": "samy",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "z": None,
# }
# # plot with z scatter
# assert plt_z._config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
# "dap": None,
# "source": "scan_segment",
# "x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
# "y": {"name": "samy", "entry": "samy", "unit": None, "modifier": None, "limits": None},
# "z": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
# }
# def test_rpc_waveform_scan(qtbot, connected_figure, bec_client_lib):
# fig = connected_figure
# # add 3 different curves to track
# plt = fig.plot(x_name="samx", y_name="bpm4i")
# fig.plot(x_name="samx", y_name="bpm3a")
# fig.plot(x_name="samx", y_name="bpm4d")
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# queue = client.queue
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# item = queue.scan_storage.storage[-1]
# last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# for plot_name in ["bpm4i-bpm4i", "bpm3a-bpm3a", "bpm4d-bpm4d"]:
# qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
# # get data from curves
# plt_data = plt.get_all_data()
# # check plotted data
# assert plt_data["bpm4i-bpm4i"]["x"] == last_scan_data["samx"]["samx"].val
# assert plt_data["bpm4i-bpm4i"]["y"] == last_scan_data["bpm4i"]["bpm4i"].val
# assert plt_data["bpm3a-bpm3a"]["x"] == last_scan_data["samx"]["samx"].val
# assert plt_data["bpm3a-bpm3a"]["y"] == last_scan_data["bpm3a"]["bpm3a"].val
# assert plt_data["bpm4d-bpm4d"]["x"] == last_scan_data["samx"]["samx"].val
# assert plt_data["bpm4d-bpm4d"]["y"] == last_scan_data["bpm4d"]["bpm4d"].val
# def test_rpc_image(connected_figure, bec_client_lib):
# fig = connected_figure
# im = fig.image("eiger")
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
# "data"
# ].data
# last_image_plot = im.images[0].get_data()
# # check plotted data
# np.testing.assert_equal(last_image_device, last_image_plot)
# def test_rpc_motor_map(connected_figure, bec_client_lib):
# fig = connected_figure
# motor_map = fig.motor_map("samx", "samy")
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# initial_pos_x = dev.samx.read()["samx"]["value"]
# initial_pos_y = dev.samy.read()["samy"]["value"]
# status = scans.mv(dev.samx, 1, dev.samy, 2, relative=True)
# status.wait()
# final_pos_x = dev.samx.read()["samx"]["value"]
# final_pos_y = dev.samy.read()["samy"]["value"]
# # check plotted data
# motor_map_data = motor_map.get_data()
# np.testing.assert_equal(
# [motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
# )
# np.testing.assert_equal(
# [motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
# )
# def test_dap_rpc(connected_figure, bec_client_lib, qtbot):
# fig = connected_figure
# plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# dev.bpm4i.sim.select_model("GaussianModel")
# params = dev.bpm4i.sim.params
# params.update(
# {"noise": "uniform", "noise_multiplier": 10, "center": 5, "sigma": 1, "amplitude": 200}
# )
# dev.bpm4i.sim.params = params
# time.sleep(1)
# res = scans.line_scan(dev.samx, 0, 8, steps=50, relative=False)
# res.wait()
# # especially on slow machines, the fit might not be done yet
# # so we wait until the fit reaches the expected value
# def wait_for_fit():
# dap_curve = plt.get_curve("bpm4i-bpm4i-GaussianModel")
# fit_params = dap_curve.dap_params
# if fit_params is None:
# return False
# print(fit_params)
# return np.isclose(fit_params["center"], 5, atol=0.5)
# qtbot.waitUntil(wait_for_fit, timeout=10000)
# # Repeat fit after adding a region of interest
# plt.select_roi(region=(3, 7))
# res = scans.line_scan(dev.samx, 0, 8, steps=50, relative=False)
# res.wait()
# qtbot.waitUntil(wait_for_fit, timeout=10000)
# def test_removing_subplots(connected_figure, bec_client_lib):
# fig = connected_figure
# plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
# # Registry can't handle multiple subplots on one widget, BECFigure will be deprecated though
# # im = fig.image(monitor="eiger")
# # mm = fig.motor_map(motor_x="samx", motor_y="samy")
# assert len(fig.widget_list) == 1
# # removing curves
# assert len(plt.curves) == 2
# plt.curves[0].remove()
# assert len(plt.curves) == 1
# plt.remove_curve("bpm4i-bpm4i")
# assert len(plt.curves) == 0
# # removing all subplots from figure
# plt.remove()
# # im.remove()
# # mm.remove()
# assert len(fig.widget_list) == 0

View File

@ -57,26 +57,6 @@ def test_bec_dock_area_add_remove_dock(bec_dock_area, qtbot):
assert d2.name() in dict(bec_dock_area.dock_area.docks)
def test_add_remove_bec_figure_to_dock(bec_dock_area):
d0 = bec_dock_area.new()
fig = d0.new("BECFigure")
plt = fig.plot(x_name="samx", y_name="bpm4i")
im = fig.image("eiger")
mm = fig.motor_map("samx", "samy")
mw = fig.multi_waveform("waveform1d")
assert len(bec_dock_area.dock_area.docks) == 1
assert len(d0.elements) == 1
assert len(d0.element_list) == 1
assert len(fig.widgets) == 4
assert fig.config.widget_class == "BECFigure"
assert plt.config.widget_class == "BECWaveform"
assert im.config.widget_class == "BECImageShow"
assert mm.config.widget_class == "BECMotorMap"
assert mw.config.widget_class == "BECMultiWaveform"
def test_close_docks(bec_dock_area, qtbot):
d0 = bec_dock_area.new(name="dock_0")
d1 = bec_dock_area.new(name="dock_1")

View File

@ -1,275 +0,0 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
import numpy as np
import pytest
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.figure.plots.image.image import BECImageShow
from bec_widgets.widgets.containers.figure.plots.motor_map.motor_map import BECMotorMap
from bec_widgets.widgets.containers.figure.plots.multi_waveform.multi_waveform import (
BECMultiWaveform,
)
from bec_widgets.widgets.containers.figure.plots.waveform.waveform import BECWaveform
from .client_mocks import mocked_client
from .conftest import create_widget
def test_bec_figure_init(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
assert bec_figure is not None
assert bec_figure.client is not None
assert isinstance(bec_figure, BECFigure)
assert bec_figure.config.widget_class == "BECFigure"
def test_bec_figure_init_with_config(mocked_client):
config = {"widget_class": "BECFigure", "gui_id": "test_gui_id", "theme": "dark"}
widget = BECFigure(client=mocked_client, config=config)
assert widget.config.gui_id == "test_gui_id"
assert widget.config.theme == "dark"
def test_bec_figure_add_remove_plot(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
initial_count = len(bec_figure._widgets)
# Adding 3 widgets - 2 WaveformBase and 1 PlotBase
w0 = bec_figure.plot(new=True)
w1 = bec_figure.plot(new=True)
w2 = bec_figure.add_widget(widget_type="BECPlotBase")
# Check if the widgets were added
assert len(bec_figure._widgets) == initial_count + 3
assert w0.gui_id in bec_figure._widgets
assert w1.gui_id in bec_figure._widgets
assert w2.gui_id in bec_figure._widgets
assert bec_figure._widgets[w0.gui_id].config.widget_class == "BECWaveform"
assert bec_figure._widgets[w1.gui_id].config.widget_class == "BECWaveform"
assert bec_figure._widgets[w2.gui_id].config.widget_class == "BECPlotBase"
# Check accessing positions by the grid in figure
assert bec_figure[0, 0] == w0
assert bec_figure[1, 0] == w1
assert bec_figure[2, 0] == w2
# Removing 1 widget
bec_figure.remove(widget_id=w0.gui_id)
assert len(bec_figure._widgets) == initial_count + 2
assert w0.gui_id not in bec_figure._widgets
assert w2.gui_id in bec_figure._widgets
assert bec_figure._widgets[w1.gui_id].config.widget_class == "BECWaveform"
def test_add_different_types_of_widgets(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plt = bec_figure.plot(x_name="samx", y_name="bpm4i")
im = bec_figure.image("eiger")
motor_map = bec_figure.motor_map("samx", "samy")
multi_waveform = bec_figure.multi_waveform("waveform")
assert plt.__class__ == BECWaveform
assert im.__class__ == BECImageShow
assert motor_map.__class__ == BECMotorMap
assert multi_waveform.__class__ == BECMultiWaveform
def test_access_widgets_access_errors(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
bec_figure.plot(row=0, col=0)
# access widget by non-existent coordinates
with pytest.raises(ValueError) as excinfo:
bec_figure[0, 2]
assert "No widget at coordinates (0, 2)" in str(excinfo.value)
# access widget by non-existent widget_id
with pytest.raises(KeyError) as excinfo:
bec_figure["non_existent_widget"]
assert "Widget with id 'non_existent_widget' not found" in str(excinfo.value)
# access widget by wrong type
with pytest.raises(TypeError) as excinfo:
bec_figure[1.2]
assert (
"Key must be a string (widget id) or a tuple of two integers (grid coordinates)"
in str(excinfo.value)
)
def test_add_plot_to_occupied_position(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
bec_figure.plot(row=0, col=0)
with pytest.raises(ValueError) as excinfo:
bec_figure.plot(row=0, col=0, new=True)
assert "Position at row 0 and column 0 is already occupied." in str(excinfo.value)
def test_remove_plots(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot(row=0, col=0)
w2 = bec_figure.plot(row=0, col=1)
w3 = bec_figure.plot(row=1, col=0)
w4 = bec_figure.plot(row=1, col=1)
assert bec_figure[0, 0] == w1
assert bec_figure[0, 1] == w2
assert bec_figure[1, 0] == w3
assert bec_figure[1, 1] == w4
# remove by coordinates
bec_figure[0, 0].remove()
assert w1.gui_id not in bec_figure._widgets
# remove by widget_id
bec_figure.remove(widget_id=w2.gui_id)
assert w2.gui_id not in bec_figure._widgets
# remove by widget object
w3.remove()
assert w3.gui_id not in bec_figure._widgets
# check the remaining widget 4
assert bec_figure[0, 0] == w4
assert bec_figure[w4.gui_id] == w4
assert w4.gui_id in bec_figure._widgets
assert len(bec_figure._widgets) == 1
def test_remove_plots_by_coordinates_ints(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot(row=0, col=0)
w2 = bec_figure.plot(row=0, col=1)
bec_figure.remove(row=0, col=0)
assert w1.gui_id not in bec_figure._widgets
assert w2.gui_id in bec_figure._widgets
assert bec_figure[0, 0] == w2
assert len(bec_figure._widgets) == 1
def test_remove_plots_by_coordinates_tuple(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot(row=0, col=0)
w2 = bec_figure.plot(row=0, col=1)
bec_figure.remove(coordinates=(0, 0))
assert w1.gui_id not in bec_figure._widgets
assert w2.gui_id in bec_figure._widgets
assert bec_figure[0, 0] == w2
assert len(bec_figure._widgets) == 1
def test_remove_plot_by_id_error(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
bec_figure.plot()
with pytest.raises(ValueError) as excinfo:
bec_figure.remove(widget_id="non_existent_widget")
assert "Widget with ID 'non_existent_widget' does not exist." in str(excinfo.value)
def test_remove_plot_by_coordinates_error(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
bec_figure.plot(row=0, col=0)
with pytest.raises(ValueError) as excinfo:
bec_figure.remove(0, 1)
assert "No widget at coordinates (0, 1)" in str(excinfo.value)
def test_remove_plot_by_providing_nothing(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
bec_figure.plot(row=0, col=0)
with pytest.raises(ValueError) as excinfo:
bec_figure.remove()
assert "Must provide either widget_id or coordinates for removal." in str(excinfo.value)
# def test_change_theme(bec_figure): #TODO do no work at python 3.12
# bec_figure.change_theme("dark")
# assert bec_figure.config.theme == "dark"
# assert bec_figure.backgroundBrush().color().name() == "#000000"
# bec_figure.change_theme("light")
# assert bec_figure.config.theme == "light"
# assert bec_figure.backgroundBrush().color().name() == "#ffffff"
# bec_figure.change_theme("dark")
# assert bec_figure.config.theme == "dark"
# assert bec_figure.backgroundBrush().color().name() == "#000000"
def test_change_layout(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot(row=0, col=0)
w2 = bec_figure.plot(row=0, col=1)
w3 = bec_figure.plot(row=1, col=0)
w4 = bec_figure.plot(row=1, col=1)
bec_figure.change_layout(max_columns=1)
assert np.shape(bec_figure.grid) == (4, 1)
assert bec_figure[0, 0] == w1
assert bec_figure[1, 0] == w2
assert bec_figure[2, 0] == w3
assert bec_figure[3, 0] == w4
bec_figure.change_layout(max_rows=1)
assert np.shape(bec_figure.grid) == (1, 4)
assert bec_figure[0, 0] == w1
assert bec_figure[0, 1] == w2
assert bec_figure[0, 2] == w3
assert bec_figure[0, 3] == w4
def test_clear_all(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
bec_figure.plot(row=0, col=0)
bec_figure.plot(row=0, col=1)
bec_figure.plot(row=1, col=0)
bec_figure.plot(row=1, col=1)
bec_figure.clear_all()
assert len(bec_figure._widgets) == 0
assert np.shape(bec_figure.grid) == (0,)
def test_shortcuts(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plt = bec_figure.plot(x_name="samx", y_name="bpm4i")
im = bec_figure.image("eiger")
motor_map = bec_figure.motor_map("samx", "samy")
assert plt.config.widget_class == "BECWaveform"
assert plt.__class__ == BECWaveform
assert im.config.widget_class == "BECImageShow"
assert im.__class__ == BECImageShow
assert motor_map.config.widget_class == "BECMotorMap"
assert motor_map.__class__ == BECMotorMap
def test_plot_access_factory(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plt_00 = bec_figure.plot(x_name="samx", y_name="bpm4i")
plt_01 = bec_figure.plot(x_name="samx", y_name="bpm4i", row=0, col=1)
plt_10 = bec_figure.plot(new=True)
assert bec_figure.widget_list[0] == plt_00
assert bec_figure.widget_list[1] == plt_01
assert bec_figure.widget_list[2] == plt_10
assert bec_figure.axes(row=0, col=0) == plt_00
assert bec_figure.axes(row=0, col=1) == plt_01
assert bec_figure.axes(row=1, col=0) == plt_10
assert len(plt_00.curves) == 1
assert len(plt_01.curves) == 1
assert len(plt_10.curves) == 0
# update plt_00
bec_figure.plot(x_name="samx", y_name="bpm3a")
bec_figure.plot(x=[1, 2, 3], y=[1, 2, 3], row=0, col=0)
assert len(plt_00.curves) == 3

View File

@ -1,97 +0,0 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
import numpy as np
from bec_lib import messages
from bec_widgets.widgets.containers.figure import BECFigure
from .client_mocks import mocked_client
from .conftest import create_widget
def test_on_image_update(qtbot, mocked_client):
bec_image_show = create_widget(qtbot, BECFigure, client=mocked_client).image("eiger")
data = np.random.rand(100, 100)
msg = messages.DeviceMonitor2DMessage(device="eiger", data=data, metadata={"scan_id": "12345"})
bec_image_show.on_image_update(msg.content, msg.metadata)
img = bec_image_show.images[0]
assert np.array_equal(img.get_data(), data)
def test_autorange_on_image_update(qtbot, mocked_client):
bec_image_show = create_widget(qtbot, BECFigure, client=mocked_client).image("eiger")
# Check if autorange mode "mean" works, should be default
data = np.random.rand(100, 100)
msg = messages.DeviceMonitor2DMessage(device="eiger", data=data, metadata={"scan_id": "12345"})
bec_image_show.on_image_update(msg.content, msg.metadata)
img = bec_image_show.images[0]
assert np.array_equal(img.get_data(), data)
vmin = max(np.mean(data) - 2 * np.std(data), 0)
vmax = np.mean(data) + 2 * np.std(data)
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-5, 1e-5)).all()
# Test general update with autorange True, mode "max"
bec_image_show.set_autorange_mode("max")
bec_image_show.on_image_update(msg.content, msg.metadata)
img = bec_image_show.images[0]
vmin = np.min(data)
vmax = np.max(data)
assert np.array_equal(img.get_data(), data)
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-5, 1e-5)).all()
# Change the input data, and switch to autorange False, colormap levels should stay untouched
data *= 100
msg = messages.DeviceMonitor2DMessage(device="eiger", data=data, metadata={"scan_id": "12345"})
bec_image_show.set_autorange(False)
bec_image_show.on_image_update(msg.content, msg.metadata)
img = bec_image_show.images[0]
assert np.array_equal(img.get_data(), data)
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-3, 1e-3)).all()
# Reactivate autorange, should now scale the new data
bec_image_show.set_autorange(True)
bec_image_show.set_autorange_mode("mean")
bec_image_show.on_image_update(msg.content, msg.metadata)
img = bec_image_show.images[0]
vmin = max(np.mean(data) - 2 * np.std(data), 0)
vmax = np.mean(data) + 2 * np.std(data)
assert np.isclose(img.color_bar.getLevels(), (vmin, vmax), rtol=(1e-5, 1e-5)).all()
def test_on_image_update_variable_length(qtbot, mocked_client):
"""
Test the on_image_update slot with data arrays of varying lengths for 'device_monitor_1d' image type.
"""
# Create the widget and set image_type to 'device_monitor_1d'
bec_image_show = create_widget(qtbot, BECFigure, client=mocked_client).image("waveform1d", "1d")
# Generate data arrays of varying lengths
data_lengths = [10, 15, 12, 20, 5, 8, 1, 21]
data_arrays = [np.random.rand(length) for length in data_lengths]
# Simulate sending messages with these data arrays
device = "waveform1d"
for data in data_arrays:
msg = messages.DeviceMonitor1DMessage(
device=device, data=data, metadata={"scan_id": "12345"}
)
bec_image_show.on_image_update(msg.content, msg.metadata)
# After processing all data, retrieve the image and its data
img = bec_image_show.images[0]
image_buffer = img.get_data()
# The image_buffer should be a 2D array with number of rows equal to number of data arrays
# and number of columns equal to the maximum data length
expected_num_rows = len(data_arrays)
expected_num_cols = max(data_lengths)
assert image_buffer.shape == (
expected_num_rows,
expected_num_cols,
), f"Expected image buffer shape {(expected_num_rows, expected_num_cols)}, got {image_buffer.shape}"
# Check that each row in image_buffer corresponds to the padded data arrays
for i, data in enumerate(data_arrays):
padded_data = np.pad(
data, (0, expected_num_cols - len(data)), mode="constant", constant_values=0
)
assert np.array_equal(
image_buffer[i], padded_data
), f"Row {i} in image buffer does not match expected padded data"

View File

@ -1,282 +0,0 @@
import numpy as np
from bec_lib.messages import DeviceMessage
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.figure.plots.motor_map.motor_map import MotorMapConfig
from bec_widgets.widgets.containers.figure.plots.waveform.waveform_curve import SignalData
from .client_mocks import mocked_client
from .conftest import create_widget
def test_motor_map_init(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
default_config = MotorMapConfig(widget_class="BECMotorMap")
mm = bec_figure.motor_map(config=default_config.model_dump())
default_config.gui_id = mm.gui_id
assert mm.config == default_config
def test_motor_map_change_motors(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
assert mm.motor_x == "samx"
assert mm.motor_y == "samy"
assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
assert mm.config.signals.y == SignalData(name="samy", entry="samy", limits=[-5, 5])
mm.change_motors("samx", "samz")
assert mm.config.signals.x == SignalData(name="samx", entry="samx", limits=[-10, 10])
assert mm.config.signals.y == SignalData(name="samz", entry="samz", limits=[-8, 8])
def test_motor_map_get_limits(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
expected_limits = {"samx": [-10, 10], "samy": [-5, 5]}
for motor_name, expected_limit in expected_limits.items():
actual_limit = mm._get_motor_limit(motor_name)
assert actual_limit == expected_limit
def test_motor_map_get_init_position(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
mm.set_precision(2)
motor_map_dev = mm.client.device_manager.devices
expected_positions = {
("samx", "samx"): motor_map_dev["samx"].read()["samx"]["value"],
("samy", "samy"): motor_map_dev["samy"].read()["samy"]["value"],
}
for (motor_name, entry), expected_position in expected_positions.items():
actual_position = mm._get_motor_init_position(motor_name, entry, 2)
assert actual_position == expected_position
def test_motor_movement_updates_position_and_database(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
motor_map_dev = mm.client.device_manager.devices
init_positions = {
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
}
mm.change_motors("samx", "samy")
assert mm.database_buffer["x"] == init_positions["samx"]
assert mm.database_buffer["y"] == init_positions["samy"]
# Simulate motor movement for 'samx' only
new_position_samx = 4.0
msg = DeviceMessage(signals={"samx": {"value": new_position_samx}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
init_positions["samx"].append(new_position_samx)
init_positions["samy"].append(init_positions["samy"][-1])
# Verify database update for 'samx'
assert mm.database_buffer["x"] == init_positions["samx"]
# Verify 'samy' retains its last known position
assert mm.database_buffer["y"] == init_positions["samy"]
def test_scatter_plot_rendering(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
motor_map_dev = mm.client.device_manager.devices
init_positions = {
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
}
mm.change_motors("samx", "samy")
# Simulate motor movement for 'samx' only
new_position_samx = 4.0
msg = DeviceMessage(signals={"samx": {"value": new_position_samx}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
mm._update_plot()
# Get the scatter plot item
scatter_plot_item = mm.plot_components["scatter"]
# Check the scatter plot item properties
assert len(scatter_plot_item.data) > 0, "Scatter plot data is empty"
x_data = scatter_plot_item.data["x"]
y_data = scatter_plot_item.data["y"]
assert x_data[-1] == new_position_samx, "Scatter plot X data not updated correctly"
assert (
y_data[-1] == init_positions["samy"][-1]
), "Scatter plot Y data should retain last known position"
def test_plot_visualization_consistency(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
mm.change_motors("samx", "samy")
# Simulate updating the plot with new data
msg = DeviceMessage(signals={"samx": {"value": 5}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
msg = DeviceMessage(signals={"samy": {"value": 9}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
mm._update_plot()
scatter_plot_item = mm.plot_components["scatter"]
# Check if the scatter plot reflects the new data correctly
assert (
scatter_plot_item.data["x"][-1] == 5 and scatter_plot_item.data["y"][-1] == 9
), "Plot not updated correctly with new data"
def test_change_background_value(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
assert mm.config.background_value == 25
assert np.all(mm.plot_components["limit_map"].image == 25.0)
mm.set_background_value(50)
qtbot.wait(200)
assert mm.config.background_value == 50
assert np.all(mm.plot_components["limit_map"].image == 50.0)
def test_motor_map_init_from_config(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
config = {
"widget_class": "BECMotorMap",
"gui_id": "mm_id",
"parent_id": bec_figure.gui_id,
"row": 0,
"col": 0,
"axis": {
"title": "Motor position: (-0.0, 0.0)",
"title_size": None,
"x_label": "Motor X (samx)",
"x_label_size": None,
"y_label": "Motor Y (samy)",
"y_label_size": None,
"legend_label_size": None,
"x_scale": "linear",
"y_scale": "linear",
"x_lim": None,
"y_lim": None,
"x_grid": True,
"y_grid": True,
"outer_axes": False,
},
"signals": {
"source": "device_readback",
"x": {
"name": "samx",
"entry": "samx",
"unit": None,
"modifier": None,
"limits": [-10.0, 10.0],
},
"y": {
"name": "samy",
"entry": "samy",
"unit": None,
"modifier": None,
"limits": [-5.0, 5.0],
},
"z": None,
"dap": None,
},
"color": (255, 255, 255, 255),
"scatter_size": 5,
"max_points": 50,
"num_dim_points": 10,
"precision": 5,
"background_value": 50,
}
mm = bec_figure.motor_map(config=config)
config["gui_id"] = mm.gui_id
assert mm._config_dict == config
def test_motor_map_set_scatter_size(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
assert mm.config.scatter_size == 5
assert mm.plot_components["scatter"].opts["size"] == 5
mm.set_scatter_size(10)
qtbot.wait(200)
assert mm.config.scatter_size == 10
assert mm.plot_components["scatter"].opts["size"] == 10
def test_motor_map_change_precision(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
assert mm.config.precision == 2
mm.set_precision(10)
assert mm.config.precision == 10
def test_motor_map_set_color(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
assert mm.config.color == (255, 255, 255, 255)
mm.set_color((0, 0, 0, 255))
qtbot.wait(200)
assert mm.config.color == (0, 0, 0, 255)
def test_motor_map_get_data_max_points(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
mm = bec_figure.motor_map("samx", "samy")
motor_map_dev = mm.client.device_manager.devices
init_positions = {
"samx": [motor_map_dev["samx"].read()["samx"]["value"]],
"samy": [motor_map_dev["samy"].read()["samy"]["value"]],
}
msg = DeviceMessage(signals={"samx": {"value": 5.0}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
msg = DeviceMessage(signals={"samy": {"value": 9.0}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
msg = DeviceMessage(signals={"samx": {"value": 6.0}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
msg = DeviceMessage(signals={"samy": {"value": 7.0}}, metadata={})
mm.on_device_readback(msg.content, msg.metadata)
expected_x = [init_positions["samx"][-1], 5.0, 5.0, 6.0, 6.0]
expected_y = [init_positions["samy"][-1], init_positions["samy"][-1], 9.0, 9.0, 7.0]
get_data = mm.get_data()
assert mm.database_buffer["x"] == expected_x
assert mm.database_buffer["y"] == expected_y
assert get_data["x"] == expected_x
assert get_data["y"] == expected_y
mm.set_max_points(3)
qtbot.wait(200)
get_data = mm.get_data()
assert len(get_data["x"]) == 3
assert len(get_data["y"]) == 3
assert get_data["x"] == expected_x[-3:]
assert get_data["y"] == expected_y[-3:]
assert mm.database_buffer["x"] == expected_x[-3:]
assert mm.database_buffer["y"] == expected_y[-3:]

View File

@ -1,253 +0,0 @@
from unittest import mock
import numpy as np
import pytest
from bec_lib.endpoints import messages
from bec_widgets.utils import Colors
from bec_widgets.widgets.containers.figure import BECFigure
from .client_mocks import mocked_client
from .conftest import create_widget
def test_set_monitor(qtbot, mocked_client):
"""Test that setting the monitor connects the appropriate slot."""
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("waveform1d")
assert multi_waveform.config.monitor == "waveform1d"
assert multi_waveform.connected is True
data_0 = np.random.rand(100)
msg = messages.DeviceMonitor1DMessage(
device="waveform1d", data=data_0, metadata={"scan_id": "12345"}
)
multi_waveform.on_monitor_1d_update(msg.content, msg.metadata)
data_waveform = multi_waveform.get_all_data()
print(data_waveform)
assert len(data_waveform) == 1
assert np.array_equal(data_waveform["curve_0"]["y"], data_0)
data_1 = np.random.rand(100)
msg = messages.DeviceMonitor1DMessage(
device="waveform1d", data=data_1, metadata={"scan_id": "12345"}
)
multi_waveform.on_monitor_1d_update(msg.content, msg.metadata)
data_waveform = multi_waveform.get_all_data()
assert len(data_waveform) == 2
assert np.array_equal(data_waveform["curve_0"]["y"], data_0)
assert np.array_equal(data_waveform["curve_1"]["y"], data_1)
def test_on_monitor_1d_update(qtbot, mocked_client):
"""Test that data updates add curves to the plot."""
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("test_monitor")
# Simulate receiving data updates
test_data = np.array([1, 2, 3, 4, 5])
msg = {"data": test_data}
metadata = {"scan_id": "scan_1"}
# Call the on_monitor_1d_update method
multi_waveform.on_monitor_1d_update(msg, metadata)
# Check that a curve has been added
assert len(multi_waveform.curves) == 1
# Check that the data in the curve is correct
curve = multi_waveform.curves[-1]
x_data, y_data = curve.getData()
assert np.array_equal(y_data, test_data)
# Simulate another data update
test_data_2 = np.array([6, 7, 8, 9, 10])
msg2 = {"data": test_data_2}
metadata2 = {"scan_id": "scan_1"}
multi_waveform.on_monitor_1d_update(msg2, metadata2)
# Check that another curve has been added
assert len(multi_waveform.curves) == 2
# Check that the data in the curve is correct
curve2 = multi_waveform.curves[-1]
x_data2, y_data2 = curve2.getData()
assert np.array_equal(y_data2, test_data_2)
def test_set_curve_limit_no_flush(qtbot, mocked_client):
"""Test set_curve_limit with flush_buffer=False."""
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("test_monitor")
# Simulate adding multiple curves
for i in range(5):
test_data = np.array([i, i + 1, i + 2])
msg = {"data": test_data}
metadata = {"scan_id": "scan_1"}
multi_waveform.on_monitor_1d_update(msg, metadata)
# Check that there are 5 curves
assert len(multi_waveform.curves) == 5
# Set curve limit to 3 with flush_buffer=False
multi_waveform.set_curve_limit(3, flush_buffer=False)
# Check that curves are hidden, but not removed
assert len(multi_waveform.curves) == 5
visible_curves = [curve for curve in multi_waveform.curves if curve.isVisible()]
assert len(visible_curves) == 3
# The first two curves should be hidden
assert not multi_waveform.curves[0].isVisible()
assert not multi_waveform.curves[1].isVisible()
assert multi_waveform.curves[2].isVisible()
assert multi_waveform.curves[3].isVisible()
assert multi_waveform.curves[4].isVisible()
def test_set_curve_limit_flush(qtbot, mocked_client):
"""Test set_curve_limit with flush_buffer=True."""
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("test_monitor")
# Simulate adding multiple curves
for i in range(5):
test_data = np.array([i, i + 1, i + 2])
msg = {"data": test_data}
metadata = {"scan_id": "scan_1"}
multi_waveform.on_monitor_1d_update(msg, metadata)
# Check that there are 5 curves
assert len(multi_waveform.curves) == 5
# Set curve limit to 3 with flush_buffer=True
multi_waveform.set_curve_limit(3, flush_buffer=True)
# Check that only 3 curves remain
assert len(multi_waveform.curves) == 3
# The curves should be the last 3 added
x_data, y_data = multi_waveform.curves[0].getData()
assert np.array_equal(y_data, [2, 3, 4])
x_data, y_data = multi_waveform.curves[1].getData()
assert np.array_equal(y_data, [3, 4, 5])
x_data, y_data = multi_waveform.curves[2].getData()
assert np.array_equal(y_data, [4, 5, 6])
def test_set_curve_highlight(qtbot, mocked_client):
"""Test that the correct curve is highlighted."""
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("test_monitor")
# Simulate adding multiple curves
for i in range(3):
test_data = np.array([i, i + 1, i + 2])
msg = {"data": test_data}
metadata = {"scan_id": "scan_1"}
multi_waveform.on_monitor_1d_update(msg, metadata)
# Set highlight_last_curve to False
multi_waveform.highlight_last_curve = False
multi_waveform.set_curve_highlight(1) # Highlight the second curve (index 1)
# Check that the second curve is highlighted
visible_curves = [curve for curve in multi_waveform.curves if curve.isVisible()]
# Reverse the list to match indexing in set_curve_highlight
visible_curves = list(reversed(visible_curves))
for i, curve in enumerate(visible_curves):
pen = curve.opts["pen"]
width = pen.width()
if i == 1:
# Highlighted curve should have width 5
assert width == 5
else:
assert width == 1
def test_set_opacity(qtbot, mocked_client):
"""Test that setting opacity updates the curves."""
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("waveform1d")
# Simulate adding a curve
test_data = np.array([1, 2, 3])
msg = {"data": test_data}
metadata = {"scan_id": "scan_1"}
multi_waveform.on_monitor_1d_update(msg, metadata)
# Set opacity to 30
multi_waveform.set_opacity(30)
assert multi_waveform.config.opacity == 30
def test_set_colormap(qtbot, mocked_client):
"""Test that setting the colormap updates the curve colors."""
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("waveform1d")
# Simulate adding multiple curves
for i in range(3):
test_data = np.array([i, i + 1, i + 2])
msg = {"data": test_data}
metadata = {"scan_id": "scan_1"}
multi_waveform.on_monitor_1d_update(msg, metadata)
# Set a new colormap
multi_waveform.set_opacity(100)
multi_waveform.set_colormap("viridis")
# Check that the colors of the curves have changed accordingly
visible_curves = [curve for curve in multi_waveform.curves if curve.isVisible()]
# Get the colors applied
colors = Colors.evenly_spaced_colors(colormap="viridis", num=len(visible_curves), format="HEX")
for i, curve in enumerate(visible_curves):
pen = curve.opts["pen"]
pen_color = pen.color().name()
expected_color = colors[i]
# Compare pen color to expected color
assert pen_color.lower() == expected_color.lower()
def test_export_to_matplotlib(qtbot, mocked_client):
"""Test that export_to_matplotlib can be called without errors."""
try:
import matplotlib
except ImportError:
pytest.skip("Matplotlib not installed")
# Create a BECFigure
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
# Add a multi_waveform plot
multi_waveform = bec_figure.multi_waveform()
multi_waveform.set_monitor("test_monitor")
# Simulate adding a curve
test_data = np.array([1, 2, 3])
msg = {"data": test_data}
metadata = {"scan_id": "scan_1"}
multi_waveform.on_monitor_1d_update(msg, metadata)
# Call export_to_matplotlib
with mock.patch("pyqtgraph.exporters.MatplotlibExporter.export") as mock_export:
multi_waveform.export_to_matplotlib()
mock_export.assert_called_once()

View File

@ -1,247 +0,0 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
from unittest import mock
import pytest
from bec_widgets.widgets.containers.figure import BECFigure
from .client_mocks import mocked_client
from .conftest import create_widget
def test_init_plot_base(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
assert plot_base is not None
assert plot_base.config.widget_class == "BECPlotBase"
assert plot_base.config.gui_id == plot_base.gui_id
def test_plot_base_axes_by_separate_methods(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
plot_base.set_title("Test Title")
plot_base.set_x_label("Test x Label")
plot_base.set_y_label("Test y Label")
plot_base.set_x_lim(1, 100)
plot_base.set_y_lim(5, 500)
plot_base.set_grid(True, True)
plot_base.set_x_scale("log")
plot_base.set_y_scale("log")
assert plot_base.plot_item.titleLabel.text == "Test Title"
assert plot_base.config.axis.title == "Test Title"
assert plot_base.plot_item.getAxis("bottom").labelText == "Test x Label"
assert plot_base.config.axis.x_label == "Test x Label"
assert plot_base.plot_item.getAxis("left").labelText == "Test y Label"
assert plot_base.config.axis.y_label == "Test y Label"
assert plot_base.config.axis.x_lim == (1, 100)
assert plot_base.config.axis.y_lim == (5, 500)
assert plot_base.plot_item.ctrl.xGridCheck.isChecked() == True
assert plot_base.plot_item.ctrl.yGridCheck.isChecked() == True
assert plot_base.plot_item.ctrl.logXCheck.isChecked() == True
assert plot_base.plot_item.ctrl.logYCheck.isChecked() == True
# Check the font size by mocking the set functions
# I struggled retrieving it from the QFont object directly
# thus I mocked the set functions to check internally the functionality
with (
mock.patch.object(plot_base.plot_item, "setLabel") as mock_set_label,
mock.patch.object(plot_base.plot_item, "setTitle") as mock_set_title,
):
plot_base.set_x_label("Test x Label", 20)
plot_base.set_y_label("Test y Label", 16)
assert mock_set_label.call_count == 2
assert plot_base.config.axis.x_label_size == 20
assert plot_base.config.axis.y_label_size == 16
col = plot_base.get_text_color()
calls = []
style = {"color": col, "font-size": "20pt"}
calls.append(mock.call("bottom", "Test x Label", **style))
style = {"color": col, "font-size": "16pt"}
calls.append(mock.call("left", "Test y Label", **style))
assert mock_set_label.call_args_list == calls
plot_base.set_title("Test Title", 16)
style = {"color": col, "size": "16pt"}
call = mock.call("Test Title", **style)
assert mock_set_title.call_args == call
def test_plot_base_axes_added_by_kwargs(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
plot_base.set(
title="Test Title",
x_label="Test x Label",
y_label="Test y Label",
x_lim=(1, 100),
y_lim=(5, 500),
x_scale="log",
y_scale="log",
)
assert plot_base.plot_item.titleLabel.text == "Test Title"
assert plot_base.config.axis.title == "Test Title"
assert plot_base.plot_item.getAxis("bottom").labelText == "Test x Label"
assert plot_base.config.axis.x_label == "Test x Label"
assert plot_base.plot_item.getAxis("left").labelText == "Test y Label"
assert plot_base.config.axis.y_label == "Test y Label"
assert plot_base.config.axis.x_lim == (1, 100)
assert plot_base.config.axis.y_lim == (5, 500)
assert plot_base.plot_item.ctrl.logXCheck.isChecked() == True
assert plot_base.plot_item.ctrl.logYCheck.isChecked() == True
def test_lock_aspect_ratio(qtbot, mocked_client):
"""
Test locking and unlocking the aspect ratio of the plot.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
# Lock the aspect ratio
plot_base.lock_aspect_ratio(True)
assert plot_base.plot_item.vb.state["aspectLocked"] == 1
# Unlock the aspect ratio
plot_base.lock_aspect_ratio(False)
assert plot_base.plot_item.vb.state["aspectLocked"] == 0
def test_set_auto_range(qtbot, mocked_client):
"""
Test enabling and disabling auto range for the plot.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
# Enable auto range for both axes
plot_base.set_auto_range(True, axis="xy")
assert plot_base.plot_item.vb.state["autoRange"] == [True, True]
# Disable auto range for x-axis
plot_base.set_auto_range(False, axis="x")
assert plot_base.plot_item.vb.state["autoRange"] == [False, True]
# Disable auto range for y-axis
plot_base.set_auto_range(False, axis="y")
assert plot_base.plot_item.vb.state["autoRange"] == [False, False]
def test_set_outer_axes(qtbot, mocked_client):
"""
Test showing and hiding the outer axes of the plot.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
# Show outer axes
plot_base.set_outer_axes(True)
assert plot_base.plot_item.getAxis("top").isVisible()
assert plot_base.plot_item.getAxis("right").isVisible()
assert plot_base.config.axis.outer_axes is True
# Hide outer axes
plot_base.set_outer_axes(False)
assert not plot_base.plot_item.getAxis("top").isVisible()
assert not plot_base.plot_item.getAxis("right").isVisible()
assert plot_base.config.axis.outer_axes is False
def test_toggle_crosshair(qtbot, mocked_client):
"""
Test toggling the crosshair on and off.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
# Toggle crosshair on
plot_base.toggle_crosshair()
assert plot_base.crosshair is not None
# Toggle crosshair off
plot_base.toggle_crosshair()
assert plot_base.crosshair is None
def test_invalid_scale_input(qtbot, mocked_client):
"""
Test setting an invalid scale for x and y axes.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
with pytest.raises(ValueError):
plot_base.set_x_scale("invalid_scale")
with pytest.raises(ValueError):
plot_base.set_y_scale("invalid_scale")
def test_set_x_lim_invalid_arguments(qtbot, mocked_client):
"""
Test passing invalid arguments to set_x_lim.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
with pytest.raises(ValueError):
plot_base.set_x_lim(1)
with pytest.raises(ValueError):
plot_base.set_x_lim((1, 2, 3))
def test_set_y_lim_invalid_arguments(qtbot, mocked_client):
"""
Test passing invalid arguments to set_y_lim.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
with pytest.raises(ValueError):
plot_base.set_y_lim(1)
with pytest.raises(ValueError):
plot_base.set_y_lim((1, 2, 3))
def test_remove_plot(qtbot, mocked_client):
"""
Test removing the plot widget from the figure.
"""
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
with mock.patch.object(bec_figure, "remove") as mock_remove:
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
plot_base.remove()
mock_remove.assert_called_once_with(widget_id=plot_base.gui_id)
def test_add_fps_monitor(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
plot_base.enable_fps_monitor(True)
assert plot_base.fps_monitor is not None
assert plot_base.fps_monitor.view_box is plot_base.plot_item.getViewBox()
assert plot_base.fps_monitor.timer.isActive() == True
assert plot_base.fps_monitor.timer.interval() == 1000
assert plot_base.fps_monitor.sigFpsUpdate is not None
assert plot_base.fps_monitor.sigFpsUpdate.connect is not None
def test_hook_unhook_fps_monitor(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot_base = bec_figure.add_widget(widget_type="BECPlotBase", widget_id="test_plot")
plot_base.enable_fps_monitor(True)
assert plot_base.fps_monitor is not None
plot_base.enable_fps_monitor(False)
assert plot_base.fps_monitor is None
plot_base.enable_fps_monitor(True)
assert plot_base.fps_monitor is not None

View File

@ -6,7 +6,7 @@ def test_client_generator_classes():
connector_cls_names = [cls.__name__ for cls in out.connector_classes]
plugins = [cls.__name__ for cls in out.plugins]
assert "BECFigure" in connector_cls_names
assert "BECWaveform" in connector_cls_names
assert "Image" in connector_cls_names
assert "Waveform" in connector_cls_names
assert "BECDockArea" in plugins
assert "BECWaveform" not in plugins
assert "NonExisting" not in plugins

View File

@ -3,7 +3,7 @@ from unittest import mock
import pytest
from bec_widgets.cli.server import _start_server
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.dock import BECDockArea
@pytest.fixture
@ -20,9 +20,9 @@ def test_rpc_server_start_server_without_service_config(mocked_cli_server):
"""
mock_server, mock_config, _ = mocked_cli_server
_start_server("gui_id", BECFigure, config=None)
_start_server("gui_id", BECDockArea, config=None)
mock_server.assert_called_once_with(
gui_id="gui_id", config=mock_config(), gui_class=BECFigure, gui_class_id="bec"
gui_id="gui_id", config=mock_config(), gui_class=BECDockArea, gui_class_id="bec"
)
@ -39,7 +39,7 @@ def test_rpc_server_start_server_with_service_config(mocked_cli_server, config,
"""
mock_server, mock_config, _ = mocked_cli_server
config = mock_config(**call_config)
_start_server("gui_id", BECFigure, config=config)
_start_server("gui_id", BECDockArea, config=config)
mock_server.assert_called_once_with(
gui_id="gui_id", config=config, gui_class=BECFigure, gui_class_id="bec"
gui_id="gui_id", config=config, gui_class=BECDockArea, gui_class_id="bec"
)

View File

@ -1,113 +1,114 @@
import pytest
from qtpy.QtCore import QPointF
from bec_widgets.widgets.containers.figure import BECFigure
from .client_mocks import mocked_client
@pytest.fixture
def plot_widget_with_arrow_item(qtbot, mocked_client):
widget = BECFigure(client=mocked_client())
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
waveform = widget.plot()
yield waveform.arrow_item, waveform.plot_item
@pytest.fixture
def plot_widget_with_tick_item(qtbot, mocked_client):
widget = BECFigure(client=mocked_client())
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
waveform = widget.plot()
yield waveform.tick_item, waveform.plot_item
def test_arrow_item_add_to_plot(plot_widget_with_arrow_item):
"""Test the add_to_plot method"""
arrow_item, plot_item = plot_widget_with_arrow_item
assert arrow_item.plot_item is not None
assert arrow_item.plot_item.items == []
arrow_item.add_to_plot()
assert arrow_item.plot_item.items == [arrow_item.arrow_item]
arrow_item.remove_from_plot()
def test_arrow_item_set_position(plot_widget_with_arrow_item):
"""Test the set_position method"""
arrow_item, plot_item = plot_widget_with_arrow_item
container = []
def signal_callback(tup: tuple):
container.append(tup)
arrow_item.add_to_plot()
arrow_item.position_changed.connect(signal_callback)
arrow_item.set_position(pos=(1, 1))
point = QPointF(1.0, 1.0)
assert arrow_item.arrow_item.pos() == point
arrow_item.set_position(pos=(2, 2))
point = QPointF(2.0, 2.0)
assert arrow_item.arrow_item.pos() == point
assert container == [(1, 1), (2, 2)]
arrow_item.remove_from_plot()
def test_arrow_item_cleanup(plot_widget_with_arrow_item):
"""Test cleanup procedure"""
arrow_item, plot_item = plot_widget_with_arrow_item
arrow_item.add_to_plot()
assert arrow_item.item_on_plot is True
arrow_item.cleanup()
assert arrow_item.plot_item.items == []
assert arrow_item.item_on_plot is False
assert arrow_item.arrow_item is None
def test_tick_item_add_to_plot(plot_widget_with_tick_item):
"""Test the add_to_plot method"""
tick_item, plot_item = plot_widget_with_tick_item
assert tick_item.plot_item is not None
assert tick_item.plot_item.items == []
tick_item.add_to_plot()
assert tick_item.plot_item.layout.itemAt(2, 1) == tick_item.tick_item
assert tick_item.item_on_plot is True
new_pos = plot_item.vb.geometry().bottom()
pos = tick_item.tick.pos()
new_pos = tick_item.tick_item.mapFromParent(QPointF(pos.x(), new_pos))
assert new_pos.y() == pos.y()
tick_item.remove_from_plot()
def test_tick_item_set_position(plot_widget_with_tick_item):
"""Test the set_position method"""
tick_item, plot_item = plot_widget_with_tick_item
container = []
def signal_callback(val: float):
container.append(val)
tick_item.add_to_plot()
tick_item.position_changed.connect(signal_callback)
tick_item.set_position(pos=1)
assert tick_item._pos == 1
tick_item.set_position(pos=2)
assert tick_item._pos == 2
assert container == [1.0, 2.0]
tick_item.remove_from_plot()
def test_tick_item_cleanup(plot_widget_with_tick_item):
"""Test cleanup procedure"""
tick_item, plot_item = plot_widget_with_tick_item
tick_item.add_to_plot()
assert tick_item.item_on_plot is True
tick_item.cleanup()
ticks = getattr(tick_item.plot_item.layout.itemAt(3, 1), "ticks", None)
assert ticks == None
assert tick_item.item_on_plot is False
assert tick_item.tick_item is None
# TODO temporary disabled until migrate tick and arrow items to new system
# import pytest
# from qtpy.QtCore import QPointF
#
# from bec_widgets.widgets.containers.figure import BECFigure
#
# from .client_mocks import mocked_client
#
#
# @pytest.fixture
# def plot_widget_with_arrow_item(qtbot, mocked_client):
# widget = BECFigure(client=mocked_client())
# qtbot.addWidget(widget)
# qtbot.waitExposed(widget)
# waveform = widget.plot()
#
# yield waveform.arrow_item, waveform.plot_item
#
#
# @pytest.fixture
# def plot_widget_with_tick_item(qtbot, mocked_client):
# widget = BECFigure(client=mocked_client())
# qtbot.addWidget(widget)
# qtbot.waitExposed(widget)
# waveform = widget.plot()
#
# yield waveform.tick_item, waveform.plot_item
#
#
# def test_arrow_item_add_to_plot(plot_widget_with_arrow_item):
# """Test the add_to_plot method"""
# arrow_item, plot_item = plot_widget_with_arrow_item
# assert arrow_item.plot_item is not None
# assert arrow_item.plot_item.items == []
# arrow_item.add_to_plot()
# assert arrow_item.plot_item.items == [arrow_item.arrow_item]
# arrow_item.remove_from_plot()
#
#
# def test_arrow_item_set_position(plot_widget_with_arrow_item):
# """Test the set_position method"""
# arrow_item, plot_item = plot_widget_with_arrow_item
# container = []
#
# def signal_callback(tup: tuple):
# container.append(tup)
#
# arrow_item.add_to_plot()
# arrow_item.position_changed.connect(signal_callback)
# arrow_item.set_position(pos=(1, 1))
# point = QPointF(1.0, 1.0)
# assert arrow_item.arrow_item.pos() == point
# arrow_item.set_position(pos=(2, 2))
# point = QPointF(2.0, 2.0)
# assert arrow_item.arrow_item.pos() == point
# assert container == [(1, 1), (2, 2)]
# arrow_item.remove_from_plot()
#
#
# def test_arrow_item_cleanup(plot_widget_with_arrow_item):
# """Test cleanup procedure"""
# arrow_item, plot_item = plot_widget_with_arrow_item
# arrow_item.add_to_plot()
# assert arrow_item.item_on_plot is True
# arrow_item.cleanup()
# assert arrow_item.plot_item.items == []
# assert arrow_item.item_on_plot is False
# assert arrow_item.arrow_item is None
#
#
# def test_tick_item_add_to_plot(plot_widget_with_tick_item):
# """Test the add_to_plot method"""
# tick_item, plot_item = plot_widget_with_tick_item
# assert tick_item.plot_item is not None
# assert tick_item.plot_item.items == []
# tick_item.add_to_plot()
# assert tick_item.plot_item.layout.itemAt(2, 1) == tick_item.tick_item
# assert tick_item.item_on_plot is True
# new_pos = plot_item.vb.geometry().bottom()
# pos = tick_item.tick.pos()
# new_pos = tick_item.tick_item.mapFromParent(QPointF(pos.x(), new_pos))
# assert new_pos.y() == pos.y()
# tick_item.remove_from_plot()
#
#
# def test_tick_item_set_position(plot_widget_with_tick_item):
# """Test the set_position method"""
# tick_item, plot_item = plot_widget_with_tick_item
# container = []
#
# def signal_callback(val: float):
# container.append(val)
#
# tick_item.add_to_plot()
# tick_item.position_changed.connect(signal_callback)
#
# tick_item.set_position(pos=1)
# assert tick_item._pos == 1
# tick_item.set_position(pos=2)
# assert tick_item._pos == 2
# assert container == [1.0, 2.0]
# tick_item.remove_from_plot()
#
#
# def test_tick_item_cleanup(plot_widget_with_tick_item):
# """Test cleanup procedure"""
# tick_item, plot_item = plot_widget_with_tick_item
# tick_item.add_to_plot()
# assert tick_item.item_on_plot is True
# tick_item.cleanup()
# ticks = getattr(tick_item.plot_item.layout.itemAt(3, 1), "ticks", None)
# assert ticks == None
# assert tick_item.item_on_plot is False
# assert tick_item.tick_item is None

View File

@ -1,766 +0,0 @@
# pylint: disable=missing-function-docstring, missing-module-docstring, unused-import
from unittest import mock
import numpy as np
import pytest
from bec_lib.scan_items import ScanItem
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.figure.plots.waveform.waveform_curve import (
CurveConfig,
Signal,
SignalData,
)
from .client_mocks import mocked_client
from .conftest import create_widget
def test_adding_curve_to_waveform(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
# adding curve which is in bec - only names
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
assert c1.config.label == "bpm4i-bpm4i"
# adding curve which is in bec - names and entry
c2 = w1.add_curve_bec(x_name="samx", x_entry="samx", y_name="bpm3a", y_entry="bpm3a")
assert c2.config.label == "bpm3a-bpm3a"
# adding curve which is not in bec
with pytest.raises(ValueError) as excinfo:
w1.add_curve_bec(x_name="non_existent_device", y_name="non_existent_device")
assert "Device 'non_existent_device' not found in current BEC session" in str(excinfo.value)
# adding wrong entry for samx
with pytest.raises(ValueError) as excinfo:
w1.add_curve_bec(
x_name="samx", x_entry="non_existent_entry", y_name="bpm3a", y_entry="bpm3a"
)
assert "Entry 'non_existent_entry' not found in device 'samx' signals" in str(excinfo.value)
# adding wrong device with validation switched off
c3 = w1.add_curve_bec(x_name="samx", y_name="non_existent_device", validate_bec=False)
assert c3.config.label == "non_existent_device-non_existent_device"
def test_adding_curve_with_same_id(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
with pytest.raises(ValueError) as excinfo:
w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
assert "Curve with ID 'test_curve' already exists." in str(excinfo.value)
def test_create_waveform1D_by_config(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1_config_input = {
"widget_class": "BECWaveform",
"gui_id": "widget_1",
"parent_id": "BECFigure_1708689320.788527",
"row": 0,
"col": 0,
"axis": {
"title": "Widget 1",
"title_size": None,
"x_label": None,
"x_label_size": None,
"y_label": None,
"y_label_size": None,
"legend_label_size": None,
"x_scale": "linear",
"y_scale": "linear",
"x_lim": (1, 10),
"y_lim": None,
"x_grid": False,
"y_grid": False,
"outer_axes": False,
},
"color_palette": "magma",
"curves": {
"bpm4i-bpm4i": {
"widget_class": "BECCurve",
"gui_id": "BECCurve_1708689321.226847",
"parent_id": "widget_1",
"label": "bpm4i-bpm4i",
"color": "#cc4778",
"color_map_z": "magma",
"symbol": "o",
"symbol_color": None,
"symbol_size": 7,
"pen_width": 4,
"pen_style": "dash",
"source": "scan_segment",
"signals": {
"dap": None,
"source": "scan_segment",
"x": {
"name": "samx",
"entry": "samx",
"unit": None,
"modifier": None,
"limits": None,
},
"y": {
"name": "bpm4i",
"entry": "bpm4i",
"unit": None,
"modifier": None,
"limits": None,
},
"z": None,
},
},
"curve-custom": {
"widget_class": "BECCurve",
"gui_id": "BECCurve_1708689321.22867",
"parent_id": "widget_1",
"label": "curve-custom",
"color": "blue",
"color_map_z": "magma",
"symbol": "o",
"symbol_color": None,
"symbol_size": 7,
"pen_width": 5,
"pen_style": "dashdot",
"source": "custom",
"signals": None,
},
},
}
w1 = bec_figure.plot(config=w1_config_input)
w1_config_output = w1.get_config()
w1_config_input["gui_id"] = w1.gui_id
assert w1_config_input == w1_config_output
assert w1.plot_item.titleLabel.text == "Widget 1"
assert w1.config.axis.title == "Widget 1"
def test_change_gui_id(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
w1.change_gui_id("new_id")
assert w1.config.gui_id == "new_id"
assert c1.config.parent_id == "new_id"
def test_getting_curve(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
c1_expected_config_dark = CurveConfig(
widget_class="BECCurve",
gui_id="test_curve",
parent_id=w1.gui_id,
label="bpm4i-bpm4i",
color="#3b0f70",
symbol="o",
symbol_color=None,
symbol_size=7,
pen_width=4,
pen_style="solid",
source="scan_segment",
signals=Signal(
source="scan_segment",
x=SignalData(name="samx", entry="samx", unit=None, modifier=None),
y=SignalData(name="bpm4i", entry="bpm4i", unit=None, modifier=None),
),
)
c1_expected_config_light = CurveConfig(
widget_class="BECCurve",
gui_id="test_curve",
parent_id=w1.gui_id,
label="bpm4i-bpm4i",
color="#000004",
symbol="o",
symbol_color=None,
symbol_size=7,
pen_width=4,
pen_style="solid",
source="scan_segment",
signals=Signal(
source="scan_segment",
x=SignalData(name="samx", entry="samx", unit=None, modifier=None),
y=SignalData(name="bpm4i", entry="bpm4i", unit=None, modifier=None),
),
)
assert (
w1.curves[0].config == c1_expected_config_dark
or w1.curves[0].config == c1_expected_config_light
)
assert (
w1._curves_data["scan_segment"]["bpm4i-bpm4i"].config == c1_expected_config_dark
or w1._curves_data["scan_segment"]["bpm4i-bpm4i"].config == c1_expected_config_light
)
assert (
w1.get_curve(0).config == c1_expected_config_dark
or w1.get_curve(0).config == c1_expected_config_light
)
assert (
w1.get_curve_config("bpm4i-bpm4i", dict_output=True) == c1_expected_config_dark.model_dump()
or w1.get_curve_config("bpm4i-bpm4i", dict_output=True)
== c1_expected_config_light.model_dump()
)
assert (
w1.get_curve_config("bpm4i-bpm4i", dict_output=False) == c1_expected_config_dark
or w1.get_curve_config("bpm4i-bpm4i", dict_output=False) == c1_expected_config_light
)
assert (
w1.get_curve("bpm4i-bpm4i").config == c1_expected_config_dark
or w1.get_curve("bpm4i-bpm4i").config == c1_expected_config_light
)
assert (
c1.get_config(False) == c1_expected_config_dark
or c1.get_config(False) == c1_expected_config_light
)
assert (
c1.get_config() == c1_expected_config_dark.model_dump()
or c1.get_config() == c1_expected_config_light.model_dump()
)
def test_getting_curve_errors(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i", gui_id="test_curve")
with pytest.raises(ValueError) as excinfo:
w1.get_curve("non_existent_curve")
assert "Curve with ID 'non_existent_curve' not found." in str(excinfo.value)
with pytest.raises(IndexError) as excinfo:
w1.get_curve(1)
assert "list index out of range" in str(excinfo.value)
with pytest.raises(ValueError) as excinfo:
w1.get_curve(1.2)
assert "Identifier must be either an integer (index) or a string (curve_id)." in str(
excinfo.value
)
def test_add_curve(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
assert len(w1.curves) == 1
assert w1._curves_data["scan_segment"] == {"bpm4i-bpm4i": c1}
assert c1.config.label == "bpm4i-bpm4i"
assert c1.config.source == "scan_segment"
def test_change_legend_font_size(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
plot = bec_figure.plot()
w1 = plot.add_curve_bec(x_name="samx", y_name="bpm4i")
my_func = plot.plot_item.legend
with mock.patch.object(my_func, "setScale") as mock_set_scale:
plot.set_legend_label_size(18)
assert plot.config.axis.legend_label_size == 18
assert mock_set_scale.call_count == 1
assert mock_set_scale.call_args == mock.call(2)
def test_remove_curve(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
w1.add_curve_bec(x_name="samx", y_name="bpm4i")
w1.add_curve_bec(x_name="samx", y_name="bpm3a")
w1.remove_curve(0)
w1.remove_curve("bpm3a-bpm3a")
assert len(w1.plot_item.curves) == 0
assert w1._curves_data["scan_segment"] == {}
with pytest.raises(ValueError) as excinfo:
w1.remove_curve(1.2)
assert "Each identifier must be either an integer (index) or a string (curve_id)." in str(
excinfo.value
)
def test_change_curve_appearance_methods(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
c1.set_color("#0000ff")
c1.set_symbol("x")
c1.set_symbol_color("#ff0000")
c1.set_symbol_size(10)
c1.set_pen_width(3)
c1.set_pen_style("dashdot")
qtbot.wait(500)
assert c1.config.color == "#0000ff"
assert c1.config.symbol == "x"
assert c1.config.symbol_color == "#ff0000"
assert c1.config.symbol_size == 10
assert c1.config.pen_width == 3
assert c1.config.pen_style == "dashdot"
assert c1.config.source == "scan_segment"
assert c1.config.signals.model_dump() == {
"dap": None,
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
"z": None,
}
def test_change_curve_appearance_args(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
c1.set(
color="#0000ff",
symbol="x",
symbol_color="#ff0000",
symbol_size=10,
pen_width=3,
pen_style="dashdot",
)
assert c1.config.color == "#0000ff"
assert c1.config.symbol == "x"
assert c1.config.symbol_color == "#ff0000"
assert c1.config.symbol_size == 10
assert c1.config.pen_width == 3
assert c1.config.pen_style == "dashdot"
assert c1.config.source == "scan_segment"
assert c1.config.signals.model_dump() == {
"dap": None,
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
"z": None,
}
def test_set_custom_curve_data(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_custom(
x=[1, 2, 3],
y=[4, 5, 6],
label="custom_curve",
color="#0000ff",
symbol="x",
symbol_color="#ff0000",
symbol_size=10,
pen_width=3,
pen_style="dashdot",
)
x_init, y_init = c1.get_data()
assert np.array_equal(x_init, [1, 2, 3])
assert np.array_equal(y_init, [4, 5, 6])
assert c1.config.label == "custom_curve"
assert c1.config.color == "#0000ff"
assert c1.config.symbol == "x"
assert c1.config.symbol_color == "#ff0000"
assert c1.config.symbol_size == 10
assert c1.config.pen_width == 3
assert c1.config.pen_style == "dashdot"
assert c1.config.source == "custom"
assert c1.config.signals == None
c1.set_data(x=[4, 5, 6], y=[7, 8, 9])
x_new, y_new = c1.get_data()
assert np.array_equal(x_new, [4, 5, 6])
assert np.array_equal(y_new, [7, 8, 9])
def test_custom_data_2D_array(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
data = np.random.rand(10, 2)
plt = bec_figure.plot(data)
x, y = plt.curves[0].get_data()
assert np.array_equal(x, data[:, 0])
assert np.array_equal(y, data[:, 1])
def test_get_all_data(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_custom(
x=[1, 2, 3],
y=[4, 5, 6],
label="custom_curve-1",
color="#0000ff",
symbol="x",
symbol_color="#ff0000",
symbol_size=10,
pen_width=3,
pen_style="dashdot",
)
c2 = w1.add_curve_custom(
x=[4, 5, 6],
y=[7, 8, 9],
label="custom_curve-2",
color="#00ff00",
symbol="o",
symbol_color="#00ff00",
symbol_size=20,
pen_width=4,
pen_style="dash",
)
all_data = w1.get_all_data()
assert all_data == {
"custom_curve-1": {"x": [1, 2, 3], "y": [4, 5, 6]},
"custom_curve-2": {"x": [4, 5, 6], "y": [7, 8, 9]},
}
def test_curve_add_by_config(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1_config_input = {
"widget_class": "BECCurve",
"gui_id": "BECCurve_1708689321.226847",
"parent_id": "widget_1",
"label": "bpm4i-bpm4i",
"color": "#cc4778",
"color_map_z": "magma",
"symbol": "o",
"symbol_color": None,
"symbol_size": 7,
"pen_width": 4,
"pen_style": "dash",
"source": "scan_segment",
"signals": {
"dap": None,
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {
"name": "bpm4i",
"entry": "bpm4i",
"unit": None,
"modifier": None,
"limits": None,
},
"z": None,
},
}
c1 = w1.add_curve_by_config(c1_config_input)
c1_config_dict = c1.get_config()
assert c1_config_dict == c1_config_input
assert c1.config == CurveConfig(**c1_config_input)
assert c1.get_config(False) == CurveConfig(**c1_config_input)
def test_scan_update(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="bpm4i")
msg_waveform = {
"data": {
"samx": {"samx": {"value": 10}},
"bpm4i": {"bpm4i": {"value": 5}},
"gauss_bpm": {"gauss_bpm": {"value": 6}},
"gauss_adc1": {"gauss_adc1": {"value": 8}},
"gauss_adc2": {"gauss_adc2": {"value": 9}},
},
"scan_id": 1,
}
# Mock scan_storage.find_scan_by_ID
mock_scan_data_waveform = mock.MagicMock(spec=ScanItem)
mock_scan_data_waveform.live_data = {
device_name: {
entry: mock.MagicMock(val=[msg_waveform["data"][device_name][entry]["value"]])
for entry in msg_waveform["data"][device_name]
}
for device_name in msg_waveform["data"]
}
metadata_waveform = {"scan_name": "line_scan"}
w1.queue.scan_storage.find_scan_by_ID.return_value = mock_scan_data_waveform
w1.on_scan_segment(msg_waveform, metadata_waveform)
qtbot.wait(500)
assert c1.get_data() == ([10], [5])
def test_scan_history_with_val_access(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
w1.plot(x_name="samx", y_name="bpm4i")
mock_scan_data = {
"samx": {"samx": mock.MagicMock(val=np.array([1, 2, 3]))}, # Use mock.MagicMock for .val
"bpm4i": {"bpm4i": mock.MagicMock(val=np.array([4, 5, 6]))}, # Use mock.MagicMock for .val
}
mock_scan_storage = mock.MagicMock()
scan_item_mock = mock.MagicMock(spec=ScanItem)
scan_item_mock.data = mock_scan_data
mock_scan_storage.find_scan_by_ID.return_value = scan_item_mock
w1.queue.scan_storage = mock_scan_storage
fake_scan_id = "fake_scan_id"
w1.scan_history(scan_id=fake_scan_id)
qtbot.wait(500)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [1, 2, 3])
assert np.array_equal(y_data, [4, 5, 6])
def test_scatter_2d_update(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
c1 = w1.add_curve_bec(x_name="samx", y_name="samx", z_name="bpm4i")
msg = {
"data": {
"samx": {"samx": {"value": [1, 2, 3]}},
"samy": {"samy": {"value": [4, 5, 6]}},
"bpm4i": {"bpm4i": {"value": [1, 3, 2]}},
},
"scan_id": 1,
}
msg_metadata = {"scan_name": "line_scan"}
mock_scan_item = mock.MagicMock(spec=ScanItem)
mock_scan_item.live_data = {
device_name: {
entry: mock.MagicMock(val=msg["data"][device_name][entry]["value"])
for entry in msg["data"][device_name]
}
for device_name in msg["data"]
}
w1.queue.scan_storage.find_scan_by_ID.return_value = mock_scan_item
w1.on_scan_segment(msg, msg_metadata)
qtbot.wait(500)
data = c1.get_data()
expected_x_y_data = ([1, 2, 3], [1, 2, 3])
expected_z_colors = w1._make_z_gradient([1, 3, 2], "magma")
scatter_points = c1.scatter.points()
colors = [point.brush().color() for point in scatter_points]
assert np.array_equal(data, expected_x_y_data)
assert colors == expected_z_colors
def test_waveform_single_arg_inputs(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
w1.plot("bpm4i")
w1.plot([1, 2, 3], label="just_y")
w1.plot([3, 4, 5], [7, 8, 9], label="x_y")
w1.plot(x=[1, 2, 3], y=[4, 5, 6], label="x_y_kwargs")
data_array_1D = np.random.rand(10)
data_array_2D = np.random.rand(10, 2)
w1.plot(data_array_1D, label="np_ndarray 1D")
w1.plot(data_array_2D, label="np_ndarray 2D")
qtbot.wait(200)
assert w1._curves_data["scan_segment"]["bpm4i-bpm4i"].config.label == "bpm4i-bpm4i"
assert w1._curves_data["custom"]["just_y"].config.label == "just_y"
assert w1._curves_data["custom"]["x_y"].config.label == "x_y"
assert w1._curves_data["custom"]["x_y_kwargs"].config.label == "x_y_kwargs"
assert np.array_equal(w1._curves_data["custom"]["just_y"].get_data(), ([0, 1, 2], [1, 2, 3]))
assert np.array_equal(w1._curves_data["custom"]["just_y"].get_data(), ([0, 1, 2], [1, 2, 3]))
assert np.array_equal(w1._curves_data["custom"]["x_y"].get_data(), ([3, 4, 5], [7, 8, 9]))
assert np.array_equal(
w1._curves_data["custom"]["x_y_kwargs"].get_data(), ([1, 2, 3], [4, 5, 6])
)
assert np.array_equal(
w1._curves_data["custom"]["np_ndarray 1D"].get_data(),
(np.arange(data_array_1D.size), data_array_1D.T),
)
assert np.array_equal(w1._curves_data["custom"]["np_ndarray 2D"].get_data(), data_array_2D.T)
def test_waveform_set_x_sync(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot()
custom_label = "custom_label"
w1.plot("bpm4i")
w1.set_x_label(custom_label)
scan_item_mock = mock.MagicMock(spec=ScanItem)
mock_data = {
"samx": {"samx": mock.MagicMock(val=np.array([1, 2, 3]))},
"samy": {"samy": mock.MagicMock(val=np.array([4, 5, 6]))},
"bpm4i": {
"bpm4i": mock.MagicMock(
val=np.array([7, 8, 9]),
timestamps=np.array([1720520189.959115, 1720520189.986618, 1720520190.0157812]),
)
},
}
scan_item_mock.live_data = mock_data
scan_item_mock.status_message = mock.MagicMock()
scan_item_mock.status_message.info = {"scan_report_devices": ["samx"]}
w1.queue.scan_storage.find_scan_by_ID.return_value = scan_item_mock
w1.on_scan_segment({"scan_id": 1}, {})
qtbot.wait(200)
# Best effort - samx
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [1, 2, 3])
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [auto: samx-samx]"
# Change to samy
w1.set_x("samy")
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [4, 5, 6])
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [samy-samy]"
# change to index
w1.set_x("index")
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [0, 1, 2])
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [index]"
# change to timestamp
w1.set_x("timestamp")
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.allclose(x_data, np.array([1.72052019e09, 1.72052019e09, 1.72052019e09]))
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [timestamp]"
def test_waveform_async_data_update(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot("async_device")
custom_label = "custom_label"
w1.set_x_label(custom_label)
# scan_item_mock = mock.MagicMock()
# mock_data = {
# "async_device": {
# "async_device": mock.MagicMock(
# val=np.array([7, 8, 9]),
# timestamps=np.array([1720520189.959115, 1720520189.986618, 1720520190.0157812]),
# )
# }
# }
#
# scan_item_mock.async_data = mock_data
# w1.queue.scan_storage.find_scan_by_ID.return_value = scan_item_mock
msg_1 = {"signals": {"async_device": {"value": [7, 8, 9]}}}
metadata_1 = {"async_update": {"max_shape": [None], "type": "add"}}
w1.on_async_readback(msg_1, metadata_1)
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [0, 1, 2])
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
msg_2 = {"signals": {"async_device": {"value": [10, 11, 12]}}}
w1.on_async_readback(msg_2, metadata_1)
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [0, 1, 2, 3, 4, 5])
assert np.array_equal(y_data, [7, 8, 9, 10, 11, 12])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
msg_3 = {"signals": {"async_device": {"value": [20, 21, 22]}}}
metadata_3 = {"async_update": {"max_shape": [None], "type": "replace"}}
w1.on_async_readback(msg_3, metadata_3)
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [0, 1, 2])
assert np.array_equal(y_data, [20, 21, 22])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
def test_waveform_set_x_async(qtbot, mocked_client):
bec_figure = create_widget(qtbot, BECFigure, client=mocked_client)
w1 = bec_figure.plot("async_device")
custom_label = "custom_label"
w1.set_x_label(custom_label)
scan_item_mock = mock.MagicMock()
mock_data = {
"async_device": {
"async_device": {
"value": np.array([7, 8, 9]),
"timestamp": np.array([1720520189.959115, 1720520189.986618, 1720520190.0157812]),
}
}
}
scan_item_mock.async_data = mock_data
w1.queue.scan_storage.find_scan_by_ID.return_value = scan_item_mock
w1.on_scan_status({"scan_id": 1})
w1.replot_async_curve()
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [0, 1, 2])
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [best_effort]"
w1.set_x("timestamp")
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.allclose(x_data, np.array([1.72052019e09, 1.72052019e09, 1.72052019e09]))
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [timestamp]"
w1.set_x("index")
qtbot.wait(200)
x_data, y_data = w1.curves[0].get_data()
assert np.array_equal(x_data, [0, 1, 2])
assert np.array_equal(y_data, [7, 8, 9])
assert w1.plot_item.getAxis("bottom").labelText == custom_label + " [index]"